/// Allows simplifying generic declarations of concrete `Uni` types, so that code may be
/// written against a single `T: GenericUni` parameter instead of repeating all of the
/// `Uni` generic arguments. See also `GenericMulti`.
///
/// The trait is not object safe (it has generic methods, methods consuming `self` and a
/// constructor returning `Self`), so it is meant to be used as a generic bound only.
pub trait GenericUni {
    /// The payload type accepted by this Uni's producers (see [`Self::send()`]).
    type ItemType: Send + Sync + Debug + 'static;
    /// The payload type this Uni's Streams will yield.
    type DerivedItemType: Send + Sync + Debug + 'static;
    /// The channel through which payloads travel from producers to consumers.
    type UniChannelType: FullDuplexUniChannel<ItemType = Self::ItemType, DerivedItemType = Self::DerivedItemType> + Send + Sync + 'static;
    /// The concrete type of the Stream of `DerivedItemType`s given to consumers --
    /// implementors define it as
    /// `MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>`.
    type MutinyStreamType;

    /// The instruments this Uni will collect/report.
    const INSTRUMENTS: usize;

    // Required methods

    /// Creates a Uni, which implements the consumer pattern, capable of:
    ///   - creating Streams;
    ///   - applying a user-provided processor to the Streams and executing them to
    ///     depletion -- the final Streams may produce a combination of
    ///     fallible/non-fallible & futures/non-futures events;
    ///   - producing events that are sent to those Streams.
    ///
    /// `uni_name` is used for instrumentation purposes, depending on the instruments
    /// configured for the Uni (see [`Self::INSTRUMENTS`]).
    fn new<IntoString>(uni_name: IntoString) -> Self
       where IntoString: Into<String>;
    /// Hands `item` over as an event to be consumed by the active Streams.
    ///
    /// Fails when more than [`Self::buffer_size()`] events are awaiting consumption.
    // NOTE(review): on failure the `RetryResult` presumably carries `item` back to the
    // caller so it may be retried -- confirm against `RetryResult`'s documentation.
    fn send(
        &self,
        item: Self::ItemType
    ) -> RetryResult<(), Self::ItemType, (), ()>;
    /// Variant of [`Self::send()`] in which the event is filled in by `setter`, which
    /// receives a `&mut Self::ItemType`.
    ///
    /// Subject to the same [`Self::buffer_size()`] limit as [`Self::send()`].
    // NOTE(review): presumably `setter` writes straight into the channel's slot and is
    // handed back inside the `RetryResult` on failure -- confirm with the implementor.
    fn send_with<F>(&self, setter: F) -> RetryResult<(), F, (), ()>
       where F: FnOnce(&mut Self::ItemType);
    /// Sets this Uni to return Streams instead of executing them.
    ///
    /// Consumes `self` and returns the shareable (`Arc`) Uni along with the Streams.
    fn consumer_stream(
        self
    ) -> (Arc<Self>, Vec<MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>>);
    /// Tells the maximum number of events that might be, at any given time, awaiting
    /// consumption from the active Streams -- when exceeded, [`Self::send()`] &
    /// [`Self::send_with()`] will fail until consumption progresses.
    fn buffer_size(&self) -> u32;
    /// Tells how many events (collected by [`Self::send()`] or [`Self::send_with()`])
    /// are waiting to be consumed by the active Streams.
    fn pending_items_count(&self) -> u32;
    /// Waits (up to `timeout`) until [`Self::pending_items_count()`] is zero -- possibly
    /// waking some tasks awaiting on the active Streams.
    ///
    /// Returns the number of pending items -- which will be non-zero if `timeout` expired.
    // NOTE(review): the `'life0` / `'async_trait` lifetimes and the boxed future suggest
    // this is the expansion of an `async fn` through the `async_trait` macro -- confirm.
    fn flush<'life0, 'async_trait>(
        &'life0 self,
        timeout: Duration
    ) -> Pin<Box<dyn Future<Output = u32> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait;
    /// Closes this Uni, in isolation -- flushing pending events, closing the producers,
    /// waiting for all events to be fully processed and calling the "on close" callback.
    ///
    /// Returns `false` if `timeout` kicked in before it could be attested that the
    /// closing was complete.
    ///
    /// If this Uni shares resources with another one (which will get dumped by the
    /// "on close" callback), most probably you want to close them atomically -- see
    /// `unis_close_async!()`.
    fn close<'life0, 'async_trait>(
        &'life0 self,
        timeout: Duration
    ) -> Pin<Box<dyn Future<Output = bool> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait;
    /// Spawns an optimized executor for the Stream returned by `pipeline_builder()`,
    /// provided it produces elements which are `Future` & fallible (actually, as many
    /// consumers as `MAX_STREAMS` will be spawned).
    ///
    /// - `on_close_callback(stats)` is called when this Uni (and all Streams) are closed;
    /// - `on_err_callback(error)` is called whenever the Stream returns an `Err` element.
    // NOTE(review): `concurrency_limit` and `futures_timeout` are not described by the
    // available documentation -- confirm their semantics against the implementor.
    fn spawn_executors<OutItemType, OutStreamType, OutType, ErrVoidAsyncType, CloseVoidAsyncType>(
        self,
        concurrency_limit: u32,
        futures_timeout: Duration,
        pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
        on_err_callback: impl Fn(Box<dyn Error + Sync + Send>) -> ErrVoidAsyncType + Send + Sync + 'static,
        on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static
    ) -> Arc<Self>
       where OutItemType: Send + Debug,
             OutStreamType: Stream<Item = OutType> + Send + 'static,
             OutType: Future<Output = Result<OutItemType, Box<dyn Error + Sync + Send>>> + Send,
             ErrVoidAsyncType: Future<Output = ()> + Send + 'static,
             CloseVoidAsyncType: Future<Output = ()> + Send + 'static;
    /// Spawns an optimized executor for the Stream returned by `pipeline_builder()`,
    /// provided it produces elements which are fallible & non-future (actually, as many
    /// consumers as `MAX_STREAMS` will be spawned).
    ///
    /// - `on_close_callback(stats)` is called when this Uni (and all Streams) are closed;
    /// - `on_err_callback(error)` is called whenever the Stream returns an `Err` element
    ///   (unlike in [`Self::spawn_executors()`], this callback is synchronous).
    // NOTE(review): `concurrency_limit` is not described by the available documentation
    // -- confirm its semantics against the implementor.
    fn spawn_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>(
        self,
        concurrency_limit: u32,
        pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
        on_err_callback: impl Fn(Box<dyn Error + Sync + Send>) + Send + Sync + 'static,
        on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static
    ) -> Arc<Self>
       where OutItemType: Send + Debug,
             OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Sync + Send>>> + Send + 'static,
             CloseVoidAsyncType: Future<Output = ()> + Send + 'static;
    /// Spawns an optimized executor for the Stream returned by `pipeline_builder()`,
    /// provided it produces elements which are `Future` & non-fallible (actually, as many
    /// consumers as `MAX_STREAMS` will be spawned).
    ///
    /// `on_close_callback(stats)` is called when this Uni (and all Streams) are closed.
    // NOTE(review): `concurrency_limit` and `futures_timeout` are not described by the
    // available documentation -- confirm their semantics against the implementor.
    fn spawn_futures_executors<OutItemType, OutStreamType, OutType, CloseVoidAsyncType>(
        self,
        concurrency_limit: u32,
        futures_timeout: Duration,
        pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
        on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static
    ) -> Arc<Self>
       where OutItemType: Send + Debug,
             OutStreamType: Stream<Item = OutType> + Send + 'static,
             OutType: Future<Output = OutItemType> + Send,
             CloseVoidAsyncType: Future<Output = ()> + Send + 'static;
    /// Spawns an optimized executor for the Stream returned by `pipeline_builder()`,
    /// provided it produces elements which are non-future & non-fallible (actually, as
    /// many consumers as `MAX_STREAMS` will be spawned).
    ///
    /// `on_close_callback(stats)` is called when this Uni (and all Streams) are closed.
    // NOTE(review): `concurrency_limit` is not described by the available documentation
    // -- confirm its semantics against the implementor.
    fn spawn_non_futures_non_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>(
        self,
        concurrency_limit: u32,
        pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
        on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static
    ) -> Arc<Self>
       where OutItemType: Send + Debug,
             OutStreamType: Stream<Item = OutItemType> + Send + 'static,
             CloseVoidAsyncType: Future<Output = ()> + Send + 'static;
}
Expand description

