Struct Multi

pub struct Multi<ItemType: Debug + Sync + Send + 'static, MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static, const INSTRUMENTS: usize, DerivedItemType: Debug + Sync + Send + 'static> {
    pub multi_name: String,
    pub channel: Arc<MultiChannelType>,
    pub executor_infos: RwLock<IndexMap<String, ExecutorInfo>>,
    /* private fields */
}

Multi is an event handler capable of having several “listeners”, each of which receives every event.
With this struct, it is possible to:

  • produce events
  • spawn new Streams & executors
  • close Streams (and executors)

Example of a value for the INSTRUMENTS const generic argument:

{reactive_mutiny::Instruments::MetricsWithoutLogs.into()}
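The listener behavior described above (every listener receives every event) can be illustrated with a minimal, std-only sketch. `FanOut`, `subscribe` and `fan_out_demo` are hypothetical names invented for this illustration; they are not part of this crate, and the real Multi works over Streams and channels rather than `std::sync::mpsc`.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical fan-out hub: every subscriber gets its own copy of each event.
pub struct FanOut<T: Clone> {
    listeners: Vec<Sender<T>>,
}

impl<T: Clone> FanOut<T> {
    pub fn new() -> Self {
        Self { listeners: Vec::new() }
    }

    /// Registers a listener; it only sees events sent after this call.
    pub fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.listeners.push(tx);
        rx
    }

    /// Delivers a clone of `item` to every live listener and returns how many got it.
    pub fn send(&mut self, item: T) -> usize {
        // Listeners whose receiving end was dropped are forgotten here.
        self.listeners.retain(|tx| tx.send(item.clone()).is_ok());
        self.listeners.len()
    }

    /// Closes all listeners: receivers drain what is pending and then end.
    pub fn close(&mut self) {
        self.listeners.clear();
    }
}

/// Usage: two listeners both observe the full sequence of events.
pub fn fan_out_demo() -> (Vec<u32>, Vec<u32>) {
    let mut hub = FanOut::new();
    let first = hub.subscribe();
    let second = hub.subscribe();
    for event in [1, 2, 3] {
        hub.send(event);
    }
    hub.close();
    (first.iter().collect(), second.iter().collect())
}
```

Cloning per listener is only one possible design; it is used here because it keeps the sketch short.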

Fields

multi_name: String
channel: Arc<MultiChannelType>
executor_infos: RwLock<IndexMap<String, ExecutorInfo>>

Implementations

impl<ItemType: Debug + Send + Sync + 'static, MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static, const INSTRUMENTS: usize, DerivedItemType: Debug + Sync + Send + 'static> Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>

pub fn new<IntoString: Into<String>>(multi_name: IntoString) -> Self

Creates a Multi, which implements the listener pattern, capable of:

  • creating Streams;
  • applying a user-provided processor to the Streams and executing them to depletion – the final Streams may produce a combination of fallible/non-fallible & futures/non-futures events;
  • producing events that are sent to those Streams.

multi_name is used for instrumentation purposes, depending on the INSTRUMENTS generic argument passed to the Multi struct.
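As a rough illustration of how a `const INSTRUMENTS: usize` argument can select instrumentation at compile time, here is a self-contained sketch. The `Instrumentation` enum, its bit layout, `into_bits` and `Handler` are all assumptions made up for this example and do not reflect the crate's actual encoding.

```rust
/// Hypothetical instrumentation levels encoded as bit flags
/// (an assumption for this sketch, not the crate's actual layout).
#[derive(Clone, Copy)]
pub enum Instrumentation {
    None = 0,
    Metrics = 1,
    Logs = 2,
    MetricsAndLogs = 3,
}

impl Instrumentation {
    /// A `const fn`, so the result can be used as a const generic argument.
    pub const fn into_bits(self) -> usize {
        self as usize
    }
}

/// Hypothetical handler whose instrumentation is fixed at compile time.
pub struct Handler<const INSTRUMENTS: usize> {
    name: String,
}

impl<const INSTRUMENTS: usize> Handler<INSTRUMENTS> {
    const METRICS: bool = (INSTRUMENTS & 1) != 0;
    const LOGS: bool = (INSTRUMENTS & 2) != 0;

    pub fn new(name: impl Into<String>) -> Self {
        Self { name: name.into() }
    }

    /// Describes what would be reported; the name only matters when an instrument is on.
    pub fn describe(&self) -> String {
        match (Self::METRICS, Self::LOGS) {
            (false, false) => String::from("no instrumentation"),
            (true, false) => format!("{}: metrics", self.name),
            (false, true) => format!("{}: logs", self.name),
            (true, true) => format!("{}: metrics + logs", self.name),
        }
    }
}

/// Usage: the braces evaluate the `const fn` at compile time.
pub fn instruments_demo() -> (String, String) {
    let quiet = Handler::<{ Instrumentation::None.into_bits() }>::new("quiet");
    let measured = Handler::<{ Instrumentation::Metrics.into_bits() }>::new("measured");
    (quiet.describe(), measured.describe())
}
```

Because the flags are constants, branches for disabled instruments can be removed by the compiler, which is the usual motivation for this pattern.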

pub fn name(&self) -> &str

Returns this Multi’s name.

pub fn send(&self, item: ItemType) -> RetryConsumerResult<(), ItemType, ()>

pub fn send_with<F: FnOnce(&mut ItemType)>( &self, setter: F, ) -> RetryConsumerResult<(), F, ()>

pub fn send_derived(&self, arc_item: &DerivedItemType) -> bool

pub fn buffer_size(&self) -> u32

pub fn pending_items_count(&self) -> u32

pub async fn spawn_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>( &self, concurrency_limit: u32, futures_timeout: Duration, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Result<(), Box<dyn Error>>

Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(), whose items must be both Futures and Fallible (each item is a Future resolving to a Result).

pub async fn spawn_oldies_executor<IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static, OldiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, NewiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>( self: &Arc<Self>, concurrency_limit: u32, sequential_transition: bool, futures_timeout: Duration, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static, newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static, ) -> Result<(), Box<dyn Error>>

For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:

  1. One for past events – to be processed by the stream returned by oldies_pipeline_builder();
  2. Another one for subsequent events – to be processed by the stream returned by newies_pipeline_builder().

This method assumes that both pipeline builders return Future<Result> events. If this is not so, see one of the sibling methods.
The stream splitting is guaranteed not to drop any events. Use sequential_transition to indicate whether old events should be processed first or whether old and new events may be processed simultaneously (in which case the ordering between them is inevitably lost).
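The effect of sequential_transition can be pictured with a small std-only sketch that splits an event log into "oldies" and "newies" and processes both parts. `process_oldies_and_newies`, the `cut` index and the use of threads are assumptions chosen for illustration; they are not how this crate implements the split.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Splits `log` at `cut` and processes both parts, returning the order in which
/// events were handled. Hypothetical helper, not the crate's implementation.
pub fn process_oldies_and_newies(
    log: Vec<u32>,
    cut: usize,
    sequential_transition: bool,
) -> Vec<u32> {
    let cut = cut.min(log.len());
    let oldies = log[..cut].to_vec();
    let newies = log[cut..].to_vec();
    let handled = Arc::new(Mutex::new(Vec::<u32>::with_capacity(log.len())));

    // Each "listener" is a thread that records the events it handles.
    let spawn_worker = |events: Vec<u32>| {
        let handled = Arc::clone(&handled);
        thread::spawn(move || {
            for event in events {
                handled.lock().unwrap().push(event);
            }
        })
    };

    if sequential_transition {
        // Old events are fully processed before any new event is touched.
        spawn_worker(oldies).join().unwrap();
        spawn_worker(newies).join().unwrap();
    } else {
        // Both listeners run at once: nothing is lost, but interleaving is arbitrary.
        let oldies_worker = spawn_worker(oldies);
        let newies_worker = spawn_worker(newies);
        oldies_worker.join().unwrap();
        newies_worker.join().unwrap();
    }

    let result = handled.lock().unwrap().clone();
    result
}
```

With `sequential_transition = true` the returned order matches the log; with `false` only the set of handled events is guaranteed, not their relative order.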

pub async fn spawn_futures_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = OutItemType> + Send, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>( &self, concurrency_limit: u32, futures_timeout: Duration, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Result<(), Box<dyn Error>>

Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(), which generates events that are Futures.

pub async fn spawn_futures_oldies_executor<IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static, OldiesOutType: Future<Output = OutItemType> + Send, NewiesOutType: Future<Output = OutItemType> + Send, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>( self: &Arc<Self>, concurrency_limit: u32, sequential_transition: bool, futures_timeout: Duration, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static, newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static, ) -> Result<(), Box<dyn Error>>

For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:

  1. One for past events – to be processed by the stream returned by oldies_pipeline_builder();
  2. Another one for subsequent events – to be processed by the stream returned by newies_pipeline_builder().

This method assumes that both pipeline builders return Future events. If this is not so, see one of the sibling methods.
The stream splitting is guaranteed not to drop any events. Use sequential_transition to indicate whether old events should be processed first or whether old and new events may be processed simultaneously (in which case the ordering between them is inevitably lost).

pub async fn spawn_fallibles_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>( &self, concurrency_limit: u32, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Result<(), Box<dyn Error>>

Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(), which generates events that are Fallible.
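The roles of pipeline_builder, on_err_callback and on_close_callback for a fallible pipeline can be sketched with plain iterators standing in for Streams. Everything here (`run_fallible_pipeline`, `RunStats`, `BoxedError`, and the synchronous execution) is a hypothetical simplification, not the crate's executor.

```rust
use std::error::Error;

type BoxedError = Box<dyn Error + Send + Sync>;

/// Counters handed to the "on close" callback (hypothetical, for illustration only).
#[derive(Debug, Default, PartialEq)]
pub struct RunStats {
    pub ok: usize,
    pub failed: usize,
}

/// Builds a pipeline over `input`, runs it to depletion, routes every `Err`
/// to `on_err` and finally reports the totals to `on_close`.
pub fn run_fallible_pipeline<In, Out, Pipeline>(
    input: In,
    pipeline_builder: impl FnOnce(In) -> Pipeline,
    mut on_err: impl FnMut(BoxedError),
    on_close: impl FnOnce(&RunStats),
) -> RunStats
where
    In: Iterator,
    Pipeline: Iterator<Item = Result<Out, BoxedError>>,
{
    let mut stats = RunStats::default();
    for outcome in pipeline_builder(input) {
        match outcome {
            Ok(_) => stats.ok += 1,
            Err(err) => {
                stats.failed += 1;
                on_err(err);
            }
        }
    }
    on_close(&stats);
    stats
}

/// Usage: parse strings into numbers; the bad entry goes to the error callback.
pub fn fallible_demo() -> (RunStats, Vec<String>) {
    let mut errors = Vec::new();
    let stats = run_fallible_pipeline(
        ["1", "two", "3"].into_iter(),
        |events| events.map(|text| text.parse::<u32>().map_err(|err| Box::new(err) as BoxedError)),
        |err| errors.push(err.to_string()),
        |_stats| {},
    );
    (stats, errors)
}
```

Note that an error does not stop the pipeline in this sketch: the failing item is reported and processing continues with the next one.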

pub async fn spawn_fallibles_oldies_executor<IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Sync + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>( self: &Arc<Self>, concurrency_limit: u32, sequential_transition: bool, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static, newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static, ) -> Result<(), Box<dyn Error>>

For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:

  1. One for past events – to be processed by the stream returned by oldies_pipeline_builder();
  2. Another one for subsequent events – to be processed by the stream returned by newies_pipeline_builder().

This method assumes that both pipeline builders return Fallible events. If this is not so, see one of the sibling methods.
The stream splitting is guaranteed not to drop any events. Use sequential_transition to indicate whether old events should be processed first or whether old and new events may be processed simultaneously (in which case the ordering between them is inevitably lost).

pub async fn spawn_non_futures_non_fallible_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutItemType> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>( &self, concurrency_limit: u32, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Result<(), Box<dyn Error>>

Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(), which generates events that are Non-Futures & Non-Fallible.

pub async fn spawn_non_futures_non_fallible_oldies_executor<IntoString: Into<String>, OldiesOutItemType: Send + Debug, NewiesOutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutItemType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutItemType> + Sync + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>( self: &Arc<Self>, concurrency_limit: u32, sequential_transition: bool, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType, newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static, ) -> Result<(), Box<dyn Error>>

For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:

  1. One for past events – to be processed by the stream returned by oldies_pipeline_builder();
  2. Another one for subsequent events – to be processed by the stream returned by newies_pipeline_builder().

This method assumes that both pipeline builders return non-Futures & non-Fallible events. If this is not so, see [spawn_oldies_executor].
The stream splitting is guaranteed not to drop any events. Use sequential_transition to indicate whether old events should be processed first or whether old and new events may be processed simultaneously (in which case the ordering between them is inevitably lost).

pub async fn close(&self, timeout: Duration) -> bool

Closes this Multi, in isolation: flushes pending events, closes the producers, waits for all events to be fully processed and calls every executor’s “on close” callback.
If this Multi shares resources with another one (which will get dumped by the “on close” callback), you most probably want to close them atomically; see multis_close_async!().
Returns true if all events could be flushed within the given timeout.
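The "returns true if flushed within the timeout" contract can be sketched as a bounded wait on a pending-items counter. `wait_until_flushed` and the polling strategy are assumptions for illustration only; the crate's close() is async and may be implemented quite differently.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

/// Polls `pending` until it reaches zero or `timeout` elapses.
/// Returns `true` only if everything was flushed in time.
pub fn wait_until_flushed(pending: &AtomicU32, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    while pending.load(Ordering::Acquire) > 0 {
        if Instant::now() >= deadline {
            return false;
        }
        thread::sleep(Duration::from_millis(1));
    }
    true
}

/// Usage: a consumer thread drains 5 pending items while the caller waits.
pub fn flush_demo() -> (bool, bool) {
    let pending = Arc::new(AtomicU32::new(5));
    let consumer = {
        let pending = Arc::clone(&pending);
        thread::spawn(move || {
            while pending.load(Ordering::Acquire) > 0 {
                pending.fetch_sub(1, Ordering::AcqRel);
            }
        })
    };
    let flushed = wait_until_flushed(&pending, Duration::from_secs(5));
    consumer.join().unwrap();

    // Nobody consumes this item, so the wait gives up once the timeout elapses.
    let stuck = AtomicU32::new(1);
    let timed_out = !wait_until_flushed(&stuck, Duration::from_millis(20));
    (flushed, timed_out)
}
```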

pub async fn flush_and_cancel_executor<IntoString: Into<String>>( &self, pipeline_name: IntoString, timeout: Duration, ) -> bool

Asynchronously blocks until all resources associated with the executor responsible for pipeline_name are freed:

  1. immediately causes pipeline_name to cease receiving new elements, by removing it from the active list;
  2. waits for all pending elements to be processed;
  3. removes the queue/channel and wakes the Stream so it sees that it has ended;
  4. waits for the executor to report that it has ceased its execution;
  5. returns, dropping all resources.

Note that it might make sense to run this operation in a spawned Tokio task, for it may block indefinitely if the Stream has no timeout.
Also note that timing out this operation is not advisable, for resources won’t be freed until it reaches the last step.
Returns false if there was no executor associated with pipeline_name.
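The five steps above can be sketched with threads and std channels standing in for executors and Streams. The `Executors` registry and its methods are hypothetical, and the sketch returns `Option<u32>` instead of `bool` so the executor's result is visible; it is an illustration under these assumptions, not the crate's code.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical registry of named executors, each fed by its own channel.
pub struct Executors {
    active: HashMap<String, (Sender<u32>, JoinHandle<u32>)>,
}

impl Executors {
    pub fn new() -> Self {
        Self { active: HashMap::new() }
    }

    /// Spawns an executor that sums every element it receives.
    pub fn spawn(&mut self, pipeline_name: &str) {
        let (tx, rx) = channel::<u32>();
        let worker = thread::spawn(move || rx.iter().sum::<u32>());
        self.active.insert(pipeline_name.to_string(), (tx, worker));
    }

    /// Sends `item` to every active executor.
    pub fn send(&self, item: u32) {
        for (tx, _) in self.active.values() {
            let _ = tx.send(item);
        }
    }

    /// Returns `None` if `pipeline_name` is unknown, else the executor's final sum.
    pub fn flush_and_cancel(&mut self, pipeline_name: &str) -> Option<u32> {
        // Step 1: remove it from the active list, so no new elements arrive.
        let (tx, worker) = self.active.remove(pipeline_name)?;
        // Steps 2 and 3: dropping the sender lets the worker drain what is
        // pending and then observe the end of its input.
        drop(tx);
        // Step 4: wait for the executor to report that it has stopped.
        let total = worker.join().expect("executor thread panicked");
        // Step 5: returning drops the remaining resources.
        Some(total)
    }
}

/// Usage: cancel a known executor, then try an unknown one.
pub fn cancel_demo() -> (Option<u32>, Option<u32>) {
    let mut executors = Executors::new();
    executors.spawn("sum");
    for item in [1, 2, 3] {
        executors.send(item);
    }
    (executors.flush_and_cancel("sum"), executors.flush_and_cancel("missing"))
}
```

As in the description, the join in step 4 has no timeout here, so a worker that never finishes would block the caller indefinitely.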

Trait Implementations

impl<ItemType: Debug + Send + Sync + 'static, MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static, const INSTRUMENTS: usize, DerivedItemType: Debug + Sync + Send + 'static> GenericMulti<INSTRUMENTS> for Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>

const INSTRUMENTS: usize = INSTRUMENTS

The instruments this Multi will collect/report

type ItemType = ItemType

The payload type this Multi’s producers will receive

type MultiChannelType = MultiChannelType

The channel through which payloads will travel from producers to listeners (see Multi for more info)

type DerivedItemType = DerivedItemType

The payload type this Multi’s Streams will yield

type MutinyStreamType = MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>

Defined as MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>,
the concrete type for the Stream of DerivedItemTypes to be given to listeners

fn new<IntoString: Into<String>>(multi_name: IntoString) -> Self

fn to_multi( self, ) -> Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>

Auto Trait Implementations

impl<ItemType, MultiChannelType, const INSTRUMENTS: usize, DerivedItemType> !Freeze for Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>

impl<ItemType, MultiChannelType, const INSTRUMENTS: usize, DerivedItemType> !RefUnwindSafe for Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>

impl<ItemType, MultiChannelType, const INSTRUMENTS: usize, DerivedItemType> Send for Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>

impl<ItemType, MultiChannelType, const INSTRUMENTS: usize, DerivedItemType> Sync for Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>

impl<ItemType, MultiChannelType, const INSTRUMENTS: usize, DerivedItemType> Unpin for Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>
where ItemType: Unpin, MultiChannelType: Unpin, DerivedItemType: Unpin,

impl<ItemType, MultiChannelType, const INSTRUMENTS: usize, DerivedItemType> !UnwindSafe for Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> Pointable for T

const ALIGN: usize

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> Erased for T