Trait reactive_messaging::prelude::GenericUni
source · pub trait GenericUni {
type ItemType: Send + Sync + Debug + 'static;
type DerivedItemType: Send + Sync + Debug + 'static;
type UniChannelType: FullDuplexUniChannel<ItemType = Self::ItemType, DerivedItemType = Self::DerivedItemType> + Send + Sync + 'static;
type MutinyStreamType;
const INSTRUMENTS: usize;
// Required methods
fn new<IntoString>(uni_name: IntoString) -> Self
where IntoString: Into<String>;
fn send(
&self,
item: Self::ItemType
) -> RetryResult<(), Self::ItemType, (), ()>;
fn send_with<F>(&self, setter: F) -> RetryResult<(), F, (), ()>
where F: FnOnce(&mut Self::ItemType);
fn consumer_stream(
self
) -> (Arc<Self>, Vec<MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>>);
fn buffer_size(&self) -> u32;
fn pending_items_count(&self) -> u32;
fn flush<'life0, 'async_trait>(
&'life0 self,
timeout: Duration
) -> Pin<Box<dyn Future<Output = u32> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
fn close<'life0, 'async_trait>(
&'life0 self,
timeout: Duration
) -> Pin<Box<dyn Future<Output = bool> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
fn spawn_executors<OutItemType, OutStreamType, OutType, ErrVoidAsyncType, CloseVoidAsyncType>(
self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Sync + Send>) -> ErrVoidAsyncType + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Arc<Self>
where OutItemType: Send + Debug,
OutStreamType: Stream<Item = OutType> + Send + 'static,
OutType: Future<Output = Result<OutItemType, Box<dyn Error + Sync + Send>>> + Send,
ErrVoidAsyncType: Future<Output = ()> + Send + 'static,
CloseVoidAsyncType: Future<Output = ()> + Send + 'static;
fn spawn_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>(
self,
concurrency_limit: u32,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Sync + Send>) + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Arc<Self>
where OutItemType: Send + Debug,
OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Sync + Send>>> + Send + 'static,
CloseVoidAsyncType: Future<Output = ()> + Send + 'static;
fn spawn_futures_executors<OutItemType, OutStreamType, OutType, CloseVoidAsyncType>(
self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Arc<Self>
where OutItemType: Send + Debug,
OutStreamType: Stream<Item = OutType> + Send + 'static,
OutType: Future<Output = OutItemType> + Send,
CloseVoidAsyncType: Future<Output = ()> + Send + 'static;
fn spawn_non_futures_non_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>(
self,
concurrency_limit: u32,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Arc<Self>
where OutItemType: Send + Debug,
OutStreamType: Stream<Item = OutItemType> + Send + 'static,
CloseVoidAsyncType: Future<Output = ()> + Send + 'static;
}Expand description
This trait exists to allow simplifying generic declarations of concrete Uni types.
See also [GenericMulti].
Usage:
struct MyGenericStruct<T: GenericUni> { the_uni: T }
let the_uni = Uni<Lots,And,Lots<Of,Generic,Arguments>>::new();
let my_struct = MyGenericStruct { the_uni };
// see more at `tests/use_cases.rs`Required Associated Types§
sourcetype ItemType: Send + Sync + Debug + 'static
type ItemType: Send + Sync + Debug + 'static
The payload type this Uni’s producers will receive
sourcetype DerivedItemType: Send + Sync + Debug + 'static
type DerivedItemType: Send + Sync + Debug + 'static
The payload type this Uni’s Streams will yield
sourcetype UniChannelType: FullDuplexUniChannel<ItemType = Self::ItemType, DerivedItemType = Self::DerivedItemType> + Send + Sync + 'static
type UniChannelType: FullDuplexUniChannel<ItemType = Self::ItemType, DerivedItemType = Self::DerivedItemType> + Send + Sync + 'static
The channel through which payloads will travel from producers to consumers (see Uni for more info)
sourcetype MutinyStreamType
type MutinyStreamType
Defined as MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>,
the concrete type for the Stream of DerivedItemTypes to be given to consumers
Required Associated Constants§
sourceconst INSTRUMENTS: usize
const INSTRUMENTS: usize
The instruments this Uni will collect/report
Required Methods§
sourcefn new<IntoString>(uni_name: IntoString) -> Self
fn new<IntoString>(uni_name: IntoString) -> Self
Creates a Uni, which implements the consumer pattern, capable of:
- creating
Streams; - applying a user-provided
processorto theStreams and executing them to depletion – the finalStreams may produce a combination of fallible/non-fallible & futures/non-futures events; - producing events that are sent to those
Streams.uni_nameis used for instrumentation purposes, depending on theINSTRUMENTgeneric argument passed to the Uni struct.
fn send(&self, item: Self::ItemType) -> RetryResult<(), Self::ItemType, (), ()>
fn send_with<F>(&self, setter: F) -> RetryResult<(), F, (), ()>
sourcefn consumer_stream(
self
) -> (Arc<Self>, Vec<MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>>)
fn consumer_stream( self ) -> (Arc<Self>, Vec<MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>>)
Sets this Uni to return Streams instead of executing them
sourcefn buffer_size(&self) -> u32
fn buffer_size(&self) -> u32
Tells the limit number of events that might be, at any given time, awaiting consumption from the active Streams
– when exceeded, Self::send() & Self::send_with() will fail until consumption progresses
sourcefn pending_items_count(&self) -> u32
fn pending_items_count(&self) -> u32
Tells how many events (collected by Self::send() or Self::send_with()) are waiting to be
consumed by the active Streams
sourcefn flush<'life0, 'async_trait>(
&'life0 self,
timeout: Duration
) -> Pin<Box<dyn Future<Output = u32> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn flush<'life0, 'async_trait>(
&'life0 self,
timeout: Duration
) -> Pin<Box<dyn Future<Output = u32> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Waits (up to duration) until Self::pending_items_count() is zero – possibly waking some tasks awaiting on the active Streams.
Returns the pending items – which will be non-zero if timeout expired.
sourcefn close<'life0, 'async_trait>(
&'life0 self,
timeout: Duration
) -> Pin<Box<dyn Future<Output = bool> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn close<'life0, 'async_trait>(
&'life0 self,
timeout: Duration
) -> Pin<Box<dyn Future<Output = bool> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Closes this Uni, in isolation – flushing pending events, closing the producers,
waiting for all events to be fully processed and calling the “on close” callback.
Returns false if the timeout kicked-in before it could be attested that the closing was complete.
If this Uni share resources with another one (which will get dumped by the “on close”
callback), most probably you want to close them atomically – see unis_close_async!()
sourcefn spawn_executors<OutItemType, OutStreamType, OutType, ErrVoidAsyncType, CloseVoidAsyncType>(
self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Sync + Send>) -> ErrVoidAsyncType + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Arc<Self>
fn spawn_executors<OutItemType, OutStreamType, OutType, ErrVoidAsyncType, CloseVoidAsyncType>( self, concurrency_limit: u32, futures_timeout: Duration, pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Sync + Send>) -> ErrVoidAsyncType + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Arc<Self>
Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are Future & fallible
(Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.
on_err_callback(error) is called whenever the Stream returns an Err element.
sourcefn spawn_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>(
self,
concurrency_limit: u32,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Sync + Send>) + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Arc<Self>
fn spawn_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>( self, concurrency_limit: u32, pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Sync + Send>) + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Arc<Self>
Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are fallible & non-future
(Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.
on_err_callback(error) is called whenever the Stream returns an Err element.
sourcefn spawn_futures_executors<OutItemType, OutStreamType, OutType, CloseVoidAsyncType>(
self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Arc<Self>
fn spawn_futures_executors<OutItemType, OutStreamType, OutType, CloseVoidAsyncType>( self, concurrency_limit: u32, futures_timeout: Duration, pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Arc<Self>
Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are Future & non-fallible
(Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.
sourcefn spawn_non_futures_non_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>(
self,
concurrency_limit: u32,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Arc<Self>
fn spawn_non_futures_non_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>( self, concurrency_limit: u32, pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Arc<Self>
Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are non-future & non-fallible
(Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.