This trait exists to allow simplifying generic declarations of concrete Uni types. See also [GenericMulti].
Usage:

    struct MyGenericStruct<T: GenericUni> { the_uni: T }
    let the_uni = Uni::<Lots,And,Lots<Of,Generic,Arguments>>::new("my uni");
    let my_struct = MyGenericStruct { the_uni };
    // see more at `tests/use_cases.rs`

Required Associated Types§

source

type ItemType: Send + Sync + Debug + 'static

The payload type this Uni’s producers accept – i.e., what is passed to Self::send()

source

type DerivedItemType: Send + Sync + Debug + 'static

The payload type this Uni’s Streams will yield

source

type UniChannelType: FullDuplexUniChannel<ItemType = Self::ItemType, DerivedItemType = Self::DerivedItemType> + Send + Sync + 'static

The channel through which payloads will travel from producers to consumers (see Uni for more info)

source

type MutinyStreamType

Defined as MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>,
the concrete type for the Stream of DerivedItemTypes to be given to consumers

Required Associated Constants§

source

const INSTRUMENTS: usize

The instruments this Uni will collect/report

Required Methods§

source

fn new<IntoString>(uni_name: IntoString) -> Self
where IntoString: Into<String>,

Creates a Uni, which implements the consumer pattern, capable of:

  • creating Streams;
  • applying a user-provided processor to the Streams and executing them to depletion – the final Streams may produce a combination of fallible/non-fallible & futures/non-futures events;
  • producing events that are sent to those Streams.

