pub trait GenericUni {
type ItemType: Send + Sync + Debug + 'static;
type DerivedItemType: Send + Sync + Debug + 'static;
type UniChannelType: FullDuplexUniChannel<ItemType = Self::ItemType, DerivedItemType = Self::DerivedItemType> + Send + Sync + 'static;
type MutinyStreamType;
const INSTRUMENTS: usize;
Show 16 methods
// Required methods
fn new<IntoString: Into<String>>(uni_name: IntoString) -> Self;
fn send(
&self,
item: Self::ItemType,
) -> RetryConsumerResult<(), Self::ItemType, ()>;
fn send_with<F: FnOnce(&mut Self::ItemType)>(
&self,
setter: F,
) -> RetryConsumerResult<(), F, ()>;
fn send_with_async<F: FnOnce(&'static mut Self::ItemType) -> Fut + Send, Fut: Future<Output = &'static mut Self::ItemType> + Send>(
&'static self,
setter: F,
) -> impl Future<Output = RetryConsumerResult<(), F, ()>> + Send;
fn reserve_slot(&self) -> Option<&mut Self::ItemType>;
fn try_send_reserved(&self, reserved_slot: &mut Self::ItemType) -> bool;
fn try_cancel_slot_reserve(
&self,
reserved_slot: &mut Self::ItemType,
) -> bool;
fn consumer_stream(
self,
) -> (Arc<Self>, Vec<MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>>);
fn buffer_size(&self) -> u32;
fn pending_items_count(&self) -> u32;
fn flush(&self, timeout: Duration) -> impl Future<Output = u32> + Send;
fn close(&self, timeout: Duration) -> impl Future<Output = bool> + Send;
fn spawn_executors<OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
) -> Arc<Self>;
fn spawn_fallibles_executors<OutItemType: Send + Debug, OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
self,
concurrency_limit: u32,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
) -> Arc<Self>;
fn spawn_futures_executors<OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = OutItemType> + Send, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
) -> Arc<Self>;
fn spawn_non_futures_non_fallibles_executors<OutItemType: Send + Debug, OutStreamType: Stream<Item = OutItemType> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
self,
concurrency_limit: u32,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
) -> Arc<Self>;
}Expand description
This trait exists to allow simplifying generic declarations of concrete Uni types.
See also [GenericMulti].
Usage:
struct MyGenericStruct<T: GenericUni> { the_uni: T }
let the_uni = Uni<Lots,And,Lots<Of,Generic,Arguments>>::new();
let my_struct = MyGenericStruct { the_uni };
// see more at `tests/use_cases.rs`Required Associated Constants§
Sourceconst INSTRUMENTS: usize
const INSTRUMENTS: usize
The instruments this Uni will collect/report
Required Associated Types§
Sourcetype ItemType: Send + Sync + Debug + 'static
type ItemType: Send + Sync + Debug + 'static
The payload type this Uni’s producers will receive
Sourcetype DerivedItemType: Send + Sync + Debug + 'static
type DerivedItemType: Send + Sync + Debug + 'static
The payload type this Uni’s Streams will yield
Sourcetype UniChannelType: FullDuplexUniChannel<ItemType = Self::ItemType, DerivedItemType = Self::DerivedItemType> + Send + Sync + 'static
type UniChannelType: FullDuplexUniChannel<ItemType = Self::ItemType, DerivedItemType = Self::DerivedItemType> + Send + Sync + 'static
The channel through which payloads will travel from producers to consumers (see Uni for more info)
Sourcetype MutinyStreamType
type MutinyStreamType
Defined as MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>,
the concrete type for the Stream of DerivedItemTypes to be given to consumers
Required Methods§
Sourcefn new<IntoString: Into<String>>(uni_name: IntoString) -> Self
fn new<IntoString: Into<String>>(uni_name: IntoString) -> Self
Creates a Uni, which implements the consumer pattern, capable of:
- creating
Streams; - applying a user-provided
processorto theStreams and executing them to depletion – the finalStreams may produce a combination of fallible/non-fallible & futures/non-futures events; - producing events that are sent to those
Streams.
uni_name is used for instrumentation purposes, depending on the INSTRUMENT generic
argument passed to the Uni struct.
Sourcefn send(
&self,
item: Self::ItemType,
) -> RetryConsumerResult<(), Self::ItemType, ()>
fn send( &self, item: Self::ItemType, ) -> RetryConsumerResult<(), Self::ItemType, ()>
See [ChannelProducer::send()]
Sourcefn send_with<F: FnOnce(&mut Self::ItemType)>(
&self,
setter: F,
) -> RetryConsumerResult<(), F, ()>
fn send_with<F: FnOnce(&mut Self::ItemType)>( &self, setter: F, ) -> RetryConsumerResult<(), F, ()>
See [ChannelProducer::send_with()]
Sourcefn send_with_async<F: FnOnce(&'static mut Self::ItemType) -> Fut + Send, Fut: Future<Output = &'static mut Self::ItemType> + Send>(
&'static self,
setter: F,
) -> impl Future<Output = RetryConsumerResult<(), F, ()>> + Send
fn send_with_async<F: FnOnce(&'static mut Self::ItemType) -> Fut + Send, Fut: Future<Output = &'static mut Self::ItemType> + Send>( &'static self, setter: F, ) -> impl Future<Output = RetryConsumerResult<(), F, ()>> + Send
See [ChannelProducer::send_with_async()]
Sourcefn reserve_slot(&self) -> Option<&mut Self::ItemType>
fn reserve_slot(&self) -> Option<&mut Self::ItemType>
See [ChannelProducer::reserve_slot()]
Sourcefn try_send_reserved(&self, reserved_slot: &mut Self::ItemType) -> bool
fn try_send_reserved(&self, reserved_slot: &mut Self::ItemType) -> bool
See [ChannelProducer::try_send_reserved()]
Sourcefn try_cancel_slot_reserve(&self, reserved_slot: &mut Self::ItemType) -> bool
fn try_cancel_slot_reserve(&self, reserved_slot: &mut Self::ItemType) -> bool
See [ChannelProducer::try_cancel_slot_reserve()]
Sourcefn consumer_stream(
self,
) -> (Arc<Self>, Vec<MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>>)
fn consumer_stream( self, ) -> (Arc<Self>, Vec<MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>>)
Sets this Uni to return Streams instead of executing them
Sourcefn buffer_size(&self) -> u32
fn buffer_size(&self) -> u32
Tells the limit number of events that might be, at any given time, awaiting consumption from the active Streams
– when exceeded, Self::send() & Self::send_with() will fail until consumption progresses
Sourcefn pending_items_count(&self) -> u32
fn pending_items_count(&self) -> u32
Tells how many events (collected by Self::send() or Self::send_with()) are waiting to be
consumed by the active Streams
Sourcefn flush(&self, timeout: Duration) -> impl Future<Output = u32> + Send
fn flush(&self, timeout: Duration) -> impl Future<Output = u32> + Send
Waits (up to duration) until Self::pending_items_count() is zero – possibly waking some tasks awaiting on the active Streams.
Returns the pending items – which will be non-zero if timeout expired.
Sourcefn close(&self, timeout: Duration) -> impl Future<Output = bool> + Send
fn close(&self, timeout: Duration) -> impl Future<Output = bool> + Send
Closes this Uni, in isolation – flushing pending events, closing the producers,
waiting for all events to be fully processed and calling the “on close” callback.
Returns false if the timeout kicked-in before it could be attested that the closing was complete.
If this Uni share resources with another one (which will get dumped by the “on close”
callback), most probably you want to close them atomically – see unis_close_async!()
Sourcefn spawn_executors<OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
) -> Arc<Self>
fn spawn_executors<OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>( self, concurrency_limit: u32, futures_timeout: Duration, pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Arc<Self>
Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are Future & fallible
(Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.
on_err_callback(error) is called whenever the Stream returns an Err element.
Sourcefn spawn_fallibles_executors<OutItemType: Send + Debug, OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
self,
concurrency_limit: u32,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
) -> Arc<Self>
fn spawn_fallibles_executors<OutItemType: Send + Debug, OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>( self, concurrency_limit: u32, pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Arc<Self>
Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are fallible & non-future
(Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.
on_err_callback(error) is called whenever the Stream returns an Err element.
Sourcefn spawn_futures_executors<OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = OutItemType> + Send, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
) -> Arc<Self>
fn spawn_futures_executors<OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = OutItemType> + Send, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>( self, concurrency_limit: u32, futures_timeout: Duration, pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Arc<Self>
Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are Future & non-fallible
(Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.
Sourcefn spawn_non_futures_non_fallibles_executors<OutItemType: Send + Debug, OutStreamType: Stream<Item = OutItemType> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>(
self,
concurrency_limit: u32,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
) -> Arc<Self>
fn spawn_non_futures_non_fallibles_executors<OutItemType: Send + Debug, OutStreamType: Stream<Item = OutItemType> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static>( self, concurrency_limit: u32, pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Arc<Self>
Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are non-future & non-fallible
(Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.