Interface

Struct Interface 

Source
pub struct Interface { /* private fields */ }
Expand description

An inter-process communication (IPC) interface based on the arbitrary reception of bytes.

This is formed in a message-based interface, with messages identified by variable-size integers. Each time a new value is sent into the interface, it will be added to the provided message, allowing data from multiple sources to be simultaneously accumulated. Once -1 is sent, the message will be marked as completed, and future attempts to write to it will fail. Thread-safety and mutability locks are maintained independently for every message we store.

Note that this system is perfectly thread-safe, and it is perfectly valid for multiple data sources to send multiple messages simultaneously, while multiple receivers simultaneously wait for multiple messages. Once a message is accumulated, it will be stored in the buffer until the interface is dropped.

§Chunks

Messages sent to an IPFI interface are sent as raw bytes, however, with the serde feature enabled, they can be deserialised into arbitrary types. Usually, a message will consist of only one type, however, sometimes, especially in the case of a procedure that yields multiple independent values (e.g. strings), a single message will consist of multiple discrete packets, all of which should be deserialised independently. For this use-case, IPFI implements a chunking system, whereby senders to an interface can explicitly terminate an individual chunk, before starting a new one. As a result, accessing the raw bytes of a message will yield a Vec<Vec<u8>>, a list of chunks (each of which is a list of bytes).

Sometimes, one may wish to access chunks in real-time, as they are terminated individually, and this can be done through .get_chunk_stream(), which will return a ChunkReceiver that handles deserialisation. However, if this method is called, then, every time a chunk is terminated, it will unquestioningly be sent to the receiver, rather than being saved in the message buffer. Hence, after a real-time receiver is created, calls to methods like .get() will still succeed, but they will return empty messages. Only .get_chunks() is wise to this: it will explicitly make sure that no real-time interface is present before accessing the underlying message data. Note that, regardless of the methods called, it is impossible for messages to enter a broken state, provided they were entered into the interface correctly.

Implementations§

Source§

impl Interface

Source

pub fn new() -> Self

Initializes a new interface to be used for connecting to as many other programs as necessary through wires, or through manual communication management.

Source

pub fn get_id(&self) -> WireId

Gets an ID for a wire or other communication primitive that will depend on this interface. Any IPC primitive that will call procedures should acquire one of these for itself to make sure its procedure calls do not overlap with those of other wires. Typically, this will be called internally by the crate::Wire type.

Source

pub fn relinquish_id(&self, id: WireId)

Marks the given wire identifier as relinquished. This will return it into the queue and recirculate it to the next new wire that requests an identifier. As such, the provided identifier must not be reused after it is provided to this call. Any messages associated with this wire identifier will be popped automatically if this is called with pop_messages, which it generally should be in server-like contexts (otherwise, a client could cause an out-of-memory error by submitting just shy of enough arguments for the same procedure many times over different wires).

Generally, this should be called within drop implementations, and it is not necessary to call this for the inbuilt [Wire].

Source

pub fn drop_associated_call_message( &self, procedure_idx: ProcedureIndex, call_idx: CallIndex, wire_id: WireId, )

Drops the message associated with accumulating arguments for the given procedure call. This could be called for both procedures that are still accumulating arguments and for those that are done accumulating, but which are still streaming. As these buffers have already been created, creation locks will never be an issue, while waiting for a completion lock would lead to waiting forever, as the internal lock would persist, but the version of it held by the interface would be dropped (however, waiting on completion locks for argument accumulation buffers would be very strange).

Source

