pub struct Uni<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType = ItemType>where
ItemType: Send + Sync + Debug + 'static,
UniChannelType: FullDuplexUniChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Send + Sync + 'static,
DerivedItemType: Send + Sync + Debug + 'static,{
pub channel: Arc<UniChannelType>,
pub stream_executors: Vec<Arc<StreamExecutor<INSTRUMENTS>>>,
pub finished_executors_count: AtomicU32,
/* private fields */
}
Expand description
Contains the producer-side Uni handle used to interact with the uni event – for closing the stream, requesting stats, …
Fields§
§channel: Arc<UniChannelType>
§stream_executors: Vec<Arc<StreamExecutor<INSTRUMENTS>>>
§finished_executors_count: AtomicU32
Trait Implementations§
Source§impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> GenericUni for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> GenericUni for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
Source§const INSTRUMENTS: usize = INSTRUMENTS
const INSTRUMENTS: usize = INSTRUMENTS
The instruments this Uni will collect/report
Source§type UniChannelType = UniChannelType
type UniChannelType = UniChannelType
The channel through which payloads will travel from producers to consumers (see Uni for more info)
Source§type DerivedItemType = DerivedItemType
type DerivedItemType = DerivedItemType
The payload type this Uni’s Streams will yield
Source§type MutinyStreamType = MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>
type MutinyStreamType = MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>
Defined as MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>, the concrete type for the Stream of DerivedItemTypes to be given to consumers
Source§fn new<IntoString>(
uni_name: IntoString,
) -> Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
fn new<IntoString>( uni_name: IntoString, ) -> Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
Source§fn send(
&self,
item: <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType,
) -> RetryResult<(), <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, (), ()>
fn send( &self, item: <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, ) -> RetryResult<(), <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, (), ()>
See [ChannelProducer::send()]
Source§fn send_with<F>(&self, setter: F) -> RetryResult<(), F, (), ()>where
F: FnOnce(&mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType),
fn send_with<F>(&self, setter: F) -> RetryResult<(), F, (), ()>where
F: FnOnce(&mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType),
See [ChannelProducer::send_with()]
Source§fn send_with_async<F, Fut>(
&'static self,
setter: F,
) -> impl Future<Output = RetryResult<(), F, (), ()>> + Send
where
F: FnOnce(&'static mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType) -> Fut + Send,
Fut: Future<Output = &'static mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType> + Send,
fn send_with_async<F, Fut>(
&'static self,
setter: F,
) -> impl Future<Output = RetryResult<(), F, (), ()>> + Send
where
F: FnOnce(&'static mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType) -> Fut + Send,
Fut: Future<Output = &'static mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType> + Send,
See [ChannelProducer::send_with_async()]
Source§fn reserve_slot(
&self,
) -> Option<&mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType>
fn reserve_slot( &self, ) -> Option<&mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType>
See [ChannelProducer::reserve_slot()]
Source§fn try_send_reserved(
&self,
reserved_slot: &mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType,
) -> bool
fn try_send_reserved( &self, reserved_slot: &mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, ) -> bool
See [ChannelProducer::try_send_reserved()]
Source§fn try_cancel_slot_reserve(
&self,
reserved_slot: &mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType,
) -> bool
fn try_cancel_slot_reserve( &self, reserved_slot: &mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, ) -> bool
See [ChannelProducer::try_cancel_slot_reserve()]
Source§fn consumer_stream(
self,
) -> (Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>, Vec<MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>>)
fn consumer_stream( self, ) -> (Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>, Vec<MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>>)
Sets this Uni to return Streams instead of executing them
Source§fn pending_items_count(&self) -> u32
fn pending_items_count(&self) -> u32
Tells how many events (collected by Self::send() or Self::send_with()) are waiting to be consumed by the active Streams
Source§fn buffer_size(&self) -> u32
fn buffer_size(&self) -> u32
Tells the maximum number of events that might be, at any given time, awaiting consumption from the active Streams – when exceeded, Self::send() & Self::send_with() will fail until consumption progresses
Source§async fn flush(&self, duration: Duration) -> u32
async fn flush(&self, duration: Duration) -> u32
Waits (up to duration) until Self::pending_items_count() is zero – possibly waking some tasks awaiting on the active Streams.
Returns the pending items – which will be non-zero if timeout expired.
Source§async fn close(&self, timeout: Duration) -> bool
async fn close(&self, timeout: Duration) -> bool
Closes this Uni, in isolation – flushing pending events, closing the producers,
waiting for all events to be fully processed and calling the “on close” callback.
Returns false if the timeout kicked in before it could be attested that the closing was complete.
If this Uni shares resources with another one (which will get dumped by the “on close” callback), most probably you want to close them atomically – see unis_close_async!()
Source§fn spawn_executors<OutItemType, OutStreamType, OutType, ErrVoidAsyncType, CloseVoidAsyncType>(
self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>
fn spawn_executors<OutItemType, OutStreamType, OutType, ErrVoidAsyncType, CloseVoidAsyncType>( self, concurrency_limit: u32, futures_timeout: Duration, pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>
Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are Future & fallible (Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.
on_err_callback(error) is called whenever the Stream returns an Err element.
Source§fn spawn_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>(
self,
concurrency_limit: u32,
pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>
fn spawn_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>( self, concurrency_limit: u32, pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>
Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are fallible & non-future (Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.
on_err_callback(error) is called whenever the Stream returns an Err element.
Source§fn spawn_futures_executors<OutItemType, OutStreamType, OutType, CloseVoidAsyncType>(
self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>
fn spawn_futures_executors<OutItemType, OutStreamType, OutType, CloseVoidAsyncType>( self, concurrency_limit: u32, futures_timeout: Duration, pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>
Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are Future & non-fallible (Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.
Source§fn spawn_non_futures_non_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>(
self,
concurrency_limit: u32,
pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>
fn spawn_non_futures_non_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>( self, concurrency_limit: u32, pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>
Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are non-future & non-fallible (Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.
Auto Trait Implementations§
impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType = ItemType> !Freeze for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> RefUnwindSafe for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> Send for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> Sync for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> Unpin for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> UnwindSafe for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
T: ?Sized,
impl<T> BorrowMut<T> for T
where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more