pub struct Wire<'a> { /* private fields */ }
A mechanism to interact ergonomically with an interface using synchronous Rust I/O buffers.
This wire sets up an internal message queue of data needing to be sent, allowing bidirectional messaging even over locked buffers (such as stdio) without leading to race conditions.
Note that the process of reading messages from the other side of the wire will implicitly perform procedure calls to satisfy their requests. As such,
any failures in procedures will be propagated upward, failing this wire. If this behaviour is undesired, you should manually restart calls like .fill()
on any errors. Even in server scenarios, however, this behaviour is usually desired, as sending a response back to the client is extremely difficult.
Genuinely fallible procedures are fully supported, and their errors will be sent over the wire; by “failures in procedures”, we typically mean
errors in serialising return types for transmission, or the like.
§Termination
An important concept in IPFI is that there can be many Wires to one Interface, and that, although it’s generally not a good idea, there
can be many input/output buffers for a single Wire. However, fundamentally a wire represents a connection between the local program and some remote
program, which means that, when the remote program terminates, continuing to use the wire is invalid. As such, wires maintain an internal flag
that notes whether or not the other side has terminated them yet, and, if this flag is found to be set, all operations on the wire will fail.
The main method used for reading data into an IPFI Interface is wire.fill(), which will attempt to read as many full messages as it can before
one ends (messages can be sent in as many partials as the sender pleases). If an UnexpectedEof error occurs here, the termination flag will
automatically be set under the assumption that the remote program has terminated. In some rare cases, you may wish to recover from this, which can
be done by creating a new Wire (i.e. once a wire has been terminated, it can no longer be used, and all method calls will immediately fail).
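The termination behaviour described above can be pictured with a small standard-library model. This is only a sketch under stated assumptions: `SketchWire`, `read_frame`, and the four-byte frame format are hypothetical stand-ins rather than IPFI's types or wire format, and synchronous `std::io::Read` is used in place of the asynchronous readers the real methods take.

```rust
use std::io::{self, ErrorKind, Read};
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-in for a wire; it models only the termination flag.
struct SketchWire {
    terminated: AtomicBool,
}

impl SketchWire {
    fn new() -> Self {
        Self { terminated: AtomicBool::new(false) }
    }

    /// Cheap check of the flag, readable from many threads.
    fn is_terminated(&self) -> bool {
        self.terminated.load(Ordering::SeqCst)
    }

    /// Reads one four-byte frame (a made-up format). Fails immediately if
    /// the flag is set, and sets the flag when the reader hits `UnexpectedEof`.
    fn read_frame(&self, reader: &mut impl Read) -> io::Result<[u8; 4]> {
        if self.is_terminated() {
            return Err(io::Error::new(ErrorKind::Other, "wire terminated"));
        }
        let mut frame = [0u8; 4];
        match reader.read_exact(&mut frame) {
            Ok(()) => Ok(frame),
            Err(err) => {
                if err.kind() == ErrorKind::UnexpectedEof {
                    self.terminated.store(true, Ordering::SeqCst);
                }
                // The error is still returned after the flag has been set.
                Err(err)
            }
        }
    }
}

fn main() {
    let wire = SketchWire::new();
    // The remote "hung up" after two bytes of a four-byte frame.
    let mut truncated: &[u8] = &[1, 2];
    assert!(wire.read_frame(&mut truncated).is_err());
    assert!(wire.is_terminated());

    // Recovery means building a fresh wire rather than reusing the old one.
    let fresh = SketchWire::new();
    let mut full: &[u8] = &[1, 2, 3, 4];
    assert_eq!(fresh.read_frame(&mut full).unwrap(), [1, 2, 3, 4]);
}
```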
§Procedure calls
§Call indices
IPFI manages procedure calls through a system of procedure indices (e.g. the print_hello() procedure might be assigned index 0; you as the programmer
control this) and call indices. Since a procedure call can be done piecemeal, with even fractions of arguments being sent as raw bytes, the remote
program must be able to know which partials are associated with each other so it can accumulate the partials into one place for later assembly. This
is done through call indices, which are essentially counters maintained by each wire that mark how many times it has executed a certain procedure.
Then, on the remote side, messages received on the same wire from the same call index of a certain procedure index will be grouped together and
interpreted as one set of arguments when a message is sent that marks the call as complete, allowing it to be properly executed.
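As a rough illustration of this grouping, the sketch below models a per-procedure counter on the calling side and a buffer per (procedure index, call index) pair on the remote side. `CallCounters`, `Accumulator`, and the plain `u32` aliases are hypothetical names chosen for this example; they are not IPFI's internal types.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for `ProcedureIndex` and `CallIndex`.
type ProcIdx = u32;
type CallIdx = u32;

/// Caller side: one counter per procedure, bumped on every new call.
#[derive(Default)]
struct CallCounters {
    next: HashMap<ProcIdx, CallIdx>,
}

impl CallCounters {
    fn next_call(&mut self, proc_idx: ProcIdx) -> CallIdx {
        let counter = self.next.entry(proc_idx).or_insert(0);
        let call_idx = *counter;
        *counter += 1;
        call_idx
    }
}

/// Remote side: partials are accumulated per (procedure, call) pair.
#[derive(Default)]
struct Accumulator {
    pending: HashMap<(ProcIdx, CallIdx), Vec<u8>>,
}

impl Accumulator {
    fn push_partial(&mut self, proc_idx: ProcIdx, call_idx: CallIdx, bytes: &[u8]) {
        self.pending
            .entry((proc_idx, call_idx))
            .or_default()
            .extend_from_slice(bytes);
    }

    /// Marks the call as complete and hands back the assembled bytes.
    fn finish(&mut self, proc_idx: ProcIdx, call_idx: CallIdx) -> Option<Vec<u8>> {
        self.pending.remove(&(proc_idx, call_idx))
    }
}

fn main() {
    let mut counters = CallCounters::default();
    let first = counters.next_call(0);
    let second = counters.next_call(0);

    // Partials from two calls to the same procedure arrive interleaved.
    let mut remote = Accumulator::default();
    remote.push_partial(0, first, b"he");
    remote.push_partial(0, second, b"wor");
    remote.push_partial(0, first, b"llo");
    remote.push_partial(0, second, b"ld");

    assert_eq!(remote.finish(0, first).unwrap(), b"hello".to_vec());
    assert_eq!(remote.finish(0, second).unwrap(), b"world".to_vec());
}
```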
§Transmitting arguments as bytes
There are several low-level methods within this struct that allow you to transmit partial arguments to a remote procedure directly, although
doing so is somewhat precarious. When you register a procedure locally, IPFI expects that procedure to take a tuple with an arbitrary number of
elements, but it will accept arguments in any list type (e.g. a procedure that takes three bools could have its arguments provided on the
remote caller as vec![true, false, true], [true, false, true], or (true, false, true)), which will then be serialized to a MessagePack
list type, which is fundamentally the serialization of all the component elements, preceded by a length prefix. It is the caller’s responsibility
to perform this serialization manually, omitting the length prefix, which will be added on the remote side when the call is marked as complete.
This system allows the extreme flexibility of sending not just one argument at a time, but even fractions of single arguments, enabling advanced
streaming use-cases and interoperability with the underlying single-byte IPFI interface functions.
You should also be aware that sending zero bytes for your arguments will be interpreted by the remote as a termination order, and the message buffer it uses to receive the arguments for that particular procedure call will be closed. This would prevent sending any further arguments, and likely lead to a corrupt procedure call.
Note that, unless you’re working on extremely low-level applications, 99% of the time the .call() method will be absolutely fine for you, and
if you want to send a few complete arguments at a time, you can use the partial methods that let you provide something implementing ProcedureArgs,
a trait that will perform the underlying serialization for you.
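To make the byte layout concrete, here is a hand-rolled sketch of the three-bool example. It relies only on the MessagePack encodings for booleans (`0xc2` for false, `0xc3` for true) and for short arrays (`0x90 | len`, for up to 15 elements). The helper names are hypothetical, and passing the argument count explicitly to the prefix step is an assumption made for illustration; how IPFI determines it is not shown here.

```rust
/// Encodes one bool as MessagePack (0xc3 = true, 0xc2 = false).
fn encode_bool(value: bool) -> u8 {
    if value { 0xc3 } else { 0xc2 }
}

/// What a caller would transmit: the elements only, with no length prefix.
fn encode_args_without_prefix(args: &[bool]) -> Vec<u8> {
    args.iter().copied().map(encode_bool).collect()
}

/// Models the remote step on completion: prepend the array header.
/// Only the fixarray form (up to 15 elements) is handled in this sketch.
fn add_fixarray_prefix(num_args: usize, body: &[u8]) -> Option<Vec<u8>> {
    if num_args > 15 {
        return None;
    }
    let mut out = Vec::with_capacity(body.len() + 1);
    out.push(0x90 | num_args as u8);
    out.extend_from_slice(body);
    Some(out)
}

fn main() {
    let body = encode_args_without_prefix(&[true, false, true]);
    assert_eq!(body, vec![0xc3, 0xc2, 0xc3]);

    // The body may be sent in arbitrary chunks; the remote concatenates them.
    let (first, rest) = body.split_at(1);
    let mut received = first.to_vec();
    received.extend_from_slice(rest);

    // Once the call is marked complete, the prefix makes it a valid array.
    let full = add_fixarray_prefix(3, &received).unwrap();
    assert_eq!(full, vec![0x93, 0xc3, 0xc2, 0xc3]);
}
```

Bytes of this shape are what the byte-oriented methods such as .call_with_bytes() expect: the elements alone, never the leading array header.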
Implementations§
impl<'a> Wire<'a>
pub fn new(interface: &'a Interface) -> Self
Creates a new buffer-based wire to work with the given interface. This takes in only the interface to work with; readers and writers are supplied later to methods such as .fill() and .flush_end().
§Security
This method implicitly allows this wire to call functions on the remote, which is fine, except that calling a function means
you need to receive some kind of response. (That reception is done automatically by .fill(); it does not require waiting
on the call handle, which is only used to deliver the response to you and deserialize it.) Response messages do not have access to
local message buffers, and their security is tightly controlled; however, they present an additional attack surface that may be
totally unnecessary in module-style applications.
In short, although there are no known security holes in the procedure response system, programs that exist to have their procedures
called by others (called ‘module-style programs’), and that will not call procedures themselves, should prefer Wire::new_module,
which disables several wire features to reduce the attack surface.
pub fn new_module(interface: &'a Interface) -> Self
Creates a new buffer-based wire to work with the given interface. This is the same as .new(), except it disables support for
response messages, which are unnecessary in some contexts, where they would only present an additional attack surface. If you
intend for this wire to be used by the remote to call local functions, but not the other way around, you should use this method.
pub fn max_ongoing_procedures(self, value: IpfiInteger) -> Self
Sets the maximum number of ongoing procedures that the other end of this wire may invoke. “Ongoing procedures” refer to two things: procedures that are still accumulating arguments, and procedures that haven’t finished executing yet. By setting a maximum number for these and rejecting any past that, we can prevent denial-of-service attacks that try to overload our end of the wire by starting too many procedure calls at once.
The default for this is 10, but this value may change without warning until v1.0, so you’re advised to set this manually when DoS protection matters!
For protecting a server, this should be combined with Self::new_module, which will prevent this wire from
processing response messages, which can be used to exhaust memory if used maliciously.
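The idea of capping ongoing procedures can be sketched with a small bookkeeping structure. This is a model of the concept only: `OngoingLimiter` and its methods are hypothetical, and the way IPFI actually tracks and rejects calls is not shown on this page.

```rust
use std::collections::HashSet;

/// Hypothetical tracker for calls that are still accumulating arguments
/// or still executing, keyed by (procedure index, call index).
struct OngoingLimiter {
    max: usize,
    ongoing: HashSet<(u32, u32)>,
}

impl OngoingLimiter {
    fn new(max: usize) -> Self {
        Self { max, ongoing: HashSet::new() }
    }

    /// Returns `true` if the call is already tracked or there is room for it,
    /// and `false` if accepting it would exceed the limit.
    fn try_begin(&mut self, proc_idx: u32, call_idx: u32) -> bool {
        let key = (proc_idx, call_idx);
        if self.ongoing.contains(&key) {
            return true;
        }
        if self.ongoing.len() >= self.max {
            return false;
        }
        self.ongoing.insert(key);
        true
    }

    /// Frees a slot once the call has finished executing.
    fn finish(&mut self, proc_idx: u32, call_idx: u32) {
        self.ongoing.remove(&(proc_idx, call_idx));
    }
}

fn main() {
    let mut limiter = OngoingLimiter::new(2);
    assert!(limiter.try_begin(0, 0));
    assert!(limiter.try_begin(0, 1));
    // A third simultaneous call is rejected.
    assert!(!limiter.try_begin(1, 0));
    // Further partials for an already-accepted call are still allowed.
    assert!(limiter.try_begin(0, 0));
    // Finishing a call frees a slot.
    limiter.finish(0, 0);
    assert!(limiter.try_begin(1, 0));
}
```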
pub fn is_terminated(&self) -> bool
Asks the wire whether or not it has been terminated. This polls an internal flag that can be read by many threads simultaneously, and as such this operation is cheap.
See the struct documentation for further information about wire termination.
pub async fn call( &self, procedure_idx: ProcedureIndex, args: impl ProcedureArgs, ) -> Result<CallHandle<'_>, Error>
Calls the procedure with the given remote procedure index. This will return a handle you can use to block waiting for the return value of the procedure.
Generally, this should be preferred as a high-level method, although several lower-level methods are available for sending one argument at a time, or similar piecemeal use-cases.
pub async fn start_call_with_partial_args( &self, procedure_idx: ProcedureIndex, args: impl ProcedureArgs, ) -> Result<CallIndex, Error>
Calls the procedure with the given remote procedure index. This will prepare the local interface for a response also. This function will transmit the given argument buffer assuming that it does not know all the arguments, and it will leave the remote buffer that stores these arguments open.
This will return the call index for this execution, for later reference in continuing or finishing the call. The index of the local message buffer where the response is held will be returned when the call is completed.
This is one of several low-level procedure calling methods, and you probably want to use .call() instead.
pub async fn start_call_with_partial_bytes( &self, procedure_idx: ProcedureIndex, args: &[u8], ) -> Result<CallIndex, Error>
Same as .start_call_with_partial_args(), but this works directly with bytes, allowing you to send strange things
like two-thirds of an argument.
This is one of several low-level procedure calling methods, and you probably want to use .call() instead.
pub async fn call_with_bytes( &self, procedure_idx: ProcedureIndex, args: &[u8], ) -> Result<CallHandle<'_>, Error>
Same as .call(), but this works directly with bytes. You must be careful to provide the full byte serialization here,
and be sure to follow the above guidance on this! (I.e. you must not include the length marker.)
This is one of several low-level procedure calling methods, and you probably want to use .call() instead.
pub fn continue_given_call_with_args( &self, procedure_idx: ProcedureIndex, call_idx: CallIndex, args: impl ProcedureArgs, ) -> Result<(), Error>
Continues the procedure call with the given remote procedure index and call index by sending the given arguments. This will not terminate the message, and will leave it open for calling.
For an explanation of how call indices work, see Wire.
This is one of several low-level procedure calling methods, and you probably want to use .call() instead.
pub fn continue_given_call_with_bytes( &self, procedure_idx: ProcedureIndex, call_idx: CallIndex, args: &[u8], ) -> Result<(), Error>
Same as .continue_given_call_with_args(), but this works directly with bytes.
This is one of several low-level procedure calling methods, and you probably want to use .call() instead.
pub fn end_given_call( &self, procedure_idx: ProcedureIndex, call_idx: CallIndex, ) -> Result<CallHandle<'_>, Error>
Terminates the given call by sending a zero-length argument payload.
This will return a handle the caller can use to wait on the return value of the remote procedure.
This is one of several low-level procedure calling methods, and you probably want to use .call() instead.
pub fn signal_end_of_input(&self) -> Result<(), Error>
Writes a manual end-of-input signal to the output, which, when flushed (potentially automatically if you’ve called wire.start()),
will cause any wire.fill() calls in the remote program to return (the underlying .receive_one() yields None), which can be checked for termination. This is
necessary when communicating with single-threaded programs, which must read all their input at once, to tell them to stop reading
and start doing other work. This does not signal the termination of the wire, or even that there will not be any input in future,
it simply allows you to signal to the remote that it should start doing something else. Internally, the reception of this case is
generally not handled, and it is up to you as a user to handle it.
pub fn signal_termination(&self)
Writes a termination signal to the output, which will permanently neuter this connection, and all further operations on this wire will fail. This will not mark this wire as terminated internally, but it will lead to the termination of the other side’s wire. To mark this wire as terminated (thereby halting ongoing procedures, etc.), you should drop it, or allow it to receive a termination signal from the other side.
This does a similar thing to .flush_terminate(), except that this also requires flushing, and it can be more useful if wire.start()
has consumed the writer handle. However, if you need to send a termination signal when your program shuts down after any errors,
.flush_terminate() should be preferred.
pub async fn receive_one( &self, reader: &mut (impl AsyncRead + Unpin), ) -> Result<Option<bool>, Error>
Receives a single message from the given reader, sending it into the interface as appropriate. This contains the core logic that handles messages, partial procedure calls, etc.
If a procedure call is completed in this read, this method will automatically block waiting for the response, and it will then add said response to the internal writer queue.
This returns whether or not it read a message/call termination message. Remember, however, that receiving
a call termination message does not mean the response to that call has been sent in the case of a streaming
procedure! Alternatively, None will be returned if there was a manual end-of-input message, or on a wire
termination.
pub async fn flush_partial( &self, writer: &mut (impl AsyncWrite + Unpin), ) -> Result<(), Error>
Writes all messages currently in the internal write queue to the given output stream.
This is named .flush_partial() to make clear that this is usually not sufficient, and that an end-of-input
message needs to be sent first! To combine these two calls, use .flush_end().
pub async fn flush_end( &self, writer: &mut (impl AsyncWrite + Unpin), ) -> Result<(), Error>
Flushes everything that has been written to this wire, along with an end-of-input message. This does not terminate the wire, it merely states that we’ve written everything we can for now. In programs with a ping-pong structure (e.g. Alice calls a method and waits for a response from Bob), this is what you would use when you’re done with the ping and waiting for the pong.
Internally, this simply combines .signal_end_of_input() and .flush_partial().
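The relationship between the write queue, .signal_end_of_input(), .flush_partial(), and .flush_end() can be modelled roughly as follows. This is a synchronous sketch under stated assumptions: `SketchQueue`, `enqueue`, and the one-byte `END_OF_INPUT` marker are invented for illustration, and `std::io::Write` stands in for the asynchronous writer the real methods accept.

```rust
use std::collections::VecDeque;
use std::io::{self, Write};
use std::sync::Mutex;

/// Made-up one-byte marker; the real end-of-input encoding is not shown here.
const END_OF_INPUT: u8 = 0xff;

/// Hypothetical model of a wire's internal queue of outgoing messages.
struct SketchQueue {
    queue: Mutex<VecDeque<Vec<u8>>>,
}

impl SketchQueue {
    fn new() -> Self {
        Self { queue: Mutex::new(VecDeque::new()) }
    }

    /// Queues a message; nothing is written until a flush.
    fn enqueue(&self, message: Vec<u8>) {
        self.queue.lock().unwrap().push_back(message);
    }

    /// Queues the end-of-input marker like any other message.
    fn signal_end_of_input(&self) {
        self.enqueue(vec![END_OF_INPUT]);
    }

    /// Writes everything currently queued, in order, to the writer.
    fn flush_partial(&self, writer: &mut impl Write) -> io::Result<()> {
        let mut queue = self.queue.lock().unwrap();
        while let Some(message) = queue.pop_front() {
            writer.write_all(&message)?;
        }
        writer.flush()
    }

    /// Combines the two steps: mark end of input, then flush.
    fn flush_end(&self, writer: &mut impl Write) -> io::Result<()> {
        self.signal_end_of_input();
        self.flush_partial(writer)
    }
}

fn main() {
    let wire = SketchQueue::new();
    wire.enqueue(vec![1, 2]);
    wire.enqueue(vec![3]);

    let mut output: Vec<u8> = Vec::new();
    wire.flush_end(&mut output).unwrap();
    // Queued messages come first, followed by the end-of-input marker.
    assert_eq!(output, vec![1, 2, 3, END_OF_INPUT]);
}
```

Because the marker is queued like any other message, it reaches the remote only after everything written before it, which is what lets the receiving side treat it as "done for now".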
pub async fn flush_terminate( &self, writer: &mut (impl AsyncWrite + Unpin), ) -> Result<(), Error>
Flushes everything that has been written to this wire, along with a termination signal, which tells the remote that your
program is now terminating, and that this wire has been rendered invalid. If your program may send further data later, you
should use .flush_end() instead, which sends a temporary end-of-input message. You can imagine the difference between
the two as .flush_end() being ‘over’ and this method being ‘over and out’ (which, in real radio communication, would
actually just be ‘out’).
IMPORTANT: Never call this while waiting for a response, such as to a function call! Once the other side receives a termination signal, it will immediately self-poison, causing all future method calls on their wire to fail. In many cases, you won’t actually need a termination signal, such as if you know the structure of both programs involved, and you know what messages they’ll send to each other. A termination signal is used to literally force the other program to stop sending data. To see how this poison impacts the wire, look at the source code and the little preamble before every single method: if the wire has terminated, they will all immediately fail! Call this only when communicating with an unknown or untrusted program, but keeping in mind that they could easily ignore the termination signal (i.e. this cannot be used as a superficial measure).
pub async fn fill( &self, reader: &mut (impl AsyncRead + Unpin), ) -> Result<(), Error>
An ergonomic equivalent of .receive_one() that receives messages until either an error occurs, or until the remote program
sends a manual end-of-input signal (with .signal_end_of_input()). This should only be used in single-threaded scenarios
to read a block of messages, as otherwise this is rendered superfluous by .start(), which runs this automatically in a
loop.
Note that the remote program must manually send that end-of-input signal, and when it does this will be dependent on the needs of your unique program.
WARNING: This method will internally handle UnexpectedEof errors and terminate the wire itself, assuming that the given
buffer is the only means of communication with the other side of the wire. If this is not the case, you should manually call
.receive_one() until it returns None, to mimic the behaviour of this method. Note that such errors will still be returned,
after the termination flag has been set.
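The loop that .fill() is described as running over .receive_one() can be sketched as below. The framing is invented for this example (one length byte followed by the payload, with a zero length standing in for the manual end-of-input signal), the functions are synchronous free functions rather than async methods, and `Option<()>` replaces the real `Option<bool>` for brevity.

```rust
use std::io::{self, Read};

/// Reads one message in a made-up framing: a length byte, then the payload.
/// A length of zero stands in for the manual end-of-input signal.
fn receive_one(reader: &mut impl Read, inbox: &mut Vec<Vec<u8>>) -> io::Result<Option<()>> {
    let mut len = [0u8; 1];
    reader.read_exact(&mut len)?;
    if len[0] == 0 {
        return Ok(None);
    }
    let mut payload = vec![0u8; len[0] as usize];
    reader.read_exact(&mut payload)?;
    inbox.push(payload);
    Ok(Some(()))
}

/// Keeps receiving until an error occurs or end of input is signalled.
fn fill(reader: &mut impl Read, inbox: &mut Vec<Vec<u8>>) -> io::Result<()> {
    while receive_one(reader, inbox)?.is_some() {}
    Ok(())
}

fn main() {
    // Two messages, an end-of-input marker, then bytes for a later read.
    let mut input: &[u8] = &[2, b'h', b'i', 1, b'!', 0, 9, 9];
    let mut inbox = Vec::new();
    fill(&mut input, &mut inbox).unwrap();

    assert_eq!(inbox, vec![b"hi".to_vec(), b"!".to_vec()]);
    // Reading stopped at the marker, so the trailing bytes are untouched.
    assert_eq!(input, &[9u8, 9][..]);
}
```

If the input ends without a marker, `read_exact` reports `UnexpectedEof` and the loop returns that error, mirroring the warning above about how a closed buffer is interpreted.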
impl Wire<'static>
pub fn start( &self, reader: impl AsyncRead + Unpin + Send + Sync + 'static, writer: impl AsyncWrite + Unpin + Send + Sync + 'static, ) -> AutonomousWireHandle
Starts an autonomous version of the wire by starting two new threads, one for reading and one for writing. If you call this,
it is superfluous to call .fill() or the .flush_*() methods, as they will be automatically called from here on.
This method is only available when a 'static reference to the Interface is held, since only that can be passed safely between
threads. You must also own both the reader and writer in order to use this method (which typically means this method must hold
those two exclusively).