uni_name is used for instrumentation purposes, depending on the INSTRUMENTS generic argument passed to the Uni struct.
source

fn send(&self, item: Self::ItemType) -> RetryResult<(), Self::ItemType, (), ()>

source

fn send_with<F>(&self, setter: F) -> RetryResult<(), F, (), ()>
where F: FnOnce(&mut Self::ItemType),

source

fn consumer_stream( self ) -> (Arc<Self>, Vec<MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>>)

Sets this Uni to return Streams instead of executing them

source

fn buffer_size(&self) -> u32

Tells the maximum number of events that might be, at any given time, awaiting consumption from the active Streams – when exceeded, Self::send() & Self::send_with() will fail until consumption progresses

source

fn pending_items_count(&self) -> u32

Tells how many events (collected by Self::send() or Self::send_with()) are waiting to be consumed by the active Streams

source

fn flush<'life0, 'async_trait>( &'life0 self, timeout: Duration ) -> Pin<Box<dyn Future<Output = u32> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

Waits (up to timeout) until Self::pending_items_count() is zero – possibly waking some tasks awaiting on the active Streams.
Returns the number of pending items – which will be non-zero if the timeout expired.

source

fn close<'life0, 'async_trait>( &'life0 self, timeout: Duration ) -> Pin<Box<dyn Future<Output = bool> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

Closes this Uni, in isolation – flushing pending events, closing the producers, waiting for all events to be fully processed and calling the “on close” callback.
Returns false if the timeout kicked in before it could be attested that the closing was complete.
If this Uni shares resources with another one (which will get dumped by the “on close” callback), most probably you want to close them atomically – see unis_close_async!()

source

fn spawn_executors<OutItemType, OutStreamType, OutType, ErrVoidAsyncType, CloseVoidAsyncType>( self, concurrency_limit: u32, futures_timeout: Duration, pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Sync + Send>) -> ErrVoidAsyncType + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Arc<Self>
where OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = Result<OutItemType, Box<dyn Error + Sync + Send>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,

Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are Future & fallible (Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.
on_err_callback(error) is called whenever the Stream returns an Err element.

source

fn spawn_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>( self, concurrency_limit: u32, pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Sync + Send>) + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Arc<Self>
where OutItemType: Send + Debug, OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Sync + Send>>> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,

Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are fallible & non-future (Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.
on_err_callback(error) is called whenever the Stream returns an Err element.

source

fn spawn_futures_executors<OutItemType, OutStreamType, OutType, CloseVoidAsyncType>( self, concurrency_limit: u32, futures_timeout: Duration, pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Arc<Self>
where OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = OutItemType> + Send, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,

Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are Future & non-fallible (Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.

source

fn spawn_non_futures_non_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>( self, concurrency_limit: u32, pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Sync + Send>) -> CloseVoidAsyncType + Send + Sync + 'static ) -> Arc<Self>
where OutItemType: Send + Debug, OutStreamType: Stream<Item = OutItemType> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,

Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are non-future & non-fallible (Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.

Object Safety§

This trait is not object safe.

Implementors§

source§

impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> GenericUni for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
where ItemType: Send + Sync + Debug + 'static, UniChannelType: FullDuplexUniChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Send + Sync + 'static, DerivedItemType: Send + Sync + Debug + 'static,

source§

const INSTRUMENTS: usize = INSTRUMENTS

§

type ItemType = ItemType

§

type UniChannelType = UniChannelType

§

type DerivedItemType = DerivedItemType

§

type MutinyStreamType = MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>