pub fn add_procedure<A: Serialize + DeserializeOwned + Tuple, R: Serialize + DeserializeOwned>( &self, idx: IpfiInteger, f: impl Fn(A) -> R + Send + Sync + 'static, )

Adds the given function as a procedure on this interface, which will allow other programs interacting with this one to execute it. Critically, no validation of intent or origin is requires to remotely execute procedures over IPFI, so you must be certain of two things when using this method:

  1. You never expose an IPFI interface to an untrusted program, or to a program that executes untrusted user code.
  2. You are happy for the functions you register using this method to be accessible by any programs that you allow to interface with your code.

You must also ensure that any invariants you expect from the arguments provided to your procedure are checked by your procedure, since arguments will be deserialized from raw bytes, and, although this process will catch structurally invalid input, it will not catch logically invalid input. As an example, of you have a function that adds two positive numbers, and you expect both given arguments to be positive, something you previously upheld at the caller level, you must no uphold that invariant within the procedure itself, otherwise any program could use IPFI to pass invalid integers. Alternately, you can use a custom serialization/deserialization process to uphold invariants, provided you are satisfied that is secure against totally untrusted input.

Source

pub fn add_sequence_procedure<A: Serialize + DeserializeOwned + Tuple, R: Serialize + DeserializeOwned>( &self, idx: IpfiInteger, f: impl Fn(Box<dyn Fn(R, bool) -> Result<(), Error> + Send + Sync + 'static>, A) + Send + Sync + 'static, )

Adds a streaming procedure with serialisation handled automatically. This is designed for procedures that will yield something that can eventually be interpreted as a list of values, and, as such, each yield will be interpreted as a separate chunk. On the other side, the caller will be able to deserialise the response as a list of values. Hence, the yielder passed to such procedures only requires the caller to specify whether or not the yielded message is the final chunk. If you wish to send partial chunks, this should be done manually through .add_raw_streaming_procedure().

Source

pub fn add_raw_procedure( &self, idx: IpfiInteger, f: impl Fn(&[u8]) -> Vec<u8> + Send + Sync + 'static, )

Same as .add_procedure(), but this accepts procedures that work directly with raw bytes, involving no serialization or deserialization process. This method is only recommended when the serde feature cannot be used for whatever reason, as it requires you to carefully manage byte streams yourself, as there will be absolutely no validity checking of them by IPFI. Unlike .add_procedure(), this will accept any bytes and pass them straight to you, performing no intermediary steps.

Source

pub fn add_raw_streaming_procedure( &self, idx: IpfiInteger, f: impl Fn(Box<dyn Fn(Vec<u8>, Terminator) -> Result<(), Error> + Send + Sync + 'static>, &[u8]) + Send + Sync + 'static, )

Same as .add_procedure_raw(), but this will add a streaming procedure, which, rather than returning its output all in one go, will yield chunks of it gradually, allowing it to continue operating in the background while other messages are processed. This can be used both for procedures that gather parts of their output gradually (e.g. those that yield real-time information) and for asynchronous procedures, which will return all their output at once, but after they have finished a computation in parallel.

Note that streaming procedures should provide all their output through the given closure (which can be thought of as analogous to the yield keyword in languages with native support for generators). The second argument to this closure is whether or not your procedure is finished. After you have called yielder(_, true), any subsequent calls will be propagated, but disregarded by a compliant IPFI implementation on the other side of the wire.

You should be especially careful of the value of Terminator you set for each yield. Of course, the final yield should be Terminator::Complete, however streaming procedures have the notion of chunks, which are discrete packets for later deserialisation. For instance, you could use a streaming procedure to stream parts of a single string, or you could stream independent strings. These have different serialised representations, and, as such, it is typical to want to maintain a count of how many chunks have been sent. By setting Terminator::Chunk, you can explicitly mark such chunks. Note that this also means you can send partial chunks.

Warning: this function is deliberately low-level, and performs absolutely no thread management. Your procedure should return as soon as possible, and should start a thread/task that then performs subsequent yields. If you’d like your procedure to automatically be executed in another thread/task, or if you’d like to return an actual Stream, then you may want to use another, higher-level method on Interface. Additionally, bear in mind that working with serialisation is somewhat harder when working with streams, as all data will be collated into the same buffer for deserialisation when received. This means procedures that are not accumulating some larger object, but rather a sequence of smaller objects, will need to yield a MessagePack array prefix as their first item. This is handled automatically by the higher-level method .add_sequence_procedure().

Source

pub async fn call_procedure( &self, procedure_idx: ProcedureIndex, call_idx: CallIndex, wire_id: WireId, yielder: impl Fn(Vec<u8>, Terminator) -> Result<(), Error> + Send + Sync + 'static, ) -> Result<Option<Vec<u8>>, Error>

Calls the procedure with the given index, returning the raw serialized byte vector it produces. This will get its argument information from the given internal message buffer index, which it expects to be completed. This method will not block waiting for a completion (or creation) of that buffer.

If this calls a streaming procedure, this will return Ok(None) on a success, as the return value will be sent through the given yielder function.

There is no method provided to avoid byte serialization, because procedure calls over IPFI are intended to be made solely by remote communicators.

Source

pub async fn push(&self) -> IpfiInteger

Allocates space for a new message buffer, creating a new completion lock. This will also mark a relevant creation lock as completed if one exists.

Source

pub async fn send( &self, datum: i8, message_id: IpfiInteger, ) -> Result<bool, Error>

Sends the given element through to the interface, adding it to the byte array of the message with the given 32-bit identifier. If -1 is provided, the message will be marked as completed. This will return an error if the message was already completed, or out-of-bounds, or otherwise Ok(true) if the message buffer is still open, or Ok(false) if this call caused it to complete.

If the message identifier provided does not exist (which may be because the message had already been completed and read), then

§Errors

This will fail if the message with the given identifier had already been completed, or it did not exist. Either of these cases can be trivially caused by a malicious client, and the caller should therefore be careful in how it handles these errors.

Notably, this will still append to a poisoned message buffer (because a poisoned completion lock merely indicates that it is unlikely that the message will ever be completed, it doesn’t hurt to try). If this call would complete a poisoned buffer, its state will be changed to completed.

Source

pub async fn send_many( &self, data: &[u8], message_id: IpfiInteger, ) -> Result<(), Error>

Sends many bytes through to the interface. When you have many bytes instead of just one at a time, this method should be preferred. Note that this method will not allow the termination of a message or chunk, and that should be handled separately.

Like .send(), this will ignore poisoned message buffers, and will try to complete them if possible (without changing their state).

Source

pub async fn terminate_message( &self, message_id: IpfiInteger, ) -> Result<(), Error>

Explicitly terminates the message with the given index. This will return an error if the message has already been terminated or if it was out-of-bounds. This will not change anything about the chunk layout of the message, and, for the final chunk, this should be called instead of terminating the chunk, otherwise an additional, empty chunk will be created. If there is a chunk receiver registered, this will send the final chunk through it (only if that chunk hasn’t been sent before).

For messages with real-time chunk receivers registered, this will drop the only sender, thus ending the channel, and it will pop the message, allowing the index to be reused.

If this terminates a message with a poisoned completion lock, it will change the lock’s state from poisoned to completed.

Source

pub async fn terminate_chunk( &self, message_id: IpfiInteger, ) -> Result<(), Error>

Terminates the current chunk in the given message buffer, creating a new chunk to accept further data. This should not be called to terminate the final chunk, you should use .terminate_message() for that to properly seal the buffer. If there is a real-time chunk receiver registered for this message buffer, this method will take the terminated chunk and send it directly to that receiver, thereby removing it from the message buffer. This kind of potential “corruption” is signalled by the Some(_) value of message.sender.

This will fail if the message itself has been terminated, but will succeed if the message has a poisoned completion lock.

Source

pub async fn get_chunk_stream( &self, message_id: IpfiInteger, ) -> Result<ChunkReceiver, Error>

Creates a sender/receiver pair that can be used to accessing the chunks in a message in real-time. Once this method is called, any future .get_chunks() calls are guaranteed to return None, as they cannot provide useful information. Other .get()-type calls will yield no information, because chunks will be streamed out in real-time, and thereby removed from the message buffer. Essentially, once this is called, all bytes are piped out to the created receiver, rather than being saved.

If the provided message buffer does not yet exist, this will wait for it to before creating the sender/receiver pair. This is done to avoid disrupting any existing creation locks that might be held on the message.

Source

pub async fn is_chunked(&self, message_id: IpfiInteger) -> Result<bool, Error>

Returns whether or not the given message is chunked. This will wait until the message has been completed before returning anything. Note that whether or not a message is chunked is purely determined by whether or not the sender terminates a chunk explicitly (thereby creating a second chunk, etc.).

Source

pub async fn has_stream(&self, message_id: IpfiInteger) -> Result<bool, Error>

Returns whether or not the given message has a real-time chunk receiver registered on it. If so, this likely indicates that the message buffer will be incomplete and should not be fetched directly. See Interface for further details on chunking. This method will wait until the given message has been completed.

Source

pub async fn get<T: DeserializeOwned>( &self, message_id: IpfiInteger, ) -> Result<T, Error>

Gets an object of the given type from the given message buffer index of the interface. This will block waiting for the given message buffer to be (1) created and (2) marked as complete. Depending on the caller’s behaviour, this may block forever if they never complete the message.

For messages with multiple chunks, this will only pay attention to the first chunk. For chunked messages, you should prefer .get_chunks(), which will deserialise as many chunks as are available. To determine whether or not a message has been chunked, you can use .is_chunked().

Note that this method will extract the underlying message from the given buffer index, leaving it available for future messages or procedure call metadata. This means that requesting the same message index may yield completely different data.

Source

pub async fn get_chunks<T: DeserializeOwned>( &self, message_id: IpfiInteger, ) -> Result<Option<Vec<T>>, Error>

Same as .get(), except this is designed to be used for procedures that are known to stream their output in discrete chunks. There are two broad types of streaming procedures: those that stream bytes gradually that eventually become part of a greater whole (all deserialised at once), and those that stream discrete objects that should all be deserialised independently. This function is for the latter type.

Note that, if used on any other type of message, this will still work, it will just produce a vector of length 1 (as there was only one chunk involved).

Bear in mind that, if .get_chunk_stream() has been previously called on this message, then this will produce no data (as the chunks will have been streamed to the receiver in real-time). In that case, None will be returned for clarity.

Source

pub async fn get_raw( &self, message_id: IpfiInteger, ) -> Result<Vec<Vec<u8>>, Error>

Same as .get(), but gets the raw byte array instead. This will return the chunks in which the message was sent, which, for the vast majority of messages, will be only one. See Interface to learn more about chunks.

This will block until the message with the given identifier has been created and completed.

Note that this method will extract the underlying message from the given buffer index, leaving it available for future messages or procedure call metadata. This means that requesting the same message index may yield completely different data. It is typically useful to use this method through some higher-level structure, such as a crate::CallHandle.

Trait Implementations§

Source§

impl Default for Interface

Source§

fn default() -> Self

Returns the “default value” for a type. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.