Struct reactive_messaging::prelude::Multi  
source · pub struct Multi<ItemType, MultiChannelType, const INSTRUMENTS: usize, DerivedItemType>where
    ItemType: Debug + Sync + Send + 'static,
    MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static,
    DerivedItemType: Debug + Sync + Send + 'static,{
    pub multi_name: String,
    pub channel: Arc<MultiChannelType>,
    pub executor_infos: RwLock<IndexMap<String, ExecutorInfo>>,
    /* private fields */
}Expand description
Multi is an event handler capable of having several “listeners” – all of which receives all events.
With this struct, it is possible to:
- produce events
- spawn new Streams & executors
- close Streams (and executors) Example:
{reactive_mutiny::Instruments::MetricsWithoutLogs.into()}Fields§
§multi_name: String§channel: Arc<MultiChannelType>§executor_infos: RwLock<IndexMap<String, ExecutorInfo>>Implementations§
source§impl<ItemType, MultiChannelType, const INSTRUMENTS: usize, DerivedItemType> Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>where
    ItemType: Debug + Send + Sync + 'static,
    MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static,
    DerivedItemType: Debug + Sync + Send + 'static,
 
impl<ItemType, MultiChannelType, const INSTRUMENTS: usize, DerivedItemType> Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>where ItemType: Debug + Send + Sync + 'static, MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static, DerivedItemType: Debug + Sync + Send + 'static,
sourcepub fn new<IntoString>(
    multi_name: IntoString
) -> Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>where
    IntoString: Into<String>,
 
