Struct reactive_messaging::prelude::Uni
source · pub struct Uni<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType = ItemType>where
ItemType: Send + Sync + Debug + 'static,
UniChannelType: FullDuplexUniChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Send + Sync + 'static,
DerivedItemType: Send + Sync + Debug + 'static,{
pub channel: Arc<UniChannelType>,
pub stream_executors: Vec<Arc<StreamExecutor<INSTRUMENTS>>>,
pub finished_executors_count: AtomicU32,
/* private fields */
}Expand description
Contains the producer-side Uni handle used to interact with the uni event
– for closing the stream, requiring stats, …
Fields§
§channel: Arc<UniChannelType>§stream_executors: Vec<Arc<StreamExecutor<INSTRUMENTS>>>§finished_executors_count: AtomicU32Trait Implementations§
source§impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> GenericUni for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>where
ItemType: Send + Sync + Debug + 'static,
UniChannelType: FullDuplexUniChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Send + Sync + 'static,
DerivedItemType: Send + Sync + Debug + 'static,
impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> GenericUni for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>where ItemType: Send + Sync + Debug + 'static, UniChannelType: FullDuplexUniChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Send + Sync + 'static, DerivedItemType: Send + Sync + Debug + 'static,
source§const INSTRUMENTS: usize = INSTRUMENTS
const INSTRUMENTS: usize = INSTRUMENTS
The instruments this Uni will collect/report
§type UniChannelType = UniChannelType
type UniChannelType = UniChannelType
The channel through which payloads will travel from producers to consumers (see Uni for more info)
§type DerivedItemType = DerivedItemType
type DerivedItemType = DerivedItemType
The payload type this Uni’s
Streams will yield§type MutinyStreamType = MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>
type MutinyStreamType = MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>
Defined as
the concrete type for the
MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>,the concrete type for the
Stream of DerivedItemTypes to be given to consumerssource§fn new<IntoString>(
uni_name: IntoString
) -> Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>where
IntoString: Into<String>,
fn new<IntoString>( uni_name: IntoString ) -> Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>where IntoString: Into<String>,
fn send( &self, item: <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType ) -> RetryResult<(), <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, (), ()>
fn send_with<F>(&self, setter: F) -> RetryResult<(), F, (), ()>where F: FnOnce(&mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType),
source§fn consumer_stream(
self
) -> (Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>, Vec<MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>>)
fn consumer_stream( self ) -> (Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>, Vec<MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>>)
Sets this Uni to return
Streams instead of executing themsource§fn pending_items_count(&self) -> u32
fn pending_items_count(&self) -> u32
Tells how many events (collected by Self::send() or Self::send_with()) are waiting to be
consumed by the active
Streamssource§fn buffer_size(&self) -> u32
fn buffer_size(&self) -> u32
Tells the limit number of events that might be, at any given time, awaiting consumption from the active
Streams
– when exceeded, Self::send() & Self::send_with() will fail until consumption progressessource§fn flush<'life0, 'async_trait>(
&'life0 self,
duration: Duration
) -> Pin<Box<dyn Future<Output = u32> + Send + 'async_trait>>where
'life0: 'async_trait,
Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>: 'async_trait,
fn flush<'life0, 'async_trait>( &'life0 self, duration: Duration ) -> Pin<Box<dyn Future<Output = u32> + Send + 'async_trait>>where 'life0: 'async_trait, Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>: 'async_trait,
Waits (up to
Returns the pending items – which will be non-zero if
duration) until Self::pending_items_count() is zero – possibly waking some tasks awaiting on the active Streams.Returns the pending items – which will be non-zero if
timeout expired.source§fn close<'life0, 'async_trait>(
&'life0 self,
timeout: Duration
) -> Pin<Box<dyn Future<Output = bool> + Send + 'async_trait>>where
'life0: 'async_trait,
Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>: 'async_trait,
fn close<'life0, 'async_trait>( &'life0 self, timeout: Duration ) -> Pin<Box<dyn Future<Output = bool> + Send + 'async_trait>>where 'life0: 'async_trait, Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>: 'async_trait,
Closes this Uni, in isolation – flushing pending events, closing the producers,
waiting for all events to be fully processed and calling the “on close” callback.
Returns
If this Uni share resources with another one (which will get dumped by the “on close” callback), most probably you want to close them atomically – see unis_close_async!()
Returns
false if the timeout kicked-in before it could be attested that the closing was complete.If this Uni share resources with another one (which will get dumped by the “on close” callback), most probably you want to close them atomically – see unis_close_async!()
source§fn spawn_executors<OutItemType, OutStreamType, OutType, ErrVoidAsyncType, CloseVoidAsyncType>(
self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>where
OutItemType: Send + Debug,
OutStreamType: Stream<Item = OutType> + Send + 'static,
OutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send,
ErrVoidAsyncType: Future<Output = ()> + Send + 'static,
CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
fn spawn_executors<OutItemType, OutStreamType, OutType, ErrVoidAsyncType, CloseVoidAsyncType>( self, concurrency_limit: u32, futures_timeout: Duration, pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>where OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
Spawns an optimized executor for the
Stream returned by pipeline_builder(), provided it produces elements which are Future & fallible
(Actually, as many consumers as MAX_STREAMS will be spawned).on_close_callback(stats) is called when this Uni (and all Streams) are closed.on_err_callback(error) is called whenever the Stream returns an Err element.source§fn spawn_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>(
self,
concurrency_limit: u32,
pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>where
OutItemType: Send + Debug,
OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send + 'static,
CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
fn spawn_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>( self, concurrency_limit: u32, pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>where OutItemType: Send + Debug, OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
Spawns an optimized executor for the
Stream returned by pipeline_builder(), provided it produces elements which are fallible & non-future
(Actually, as many consumers as MAX_STREAMS will be spawned).on_close_callback(stats) is called when this Uni (and all Streams) are closed.on_err_callback(error) is called whenever the Stream returns an Err element.source§fn spawn_futures_executors<OutItemType, OutStreamType, OutType, CloseVoidAsyncType>(
self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>where
OutItemType: Send + Debug,
OutStreamType: Stream<Item = OutType> + Send + 'static,
OutType: Future<Output = OutItemType> + Send,
CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
fn spawn_futures_executors<OutItemType, OutStreamType, OutType, CloseVoidAsyncType>( self, concurrency_limit: u32, futures_timeout: Duration, pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>where OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = OutItemType> + Send, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
Spawns an optimized executor for the
Stream returned by pipeline_builder(), provided it produces elements which are Future & non-fallible
(Actually, as many consumers as MAX_STREAMS will be spawned).on_close_callback(stats) is called when this Uni (and all Streams) are closed.source§fn spawn_non_futures_non_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>(
self,
concurrency_limit: u32,
pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static
) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>where
OutItemType: Send + Debug,
OutStreamType: Stream<Item = OutItemType> + Send + 'static,
CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
fn spawn_non_futures_non_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>( self, concurrency_limit: u32, pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>where OutItemType: Send + Debug, OutStreamType: Stream<Item = OutItemType> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,
Spawns an optimized executor for the
Stream returned by pipeline_builder(), provided it produces elements which are non-future & non-fallible
(Actually, as many consumers as MAX_STREAMS will be spawned).on_close_callback(stats) is called when this Uni (and all Streams) are closed.Auto Trait Implementations§
impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> RefUnwindSafe for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>where DerivedItemType: RefUnwindSafe, ItemType: RefUnwindSafe, UniChannelType: RefUnwindSafe,
impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> Send for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> Sync for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> Unpin for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> UnwindSafe for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>where DerivedItemType: RefUnwindSafe, ItemType: RefUnwindSafe, UniChannelType: RefUnwindSafe,
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more