pub struct Multi<ItemType: Debug + Sync + Send + 'static, MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static, const INSTRUMENTS: usize, DerivedItemType: Debug + Sync + Send + 'static> {
pub multi_name: String,
pub channel: Arc<MultiChannelType>,
pub executor_infos: RwLock<IndexMap<String, ExecutorInfo>>,
/* private fields */
}
Expand description
`Multi` is an event handler capable of having several “listeners” – all of which receive all events.
With this struct, it is possible to:
- produce events
- spawn new `Stream`s & executors
- close `Stream`s (and executors)
Example:
{reactive_mutiny::Instruments::MetricsWithoutLogs.into()}
Fields§
§multi_name: String
§channel: Arc<MultiChannelType>
§executor_infos: RwLock<IndexMap<String, ExecutorInfo>>
Implementations§
Source§impl<ItemType: Debug + Send + Sync + 'static, MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static, const INSTRUMENTS: usize, DerivedItemType: Debug + Sync + Send + 'static> Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>
impl<ItemType: Debug + Send + Sync + 'static, MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static, const INSTRUMENTS: usize, DerivedItemType: Debug + Sync + Send + 'static> Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>
Sourcepub fn new<IntoString: Into<String>>(multi_name: IntoString) -> Self
pub fn new<IntoString: Into<String>>(multi_name: IntoString) -> Self
Creates a Multi, which implements the listener pattern, capable of:
- creating `Stream`s;
- applying a user-provided processor to the `Stream`s and executing them to depletion – the final `Stream`s may produce a combination of fallible/non-fallible & futures/non-futures events;
- producing events that are sent to those `Stream`s.
`multi_name` is used for instrumentation purposes, depending on the `INSTRUMENTS` generic argument passed to the Multi struct.
pub fn send(&self, item: ItemType) -> RetryConsumerResult<(), ItemType, ()>
pub fn send_with<F: FnOnce(&mut ItemType)>( &self, setter: F, ) -> RetryConsumerResult<(), F, ()>
pub fn send_derived(&self, arc_item: &DerivedItemType) -> bool
pub fn buffer_size(&self) -> u32
pub fn pending_items_count(&self) -> u32
Sourcepub async fn spawn_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
&self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_name: IntoString,
pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
) -> Result<(), Box<dyn Error>>
pub async fn spawn_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>( &self, concurrency_limit: u32, futures_timeout: Duration, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Result<(), Box<dyn Error>>
Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(),
which generates events that are Futures & Fallible.
Sourcepub async fn spawn_oldies_executor<IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static, OldiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, NewiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
self: &Arc<Self>,
concurrency_limit: u32,
sequential_transition: bool,
futures_timeout: Duration,
oldies_pipeline_name: IntoString,
oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
newies_pipeline_name: IntoString,
newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static,
newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static,
) -> Result<(), Box<dyn Error>>
pub async fn spawn_oldies_executor<IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static, OldiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, NewiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>( self: &Arc<Self>, concurrency_limit: u32, sequential_transition: bool, futures_timeout: Duration, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static, newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static, ) -> Result<(), Box<dyn Error>>
For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:
- One for past events – to be processed by the stream returned by oldies_pipeline_builder();
- Another one for subsequent events – to be processed by the stream returned by newies_pipeline_builder().
By using this method, it is assumed that both pipeline builders return Future<Result> events. If this is not so, see one of the sibling methods.
The stream splitting is guaranteed not to drop any events and sequential_transition
may be used to indicate if old events should be processed first or if both old and new events
may be processed simultaneously (in an inevitable out-of-order fashion).
Sourcepub async fn spawn_futures_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = OutItemType> + Send, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
&self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_name: IntoString,
pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
) -> Result<(), Box<dyn Error>>
pub async fn spawn_futures_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = OutItemType> + Send, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>( &self, concurrency_limit: u32, futures_timeout: Duration, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Result<(), Box<dyn Error>>
Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(),
which generates events that are Futures.
Sourcepub async fn spawn_futures_oldies_executor<IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static, OldiesOutType: Future<Output = OutItemType> + Send, NewiesOutType: Future<Output = OutItemType> + Send, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
self: &Arc<Self>,
concurrency_limit: u32,
sequential_transition: bool,
futures_timeout: Duration,
oldies_pipeline_name: IntoString,
oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
newies_pipeline_name: IntoString,
newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static,
newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static,
) -> Result<(), Box<dyn Error>>
pub async fn spawn_futures_oldies_executor<IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static, OldiesOutType: Future<Output = OutItemType> + Send, NewiesOutType: Future<Output = OutItemType> + Send, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>( self: &Arc<Self>, concurrency_limit: u32, sequential_transition: bool, futures_timeout: Duration, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static, newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static, ) -> Result<(), Box<dyn Error>>
For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:
- One for past events – to be processed by the stream returned by oldies_pipeline_builder();
- Another one for subsequent events – to be processed by the stream returned by newies_pipeline_builder().
By using this method, it is assumed that both pipeline builders return Future events. If this is not so, see one of the sibling methods.
The stream splitting is guaranteed not to drop any events and sequential_transition
may be used to indicate if old events should be processed first or if both old and new events
may be processed simultaneously (in an inevitable out-of-order fashion).
Sourcepub async fn spawn_fallibles_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
&self,
concurrency_limit: u32,
pipeline_name: IntoString,
pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
) -> Result<(), Box<dyn Error>>
pub async fn spawn_fallibles_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>( &self, concurrency_limit: u32, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Result<(), Box<dyn Error>>
Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(),
which generates events that are Fallible.
Sourcepub async fn spawn_fallibles_oldies_executor<IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Sync + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
self: &Arc<Self>,
concurrency_limit: u32,
sequential_transition: bool,
oldies_pipeline_name: IntoString,
oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
newies_pipeline_name: IntoString,
newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static,
newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static,
) -> Result<(), Box<dyn Error>>
pub async fn spawn_fallibles_oldies_executor<IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Sync + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>( self: &Arc<Self>, concurrency_limit: u32, sequential_transition: bool, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static, newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static, ) -> Result<(), Box<dyn Error>>
For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:
- One for past events – to be processed by the stream returned by oldies_pipeline_builder();
- Another one for subsequent events – to be processed by the stream returned by newies_pipeline_builder().
By using this method, it is assumed that both pipeline builders return Fallible events. If this is not so, see one of the sibling methods.
The stream splitting is guaranteed not to drop any events and sequential_transition
may be used to indicate if old events should be processed first or if both old and new events
may be processed simultaneously (in an inevitable out-of-order fashion).
Sourcepub async fn spawn_non_futures_non_fallible_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutItemType> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
&self,
concurrency_limit: u32,
pipeline_name: IntoString,
pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
) -> Result<(), Box<dyn Error>>
pub async fn spawn_non_futures_non_fallible_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutItemType> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>( &self, concurrency_limit: u32, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Result<(), Box<dyn Error>>
Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(),
which generates events that are Non-Futures & Non-Fallible.
Sourcepub async fn spawn_non_futures_non_fallible_oldies_executor<IntoString: Into<String>, OldiesOutItemType: Send + Debug, NewiesOutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutItemType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutItemType> + Sync + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
self: &Arc<Self>,
concurrency_limit: u32,
sequential_transition: bool,
oldies_pipeline_name: IntoString,
oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
newies_pipeline_name: IntoString,
newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType,
newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static,
) -> Result<(), Box<dyn Error>>
pub async fn spawn_non_futures_non_fallible_oldies_executor<IntoString: Into<String>, OldiesOutItemType: Send + Debug, NewiesOutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutItemType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutItemType> + Sync + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>( self: &Arc<Self>, concurrency_limit: u32, sequential_transition: bool, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType, newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static, ) -> Result<(), Box<dyn Error>>
For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:
- One for past events – to be processed by the stream returned by oldies_pipeline_builder();
- Another one for subsequent events – to be processed by the stream returned by newies_pipeline_builder().
By using this method, it is assumed that both pipeline builders return non-Futures & non-Fallible events. If this is not so, see [spawn_oldies_executor].
The stream splitting is guaranteed not to drop any events and sequential_transition
may be used to indicate if old events should be processed first or if both old and new events
may be processed simultaneously (in an inevitable out-of-order fashion).
Sourcepub async fn close(&self, timeout: Duration) -> bool
pub async fn close(&self, timeout: Duration) -> bool
Closes this Multi, in isolation – flushing pending events, closing the producers,
waiting for all events to be fully processed and calling all executors’ “on close” callbacks.
If this Multi shares resources with another one (which will get dumped by the “on close”
callback), most probably you want to close them atomically – see multis_close_async!().
Returns true if all events could be flushed within the given timeout.
Sourcepub async fn flush_and_cancel_executor<IntoString: Into<String>>(
&self,
pipeline_name: IntoString,
timeout: Duration,
) -> bool
pub async fn flush_and_cancel_executor<IntoString: Into<String>>( &self, pipeline_name: IntoString, timeout: Duration, ) -> bool
Asynchronously blocks until all resources associated with the executor responsible for pipeline_name are freed:
- immediately causes pipeline_name to cease receiving new elements by removing it from the active list
- waits for all pending elements to be processed
- removes the queue/channel and wakes the Stream to see that it has ended
- waits for the executor to inform it ceased its execution
- returns, dropping all resources.
Note it might make sense to spawn this operation by a Tokio task, for it may block indefinitely if the Stream has no timeout.
Also note that timing out this operation is not advisable, for resources won’t be freed until it reaches the last step.
Returns false if there was no executor associated with pipeline_name.
Trait Implementations§
Source§impl<ItemType: Debug + Send + Sync + 'static, MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static, const INSTRUMENTS: usize, DerivedItemType: Debug + Sync + Send + 'static> GenericMulti<INSTRUMENTS> for Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>
impl<ItemType: Debug + Send + Sync + 'static, MultiChannelType: FullDuplexMultiChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Sync + Send + 'static, const INSTRUMENTS: usize, DerivedItemType: Debug + Sync + Send + 'static> GenericMulti<INSTRUMENTS> for Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>
Source§const INSTRUMENTS: usize = INSTRUMENTS
const INSTRUMENTS: usize = INSTRUMENTS
Source§type MultiChannelType = MultiChannelType
type MultiChannelType = MultiChannelType
Source§type DerivedItemType = DerivedItemType
type DerivedItemType = DerivedItemType
`Stream`s will yield
Source§type MutinyStreamType = MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>
type MutinyStreamType = MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>
`MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>`, the concrete type for the `Stream` of `DerivedItemType`s to be given to listeners