Struct reactive_messaging::prelude::Multi
source · pub struct Multi<ItemType, MultiChannelType, const INSTRUMENTS: usize, DerivedItemType>where
ItemType: Debug + Sync + Send + 'static,
MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static,
DerivedItemType: Debug + Sync + Send + 'static,{
pub multi_name: String,
pub channel: Arc<MultiChannelType, Global>,
pub executor_infos: RwLock<IndexMap<String, ExecutorInfo, RandomState>>,
/* private fields */
}Expand description
Multi is an event handler capable of having several “listeners” – all of which receive all events.
With this struct, it is possible to:
- produce events
- spawn new Streams & executors
- close Streams (and executors)
Example:
{reactive_mutiny::Instruments::MetricsWithoutLogs.into()}Fields§
§multi_name: String§channel: Arc<MultiChannelType, Global>§executor_infos: RwLock<IndexMap<String, ExecutorInfo, RandomState>>Implementations§
source§impl<ItemType, MultiChannelType, const INSTRUMENTS: usize, DerivedItemType> Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>where
ItemType: Debug + Send + Sync + 'static,
MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static,
DerivedItemType: Debug + Sync + Send + 'static,
impl<ItemType, MultiChannelType, const INSTRUMENTS: usize, DerivedItemType> Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>where ItemType: Debug + Send + Sync + 'static, MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static, DerivedItemType: Debug + Sync + Send + 'static,
sourcepub fn new<IntoString>(
multi_name: IntoString
) -> Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>where
IntoString: Into<String>,
pub fn new<IntoString>( multi_name: IntoString ) -> Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>where IntoString: Into<String>,
Creates a Multi, which implements the listener pattern, capable of:
- creating Streams;
- applying a user-provided processor to the Streams and executing them to depletion – the final Streams may produce a combination of fallible/non-fallible & futures/non-futures events;
- producing events that are sent to those Streams.
multi_name is used for instrumentation purposes, depending on the INSTRUMENTS generic argument passed to the Multi struct.
pub fn send(&self, item: ItemType) -> RetryResult<(), ItemType, (), ()>
pub fn send_with<F>(&self, setter: F) -> RetryResult<(), F, (), ()>where F: FnOnce(&mut ItemType),
pub fn send_derived(&self, arc_item: &DerivedItemType) -> bool
pub fn buffer_size(&self) -> u32
pub fn pending_items_count(&self) -> u32
sourcepub async fn spawn_executor<IntoString, OutItemType, OutStreamType, OutType, ErrVoidAsyncType, CloseVoidAsyncType>(
&self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_name: IntoString,
pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync, Global>) -> ErrVoidAsyncType + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> impl Future<Output = Result<(), Box<dyn Error, Global>>>where
IntoString: Into<String>,
OutItemType: Send + Debug,
OutStreamType: Stream<Item = OutType> + Send + 'static,
OutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync, Global>>> + Send,
ErrVoidAsyncType: Future<Output = ()> + Send + 'static,
CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
pub async fn spawn_executor<IntoString, OutItemType, OutStreamType, OutType, ErrVoidAsyncType, CloseVoidAsyncType>( &self, concurrency_limit: u32, futures_timeout: Duration, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync, Global>) -> ErrVoidAsyncType + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> impl Future<Output = Result<(), Box<dyn Error, Global>>>where IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync, Global>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(),
which generates events that are Futures & Fallible.
sourcepub async fn spawn_oldies_executor<IntoString, OutItemType, OldiesOutStreamType, NewiesOutStreamType, OldiesOutType, NewiesOutType, ErrVoidAsyncType, OldiesCloseVoidAsyncType, NewiesCloseVoidAsyncType>(
self: &Arc<Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>, Global>,
concurrency_limit: u32,
sequential_transition: bool,
futures_timeout: Duration,
oldies_pipeline_name: IntoString,
oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
newies_pipeline_name: IntoString,
newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static,
newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync, Global>) -> ErrVoidAsyncType + Send + Sync + 'static
) -> impl Future<Output = Result<(), Box<dyn Error, Global>>>where
IntoString: Into<String>,
OutItemType: Send + Debug,
OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static,
NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static,
OldiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync, Global>>> + Send,
NewiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync, Global>>> + Send,
ErrVoidAsyncType: Future<Output = ()> + Send + 'static,
OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
pub async fn spawn_oldies_executor<IntoString, OutItemType, OldiesOutStreamType, NewiesOutStreamType, OldiesOutType, NewiesOutType, ErrVoidAsyncType, OldiesCloseVoidAsyncType, NewiesCloseVoidAsyncType>( self: &Arc<Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>, Global>, concurrency_limit: u32, sequential_transition: bool, futures_timeout: Duration, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static, newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static, on_err_callback: impl Fn(Box<dyn Error + Send + Sync, Global>) -> ErrVoidAsyncType + Send + Sync + 'static ) -> impl Future<Output = Result<(), Box<dyn Error, Global>>>where IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static, OldiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync, Global>>> + Send, NewiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync, Global>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:
- One for past events – to be processed by the stream returned by oldies_pipeline_builder();
- Another one for subsequent events – to be processed by the stream returned by newies_pipeline_builder().
By using this method, it is assumed that both pipeline builders return Future<Result> events. If this is not so, see one of the sibling methods.
The stream splitting is guaranteed not to drop any events, and sequential_transition may be used to indicate if old events should be processed first or if both old and new events may be processed simultaneously (in an inevitable out-of-order fashion).
sourcepub async fn spawn_futures_executor<IntoString, OutItemType, OutStreamType, OutType, CloseVoidAsyncType>(
&self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_name: IntoString,
pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> impl Future<Output = Result<(), Box<dyn Error, Global>>>where
IntoString: Into<String>,
OutItemType: Send + Debug,
OutStreamType: Stream<Item = OutType> + Send + 'static,
OutType: Future<Output = OutItemType> + Send,
CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
pub async fn spawn_futures_executor<IntoString, OutItemType, OutStreamType, OutType, CloseVoidAsyncType>( &self, concurrency_limit: u32, futures_timeout: Duration, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> impl Future<Output = Result<(), Box<dyn Error, Global>>>where IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = OutItemType> + Send, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(),
which generates events that are Futures.
sourcepub async fn spawn_futures_oldies_executor<IntoString, OutItemType, OldiesOutStreamType, NewiesOutStreamType, OldiesOutType, NewiesOutType, OldiesCloseVoidAsyncType, NewiesCloseVoidAsyncType>(
self: &Arc<Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>, Global>,
concurrency_limit: u32,
sequential_transition: bool,
futures_timeout: Duration,
oldies_pipeline_name: IntoString,
oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
newies_pipeline_name: IntoString,
newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static,
newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static
) -> impl Future<Output = Result<(), Box<dyn Error, Global>>>where
IntoString: Into<String>,
OutItemType: Send + Debug,
OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static,
NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static,
OldiesOutType: Future<Output = OutItemType> + Send,
NewiesOutType: Future<Output = OutItemType> + Send,
OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
pub async fn spawn_futures_oldies_executor<IntoString, OutItemType, OldiesOutStreamType, NewiesOutStreamType, OldiesOutType, NewiesOutType, OldiesCloseVoidAsyncType, NewiesCloseVoidAsyncType>( self: &Arc<Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>, Global>, concurrency_limit: u32, sequential_transition: bool, futures_timeout: Duration, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static, newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static ) -> impl Future<Output = Result<(), Box<dyn Error, Global>>>where IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static, OldiesOutType: Future<Output = OutItemType> + Send, NewiesOutType: Future<Output = OutItemType> + Send, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:
- One for past events – to be processed by the stream returned by oldies_pipeline_builder();
- Another one for subsequent events – to be processed by the stream returned by newies_pipeline_builder().
By using this method, it is assumed that both pipeline builders return Future events. If this is not so, see one of the sibling methods.
The stream splitting is guaranteed not to drop any events, and sequential_transition may be used to indicate if old events should be processed first or if both old and new events may be processed simultaneously (in an inevitable out-of-order fashion).
sourcepub async fn spawn_fallibles_executor<IntoString, OutItemType, OutStreamType, CloseVoidAsyncType>(
&self,
concurrency_limit: u32,
pipeline_name: IntoString,
pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync, Global>) + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> impl Future<Output = Result<(), Box<dyn Error, Global>>>where
IntoString: Into<String>,
OutItemType: Send + Debug,
OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync, Global>>> + Send + 'static,
CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
pub async fn spawn_fallibles_executor<IntoString, OutItemType, OutStreamType, CloseVoidAsyncType>( &self, concurrency_limit: u32, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync, Global>) + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> impl Future<Output = Result<(), Box<dyn Error, Global>>>where IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync, Global>>> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(),
which generates events that are Fallible.
sourcepub async fn spawn_fallibles_oldies_executor<IntoString, OutItemType, OldiesOutStreamType, NewiesOutStreamType, OldiesCloseVoidAsyncType, NewiesCloseVoidAsyncType>(
self: &Arc<Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>, Global>,
concurrency_limit: u32,
sequential_transition: bool,
oldies_pipeline_name: IntoString,
oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
newies_pipeline_name: IntoString,
newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static,
newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync, Global>) + Send + Sync + 'static
) -> impl Future<Output = Result<(), Box<dyn Error, Global>>>where
IntoString: Into<String>,
OutItemType: Send + Debug,
OldiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync, Global>>> + Sync + Send + 'static,
NewiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync, Global>>> + Sync + Send + 'static,
OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
pub async fn spawn_fallibles_oldies_executor<IntoString, OutItemType, OldiesOutStreamType, NewiesOutStreamType, OldiesCloseVoidAsyncType, NewiesCloseVoidAsyncType>( self: &Arc<Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>, Global>, concurrency_limit: u32, sequential_transition: bool, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static, newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static, on_err_callback: impl Fn(Box<dyn Error + Send + Sync, Global>) + Send + Sync + 'static ) -> impl Future<Output = Result<(), Box<dyn Error, Global>>>where IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync, Global>>> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync, Global>>> + Sync + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:
- One for past events – to be processed by the stream returned by oldies_pipeline_builder();
- Another one for subsequent events – to be processed by the stream returned by newies_pipeline_builder().
By using this method, it is assumed that both pipeline builders return Fallible events. If this is not so, see one of the sibling methods.
The stream splitting is guaranteed not to drop any events, and sequential_transition may be used to indicate if old events should be processed first or if both old and new events may be processed simultaneously (in an inevitable out-of-order fashion).
sourcepub async fn spawn_non_futures_non_fallible_executor<IntoString, OutItemType, OutStreamType, CloseVoidAsyncType>(
&self,
concurrency_limit: u32,
pipeline_name: IntoString,
pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> impl Future<Output = Result<(), Box<dyn Error, Global>>>where
IntoString: Into<String>,
OutItemType: Send + Debug,
OutStreamType: Stream<Item = OutItemType> + Send + 'static,
CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
pub async fn spawn_non_futures_non_fallible_executor<IntoString, OutItemType, OutStreamType, CloseVoidAsyncType>( &self, concurrency_limit: u32, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> impl Future<Output = Result<(), Box<dyn Error, Global>>>where IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutItemType> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(),
which generates events that are Non-Futures & Non-Fallible.
sourcepub async fn spawn_non_futures_non_fallible_oldies_executor<IntoString, OldiesOutItemType, NewiesOutItemType, OldiesOutStreamType, NewiesOutStreamType, OldiesCloseVoidAsyncType, NewiesCloseVoidAsyncType>(
self: &Arc<Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>, Global>,
concurrency_limit: u32,
sequential_transition: bool,
oldies_pipeline_name: IntoString,
oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
newies_pipeline_name: IntoString,
newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType,
newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static
) -> impl Future<Output = Result<(), Box<dyn Error, Global>>>where
IntoString: Into<String>,
OldiesOutItemType: Send + Debug,
NewiesOutItemType: Send + Debug,
OldiesOutStreamType: Stream<Item = OldiesOutItemType> + Sync + Send + 'static,
NewiesOutStreamType: Stream<Item = NewiesOutItemType> + Sync + Send + 'static,
OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
pub async fn spawn_non_futures_non_fallible_oldies_executor<IntoString, OldiesOutItemType, NewiesOutItemType, OldiesOutStreamType, NewiesOutStreamType, OldiesCloseVoidAsyncType, NewiesCloseVoidAsyncType>( self: &Arc<Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>, Global>, concurrency_limit: u32, sequential_transition: bool, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType, newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync, Global>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static ) -> impl Future<Output = Result<(), Box<dyn Error, Global>>>where IntoString: Into<String>, OldiesOutItemType: Send + Debug, NewiesOutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutItemType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutItemType> + Sync + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static,
For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:
- One for past events – to be processed by the stream returned by oldies_pipeline_builder();
- Another one for subsequent events – to be processed by the stream returned by newies_pipeline_builder().
By using this method, it is assumed that both pipeline builders return non-Futures & non-Fallible events. If this is not so, see [spawn_oldies_executor].
The stream splitting is guaranteed not to drop any events, and sequential_transition may be used to indicate if old events should be processed first or if both old and new events may be processed simultaneously (in an inevitable out-of-order fashion).
sourcepub async fn close(&self, timeout: Duration) -> impl Future<Output = bool>
pub async fn close(&self, timeout: Duration) -> impl Future<Output = bool>
Closes this Multi, in isolation – flushing pending events, closing the producers,
waiting for all events to be fully processed and calling all executors’ “on close” callbacks.
If this Multi shares resources with another one (which will get dumped by the “on close”
callback), most probably you want to close them atomically – see multis_close_async!().
Returns true if all events could be flushed within the given timeout.
sourcepub async fn flush_and_cancel_executor<IntoString>(
&self,
pipeline_name: IntoString,
timeout: Duration
) -> impl Future<Output = bool>where
IntoString: Into<String>,
pub async fn flush_and_cancel_executor<IntoString>( &self, pipeline_name: IntoString, timeout: Duration ) -> impl Future<Output = bool>where IntoString: Into<String>,
Asynchronously blocks until all resources associated with the executor responsible for pipeline_name are freed:
- immediately causes pipeline_name to cease receiving new elements by removing it from the active list
- waits for all pending elements to be processed
- removes the queue/channel and wakes the Stream to see that it has ended
- waits for the executor to inform it ceased its execution
- returns, dropping all resources
Note it might make sense to spawn this operation by a Tokio task, for it may block indefinitely if the Stream has no timeout.
Also note that timing out this operation is not advisable, for resources won’t be freed until it reaches the last step.
Returns false if there was no executor associated with pipeline_name.
Trait Implementations§
source§impl<ItemType, MultiChannelType, const INSTRUMENTS: usize, DerivedItemType> GenericMulti<INSTRUMENTS> for Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>where
ItemType: Debug + Send + Sync + 'static,
MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static,
DerivedItemType: Debug + Sync + Send + 'static,
impl<ItemType, MultiChannelType, const INSTRUMENTS: usize, DerivedItemType> GenericMulti<INSTRUMENTS> for Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>where ItemType: Debug + Send + Sync + 'static, MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static, DerivedItemType: Debug + Sync + Send + 'static,
source§const INSTRUMENTS: usize = INSTRUMENTS
const INSTRUMENTS: usize = INSTRUMENTS
§type MultiChannelType = MultiChannelType
type MultiChannelType = MultiChannelType
§type DerivedItemType = DerivedItemType
type DerivedItemType = DerivedItemType
Streams will yield§type MutinyStreamType = MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>
type MutinyStreamType = MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>
MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>, the concrete type for the Stream of DerivedItemTypes to be given to listeners