Struct Uni

Source
pub struct Uni<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType = ItemType>
where ItemType: Send + Sync + Debug + 'static, UniChannelType: FullDuplexUniChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Send + Sync + 'static, DerivedItemType: Send + Sync + Debug + 'static,
{ pub channel: Arc<UniChannelType>, pub stream_executors: Vec<Arc<StreamExecutor<INSTRUMENTS>>>, pub finished_executors_count: AtomicU32, /* private fields */ }
Expand description

Contains the producer-side Uni handle used to interact with the uni event – for closing the stream, requiring stats, …

Fields§

§channel: Arc<UniChannelType>§stream_executors: Vec<Arc<StreamExecutor<INSTRUMENTS>>>§finished_executors_count: AtomicU32

Trait Implementations§

Source§

impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> GenericUni for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
where ItemType: Send + Sync + Debug + 'static, UniChannelType: FullDuplexUniChannel<ItemType = ItemType, DerivedItemType = DerivedItemType> + Send + Sync + 'static, DerivedItemType: Send + Sync + Debug + 'static,

Source§

const INSTRUMENTS: usize = INSTRUMENTS

The instruments this Uni will collect/report
Source§

type ItemType = ItemType

The payload type this Uni’s producers will receive
Source§

type UniChannelType = UniChannelType

The channel through which payloads will travel from producers to consumers (see Uni for more info)
Source§

type DerivedItemType = DerivedItemType

The payload type this Uni’s Streams will yield
Source§

type MutinyStreamType = MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>

Defined as MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>,
the concrete type for the Stream of DerivedItemTypes to be given to consumers
Source§

fn new<IntoString>( uni_name: IntoString, ) -> Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
where IntoString: Into<String>,

Creates a Uni, which implements the consumer pattern, capable of: Read more
Source§

fn send( &self, item: <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, ) -> RetryResult<(), <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, (), ()>

See [ChannelProducer::send()]
Source§

fn send_with<F>(&self, setter: F) -> RetryResult<(), F, (), ()>
where F: FnOnce(&mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType),

See [ChannelProducer::send_with()]
Source§

fn send_with_async<F, Fut>( &'static self, setter: F, ) -> impl Future<Output = RetryResult<(), F, (), ()>> + Send
where F: FnOnce(&'static mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType) -> Fut + Send, Fut: Future<Output = &'static mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType> + Send,

See [ChannelProducer::send_with_async()]
Source§

fn reserve_slot( &self, ) -> Option<&mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType>

See [ChannelProducer::reserve_slot()]
Source§

fn try_send_reserved( &self, reserved_slot: &mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, ) -> bool

See [ChannelProducer::try_send_reserved()]
Source§

fn try_cancel_slot_reserve( &self, reserved_slot: &mut <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, ) -> bool

See [ChannelProducer::try_cancel_slot_reserve()]
Source§

fn consumer_stream( self, ) -> (Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>, Vec<MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>>)

Sets this Uni to return Streams instead of executing them
Source§

fn pending_items_count(&self) -> u32

Tells how many events (collected by Self::send() or Self::send_with()) are waiting to be consumed by the active Streams
Source§

fn buffer_size(&self) -> u32

Tells the limit number of events that might be, at any given time, awaiting consumption from the active Streams – when exceeded, Self::send() & Self::send_with() will fail until consumption progresses
Source§

async fn flush(&self, duration: Duration) -> u32

Waits (up to duration) until Self::pending_items_count() is zero – possibly waking some tasks awaiting on the active Streams.
Returns the pending items – which will be non-zero if timeout expired.
Source§

async fn close(&self, timeout: Duration) -> bool

Closes this Uni, in isolation – flushing pending events, closing the producers, waiting for all events to be fully processed and calling the “on close” callback.
Returns false if the timeout kicked-in before it could be attested that the closing was complete.
If this Uni share resources with another one (which will get dumped by the “on close” callback), most probably you want to close them atomically – see unis_close_async!()
Source§

fn spawn_executors<OutItemType, OutStreamType, OutType, ErrVoidAsyncType, CloseVoidAsyncType>( self, concurrency_limit: u32, futures_timeout: Duration, pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>
where OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send, ErrVoidAsyncType: Future<Output = ()> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,

Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are Future & fallible (Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.
on_err_callback(error) is called whenever the Stream returns an Err element.
Source§

fn spawn_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>( self, concurrency_limit: u32, pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType, on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>
where OutItemType: Send + Debug, OutStreamType: Stream<Item = Result<OutItemType, Box<dyn Error + Send + Sync>>> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,

Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are fallible & non-future (Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.
on_err_callback(error) is called whenever the Stream returns an Err element.
Source§

fn spawn_futures_executors<OutItemType, OutStreamType, OutType, CloseVoidAsyncType>( self, concurrency_limit: u32, futures_timeout: Duration, pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>
where OutItemType: Send + Debug, OutStreamType: Stream<Item = OutType> + Send + 'static, OutType: Future<Output = OutItemType> + Send, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,

Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are Future & non-fallible (Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.
Source§

fn spawn_non_futures_non_fallibles_executors<OutItemType, OutStreamType, CloseVoidAsyncType>( self, concurrency_limit: u32, pipeline_builder: impl Fn(MutinyStream<'static, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::ItemType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::UniChannelType, <Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> as GenericUni>::DerivedItemType>) -> OutStreamType, on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static, ) -> Arc<Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>>
where OutItemType: Send + Debug, OutStreamType: Stream<Item = OutItemType> + Send + 'static, CloseVoidAsyncType: Future<Output = ()> + Send + 'static,

Spawns an optimized executor for the Stream returned by pipeline_builder(), provided it produces elements which are non-future & non-fallible (Actually, as many consumers as MAX_STREAMS will be spawned).
on_close_callback(stats) is called when this Uni (and all Streams) are closed.

Auto Trait Implementations§

§

impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType = ItemType> !Freeze for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>

§

impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> RefUnwindSafe for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
where UniChannelType: RefUnwindSafe, ItemType: RefUnwindSafe, DerivedItemType: RefUnwindSafe,

§

impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> Send for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>

§

impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> Sync for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>

§

impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> Unpin for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>

§

impl<ItemType, UniChannelType, const INSTRUMENTS: usize, DerivedItemType> UnwindSafe for Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType>
where UniChannelType: RefUnwindSafe, ItemType: RefUnwindSafe, DerivedItemType: RefUnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V