pub fn new<IntoString>( multi_name: IntoString ) -> Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>where IntoString: Into<String>,
Creates a Multi, which implements the listener pattern, capable of:
- creating Streams;
- applying a user-provided processorto theStreams and executing them to depletion – the finalStreams may produce a combination of fallible/non-fallible & futures/non-futures events;
- producing events that are sent to those Streams.multi_nameis used for instrumentation purposes, depending on theINSTRUMENTgeneric argument passed to the Multi struct.
pub fn send(&self, item: ItemType) -> RetryResult<(), ItemType, (), ()>
pub fn send_with<F>(&self, setter: F) -> RetryResult<(), F, (), ()>where F: FnOnce(&mut ItemType),
pub fn send_derived(&self, arc_item: &DerivedItemType) -> bool
pub fn buffer_size(&self) -> u32
pub fn pending_items_count(&self) -> u32
sourcepub async fn spawn_executor<IntoString, OutItemType, OutStreamType, OutType, ErrVoidAsyncType, CloseVoidAsyncType>(
    &self,
    concurrency_limit: u32,
    futures_timeout: Duration,
    pipeline_name: IntoString,
    pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
    on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static,
    on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Result<(), Box<dyn Error>>where
    IntoString: Into<String>,
    OutItemType: Send + Debug,
    OutStreamType: Stream<Item = OutType> + Send + 'static,
    OutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send,
    ErrVoidAsyncType: Future<Output = ()> + Send + 'static,
    CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
 
pub async fn spawn_executor<IntoString, OutItemType, OutStreamType, OutType, ErrVoidAsyncType, CloseVoidAsyncType>( &self, concurrency_limit: u32, futures_timeout: Duration, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Result<(), Box<dyn Error>>where IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(),
which generates events that are Futures & Fallible.
sourcepub async fn spawn_oldies_executor<IntoString, OutItemType, OldiesOutStreamType, NewiesOutStreamType, OldiesOutType, NewiesOutType, ErrVoidAsyncType, OldiesCloseVoidAsyncType, NewiesCloseVoidAsyncType>(
    self: &Arc<Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>>,
    concurrency_limit: u32,
    sequential_transition: bool,
    futures_timeout: Duration,
    oldies_pipeline_name: IntoString,
    oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
    oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
    newies_pipeline_name: IntoString,
    newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static,
    newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static,
    on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static
) -> Result<(), Box<dyn Error>>where
    IntoString: Into<String>,
    OutItemType: Send + Debug,
    OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static,
    NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static,
    OldiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send,
    NewiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send,
    ErrVoidAsyncType: Future<Output = ()> + Send + 'static,
    OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
    NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
 
pub async fn spawn_oldies_executor<IntoString, OutItemType, OldiesOutStreamType, NewiesOutStreamType, OldiesOutType, NewiesOutType, ErrVoidAsyncType, OldiesCloseVoidAsyncType, NewiesCloseVoidAsyncType>( self: &Arc<Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>>, concurrency_limit: u32, sequential_transition: bool, futures_timeout: Duration, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static, newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static ) -> Result<(), Box<dyn Error>>where IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static, OldiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, NewiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:
- One for past events – to be processed by the stream returned by oldies_pipeline_builder();
- Another one for subsequent events – to be processed by the stream returned by newies_pipeline_builder(). By using this method, it is assumed that both pipeline builders returnsFuture<Result>events. If this is not so, see one of the sibling methods.
 The stream splitting is guaranteed not to drop any events andsequential_transitionmay be used to indicate if old events should be processed first or if both old and new events may be processed simultaneously (in an inevitable out-of-order fashion).
sourcepub async fn spawn_futures_executor<IntoString, OutItemType, OutStreamType, OutType, CloseVoidAsyncType>(
    &self,
    concurrency_limit: u32,
    futures_timeout: Duration,
    pipeline_name: IntoString,
    pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
    on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Result<(), Box<dyn Error>>where
    IntoString: Into<String>,
    OutItemType: Send + Debug,
    OutStreamType: Stream<Item = OutType> + Send + 'static,
    OutType: Future<Output = OutItemType> + Send,
    CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
 
pub async fn spawn_futures_executor<IntoString, OutItemType, OutStreamType, OutType, CloseVoidAsyncType>( &self, concurrency_limit: u32, futures_timeout: Duration, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Result<(), Box<dyn Error>>where IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = OutItemType> + Send, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(),
which generates events that are Futures.
sourcepub async fn spawn_futures_oldies_executor<IntoString, OutItemType, OldiesOutStreamType, NewiesOutStreamType, OldiesOutType, NewiesOutType, OldiesCloseVoidAsyncType, NewiesCloseVoidAsyncType>(
    self: &Arc<Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>>,
    concurrency_limit: u32,
    sequential_transition: bool,
    futures_timeout: Duration,
    oldies_pipeline_name: IntoString,
    oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
    oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
    newies_pipeline_name: IntoString,
    newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static,
    newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static
) -> Result<(), Box<dyn Error>>where
    IntoString: Into<String>,
    OutItemType: Send + Debug,
    OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static,
    NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static,
    OldiesOutType: Future<Output = OutItemType> + Send,
    NewiesOutType: Future<Output = OutItemType> + Send,
    OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
    NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
 
pub async fn spawn_futures_oldies_executor<IntoString, OutItemType, OldiesOutStreamType, NewiesOutStreamType, OldiesOutType, NewiesOutType, OldiesCloseVoidAsyncType, NewiesCloseVoidAsyncType>( self: &Arc<Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>>, concurrency_limit: u32, sequential_transition: bool, futures_timeout: Duration, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static, newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static ) -> Result<(), Box<dyn Error>>where IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static, OldiesOutType: Future<Output = OutItemType> + Send, NewiesOutType: Future<Output = OutItemType> + Send, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:
- One for past events – to be processed by the stream returned by oldies_pipeline_builder();
- Another one for subsequent events – to be processed by the stream returned by newies_pipeline_builder(). By using this method, it is assumed that both pipeline builders returnsFutureevents. If this is not so, see one of the sibling methods.
 The stream splitting is guaranteed not to drop any events andsequential_transitionmay be used to indicate if old events should be processed first or if both old and new events may be processed simultaneously (in an inevitable out-of-order fashion).
sourcepub async fn spawn_fallibles_executor<IntoString, OutItemType, OutStreamType, CloseVoidAsyncType>(
    &self,
    concurrency_limit: u32,
    pipeline_name: IntoString,
    pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
    on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static,
    on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Result<(), Box<dyn Error>>where
    IntoString: Into<String>,
    OutItemType: Send + Debug,
    OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send + 'static,
    CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
 
pub async fn spawn_fallibles_executor<IntoString, OutItemType, OutStreamType, CloseVoidAsyncType>( &self, concurrency_limit: u32, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Result<(), Box<dyn Error>>where IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(),
which generates events that are Fallible.
sourcepub async fn spawn_fallibles_oldies_executor<IntoString, OutItemType, OldiesOutStreamType, NewiesOutStreamType, OldiesCloseVoidAsyncType, NewiesCloseVoidAsyncType>(
    self: &Arc<Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>>,
    concurrency_limit: u32,
    sequential_transition: bool,
    oldies_pipeline_name: IntoString,
    oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
    oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
    newies_pipeline_name: IntoString,
    newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static,
    newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static,
    on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static
) -> Result<(), Box<dyn Error>>where
    IntoString: Into<String>,
    OutItemType: Send + Debug,
    OldiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Sync + Send + 'static,
    NewiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Sync + Send + 'static,
    OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
    NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
 
pub async fn spawn_fallibles_oldies_executor<IntoString, OutItemType, OldiesOutStreamType, NewiesOutStreamType, OldiesCloseVoidAsyncType, NewiesCloseVoidAsyncType>( self: &Arc<Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>>, concurrency_limit: u32, sequential_transition: bool, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static, newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static ) -> Result<(), Box<dyn Error>>where IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Sync + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:
- One for past events – to be processed by the stream returned by oldies_pipeline_builder();
- Another one for subsequent events – to be processed by the stream returned by newies_pipeline_builder(). By using this method, it is assumed that both pipeline builders returns Fallible events. If this is not so, see one of the sibling methods.
 The stream splitting is guaranteed not to drop any events andsequential_transitionmay be used to indicate if old events should be processed first or if both old and new events may be processed simultaneously (in an inevitable out-of-order fashion).
sourcepub async fn spawn_non_futures_non_fallible_executor<IntoString, OutItemType, OutStreamType, CloseVoidAsyncType>(
    &self,
    concurrency_limit: u32,
    pipeline_name: IntoString,
    pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
    on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Result<(), Box<dyn Error>>where
    IntoString: Into<String>,
    OutItemType: Send + Debug,
    OutStreamType: Stream<Item = OutItemType> + Send + 'static,
    CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
 
pub async fn spawn_non_futures_non_fallible_executor<IntoString, OutItemType, OutStreamType, CloseVoidAsyncType>( &self, concurrency_limit: u32, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Result<(), Box<dyn Error>>where IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutItemType> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(),
which generates events that are Non-Futures & Non-Fallible.
sourcepub async fn spawn_non_futures_non_fallible_oldies_executor<IntoString, OldiesOutItemType, NewiesOutItemType, OldiesOutStreamType, NewiesOutStreamType, OldiesCloseVoidAsyncType, NewiesCloseVoidAsyncType>(
    self: &Arc<Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>>,
    concurrency_limit: u32,
    sequential_transition: bool,
    oldies_pipeline_name: IntoString,
    oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
    oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
    newies_pipeline_name: IntoString,
    newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType,
    newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static
) -> Result<(), Box<dyn Error>>where
    IntoString: Into<String>,
    OldiesOutItemType: Send + Debug,
    NewiesOutItemType: Send + Debug,
    OldiesOutStreamType: Stream<Item = OldiesOutItemType> + Sync + Send + 'static,
    NewiesOutStreamType: Stream<Item = NewiesOutItemType> + Sync + Send + 'static,
    OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
    NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
 
pub async fn spawn_non_futures_non_fallible_oldies_executor<IntoString, OldiesOutItemType, NewiesOutItemType, OldiesOutStreamType, NewiesOutStreamType, OldiesCloseVoidAsyncType, NewiesCloseVoidAsyncType>( self: &Arc<Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>>, concurrency_limit: u32, sequential_transition: bool, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType, newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static ) -> Result<(), Box<dyn Error>>where IntoString: Into<String>, OldiesOutItemType: Send + Debug, NewiesOutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutItemType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutItemType> + Sync + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:
- One for past events – to be processed by the stream returned by oldies_pipeline_builder();
- Another one for subsequent events – to be processed by the stream returned by newies_pipeline_builder(). By using this method, it is assumed that both pipeline builders returns non-Futures & non-Fallible events. If this is not so, see [spawn_oldies_executor].
 The stream splitting is guaranteed not to drop any events andsequential_transitionmay be used to indicate if old events should be processed first or if both old and new events may be processed simultaneously (in an inevitable out-of-order fashion).
sourcepub async fn close(&self, timeout: Duration) -> bool
 
pub async fn close(&self, timeout: Duration) -> bool
Closes this Multi, in isolation – flushing pending events, closing the producers,
waiting for all events to be fully processed and calling all executor’s “on close” callbacks.
If this Multi share resources with another one (which will get dumped by the “on close”
callback), most probably you want to close them atomically – see multis_close_async!().
Returns true if all events could be flushed within the given timeout.
sourcepub async fn flush_and_cancel_executor<IntoString>(
    &self,
    pipeline_name: IntoString,
    timeout: Duration
) -> boolwhere
    IntoString: Into<String>,
 
pub async fn flush_and_cancel_executor<IntoString>( &self, pipeline_name: IntoString, timeout: Duration ) -> boolwhere IntoString: Into<String>,
Asynchronously blocks until all resources associated with the executor responsible for pipeline_name are freed:
- immediately causes pipeline_nameto cease receiving new elements by removing it from the active list
- wait for all pending elements to be processed
- remove the queue/channel and wake the Stream to see that it has ended
- waits for the executor to inform it ceased its execution
- return, dropping all resources
 Note it might make sense to spawn this operation by aTokio task, for it may block indefinitely if the Stream has no timeout.
 Also note that timing out this operation is not advisable, for resources won’t be freed until it reaches the last step.
 Returns false if there was no executor associated withpipeline_name.
Trait Implementations§
source§impl<ItemType, MultiChannelType, const INSTRUMENTS: usize, DerivedItemType> GenericMulti<INSTRUMENTS> for Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>where
    ItemType: Debug + Send + Sync + 'static,
    MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static,
    DerivedItemType: Debug + Sync + Send + 'static,
 
impl<ItemType, MultiChannelType, const INSTRUMENTS: usize, DerivedItemType> GenericMulti<INSTRUMENTS> for Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>where ItemType: Debug + Send + Sync + 'static, MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static, DerivedItemType: Debug + Sync + Send + 'static,
source§const INSTRUMENTS: usize = INSTRUMENTS
 
const INSTRUMENTS: usize = INSTRUMENTS
§type MultiChannelType = MultiChannelType
 
type MultiChannelType = MultiChannelType
§type DerivedItemType = DerivedItemType
 
type DerivedItemType = DerivedItemType
Streams will yield§type MutinyStreamType = MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>
 
type MutinyStreamType = MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>
MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>,the concrete type for the
Stream of DerivedItemTypes to be given to listeners