Struct reactive_mutiny::multi::Multi
source · pub struct Multi<ItemType: Debug + Sync + Send + 'static, MultiChannelType: FullDuplexMultiChannel<'static, ItemType, DerivedItemType> + Sync + Send, const INSTRUMENTS: usize, DerivedItemType: Debug + Sync + Send + 'static> {
pub multi_name: String,
pub channel: Arc<MultiChannelType>,
pub executor_infos: RwLock<IndexMap<String, ExecutorInfo<INSTRUMENTS>>>,
/* private fields */
}Expand description
Multi is an event handler capable of having several “listeners” – all of which receives all events.
With this struct, it is possible to:
- produce events
- spawn new
Streams & executors - close
Streams (and executors) Example:
{reactive_mutiny::Instruments::MetricsWithoutLogs.into()}Fields§
§multi_name: String§channel: Arc<MultiChannelType>§executor_infos: RwLock<IndexMap<String, ExecutorInfo<INSTRUMENTS>>>Implementations§
source§impl<ItemType: Debug + Send + Sync + 'static, MultiChannelType: FullDuplexMultiChannel<'static, ItemType, DerivedItemType> + Sync + Send + 'static, const INSTRUMENTS: usize, DerivedItemType: Debug + Sync + Send + 'static> Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>
impl<ItemType: Debug + Send + Sync + 'static, MultiChannelType: FullDuplexMultiChannel<'static, ItemType, DerivedItemType> + Sync + Send + 'static, const INSTRUMENTS: usize, DerivedItemType: Debug + Sync + Send + 'static> Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType>
pub fn new<IntoString: Into<String>>(multi_name: IntoString) -> Self
pub fn stream_name(&self) -> &str
pub fn try_send<F: FnOnce(&mut ItemType)>(&self, setter: F) -> bool
pub fn send<F: FnOnce(&mut ItemType)>(&self, setter: F)
pub fn send_derived(&self, arc_item: &DerivedItemType)
pub fn try_send_movable(&self, item: ItemType) -> bool
pub fn buffer_size(&self) -> u32
pub fn pending_items_count(&self) -> u32
sourcepub async fn spawn_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
&self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_name: IntoString,
pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Result<(), Box<dyn Error>>
pub async fn spawn_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>( &self, concurrency_limit: u32, futures_timeout: Duration, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Result<(), Box<dyn Error>>
Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(),
which generates events that are Futures & Fallible.
sourcepub async fn spawn_oldies_executor<IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static, OldiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, NewiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
self: &Arc<Self>,
concurrency_limit: u32,
sequential_transition: bool,
futures_timeout: Duration,
oldies_pipeline_name: IntoString,
oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
oldies_on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
newies_pipeline_name: IntoString,
newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static,
newies_on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static
) -> Result<(), Box<dyn Error>>
pub async fn spawn_oldies_executor<IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static, OldiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, NewiesOutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>( self: &Arc<Self>, concurrency_limit: u32, sequential_transition: bool, futures_timeout: Duration, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static, newies_on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static ) -> Result<(), Box<dyn Error>>
For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:
- One for past events – to be processed by the stream returned by
oldies_pipeline_builder(); - Another one for subsequent events – to be processed by the stream returned by
newies_pipeline_builder(). By using this method, it is assumed that both pipeline builders returnsFuture<Result>events. If this is not so, see one of the sibling methods.
The stream splitting is guaranteed not to drop any events andsequential_transitionmay be used to indicate if old events should be processed first or if both old and new events may be processed simultaneously (in an inevitable out-of-order fashion).
sourcepub async fn spawn_futures_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = OutItemType> + Send, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
&self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_name: IntoString,
pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Result<(), Box<dyn Error>>
pub async fn spawn_futures_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = OutItemType> + Send, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>( &self, concurrency_limit: u32, futures_timeout: Duration, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Result<(), Box<dyn Error>>
Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(),
which generates events that are Futures.
sourcepub async fn spawn_futures_oldies_executor<IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static, OldiesOutType: Future<Output = OutItemType> + Send, NewiesOutType: Future<Output = OutItemType> + Send, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
self: &Arc<Self>,
concurrency_limit: u32,
sequential_transition: bool,
futures_timeout: Duration,
oldies_pipeline_name: IntoString,
oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
oldies_on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
newies_pipeline_name: IntoString,
newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static,
newies_on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static
) -> Result<(), Box<dyn Error>>
pub async fn spawn_futures_oldies_executor<IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutType> + Sync + Send + 'static, OldiesOutType: Future<Output = OutItemType> + Send, NewiesOutType: Future<Output = OutItemType> + Send, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>( self: &Arc<Self>, concurrency_limit: u32, sequential_transition: bool, futures_timeout: Duration, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static, newies_on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static ) -> Result<(), Box<dyn Error>>
For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:
- One for past events – to be processed by the stream returned by
oldies_pipeline_builder(); - Another one for subsequent events – to be processed by the stream returned by
newies_pipeline_builder(). By using this method, it is assumed that both pipeline builders returnsFutureevents. If this is not so, see one of the sibling methods.
The stream splitting is guaranteed not to drop any events andsequential_transitionmay be used to indicate if old events should be processed first or if both old and new events may be processed simultaneously (in an inevitable out-of-order fashion).
sourcepub async fn spawn_fallibles_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
&self,
concurrency_limit: u32,
pipeline_name: IntoString,
pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Result<(), Box<dyn Error>>
pub async fn spawn_fallibles_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>( &self, concurrency_limit: u32, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Result<(), Box<dyn Error>>
Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(),
which generates events that are Fallible.
sourcepub async fn spawn_fallibles_oldies_executor<IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Sync + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
self: &Arc<Self>,
concurrency_limit: u32,
sequential_transition: bool,
oldies_pipeline_name: IntoString,
oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
oldies_on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
newies_pipeline_name: IntoString,
newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static,
newies_on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static
) -> Result<(), Box<dyn Error>>
pub async fn spawn_fallibles_oldies_executor<IntoString: Into<String>, OutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Sync + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>( self: &Arc<Self>, concurrency_limit: u32, sequential_transition: bool, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static, newies_on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static ) -> Result<(), Box<dyn Error>>
For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:
- One for past events – to be processed by the stream returned by
oldies_pipeline_builder(); - Another one for subsequent events – to be processed by the stream returned by
newies_pipeline_builder(). By using this method, it is assumed that both pipeline builders returns Fallible events. If this is not so, see one of the sibling methods.
The stream splitting is guaranteed not to drop any events andsequential_transitionmay be used to indicate if old events should be processed first or if both old and new events may be processed simultaneously (in an inevitable out-of-order fashion).
sourcepub async fn spawn_non_futures_non_fallible_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutItemType> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
&self,
concurrency_limit: u32,
pipeline_name: IntoString,
pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Result<(), Box<dyn Error>>
pub async fn spawn_non_futures_non_fallible_executor<IntoString: Into<String>, OutItemType: Send + Debug, OutStreamType: Stream<Item = OutItemType> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>( &self, concurrency_limit: u32, pipeline_name: IntoString, pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Result<(), Box<dyn Error>>
Spawns a new listener of all subsequent events sent to this Multi, processing them through the Stream returned by pipeline_builder(),
which generates events that are Non-Futures & Non-Fallible.
sourcepub async fn spawn_non_futures_non_fallible_oldies_executor<IntoString: Into<String>, OldiesOutItemType: Send + Debug, NewiesOutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutItemType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutItemType> + Sync + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
self: &Arc<Self>,
concurrency_limit: u32,
sequential_transition: bool,
oldies_pipeline_name: IntoString,
oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
oldies_on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
newies_pipeline_name: IntoString,
newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType,
newies_on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static
) -> Result<(), Box<dyn Error>>
pub async fn spawn_non_futures_non_fallible_oldies_executor<IntoString: Into<String>, OldiesOutItemType: Send + Debug, NewiesOutItemType: Send + Debug, OldiesOutStreamType: Stream<Item = OldiesOutItemType> + Sync + Send + 'static, NewiesOutStreamType: Stream<Item = NewiesOutItemType> + Sync + Send + 'static, OldiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static, NewiesCloseVoidAsyncType: Future<Output = ()> + Send + 'static>( self: &Arc<Self>, concurrency_limit: u32, sequential_transition: bool, oldies_pipeline_name: IntoString, oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType, oldies_on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static, newies_pipeline_name: IntoString, newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType, newies_on_close_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static ) -> Result<(), Box<dyn Error>>
For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this Multi:
- One for past events – to be processed by the stream returned by
oldies_pipeline_builder(); - Another one for subsequent events – to be processed by the stream returned by
newies_pipeline_builder(). By using this method, it is assumed that both pipeline builders returns non-Futures & non-Fallible events. If this is not so, see [spawn_oldies_executor].
The stream splitting is guaranteed not to drop any events andsequential_transitionmay be used to indicate if old events should be processed first or if both old and new events may be processed simultaneously (in an inevitable out-of-order fashion).
sourcepub async fn close(&self, timeout: Duration) -> bool
pub async fn close(&self, timeout: Duration) -> bool
Closes this Multi, in isolation – flushing pending events, closing the producers,
waiting for all events to be fully processed and calling all executor’s “on close” callbacks.
If this Multi share resources with another one (which will get dumped by the “on close”
callback), most probably you want to close them atomically – see multis_close_async!()
sourcepub async fn flush_and_cancel_executor<IntoString: Into<String>>(
&self,
pipeline_name: IntoString,
timeout: Duration
) -> bool
pub async fn flush_and_cancel_executor<IntoString: Into<String>>( &self, pipeline_name: IntoString, timeout: Duration ) -> bool
Asynchronously blocks until all resources associated with the executor responsible for pipeline_name are freed:
- immediately causes
pipeline_nameto cease receiving new elements by removing it from the active list - wait for all pending elements to be processed
- remove the queue/channel and wake the Stream to see that it has ended
- waits for the executor to inform it ceased its execution
- return, dropping all resources
Note it might make sense to spawn this operation by aTokio task, for it may block indefinitely if the Stream has no timeout.
Also note that timing out this operation is not advisable, for resources won’t be freed until it reaches the last step.
Returns false if there was no executor associated withpipeline_name.