reactive_mutiny/uni/
uni.rs

//! See [super]

use crate::stream_executor::StreamExecutorStats;

use super::super::{
    stream_executor::StreamExecutor,
    mutiny_stream::MutinyStream,
    types::FullDuplexUniChannel,
};
use std::{fmt::Debug, time::Duration, sync::{Arc, atomic::{AtomicU32, Ordering::Relaxed}}};
use std::future::Future;
use std::marker::PhantomData;
use futures::future::BoxFuture;
use futures::Stream;
use keen_retry::RetryConsumerResult;
use tokio::sync::Mutex;


/// Contains the producer-side [Uni] handle used to interact with the `uni` event
/// -- for closing the stream, requesting stats, ...
pub struct Uni<ItemType:          Send + Sync + Debug + 'static,
               UniChannelType:    FullDuplexUniChannel<ItemType=ItemType, DerivedItemType=DerivedItemType> + Send + Sync + 'static,
               const INSTRUMENTS: usize,
               DerivedItemType:   Send + Sync + Debug + 'static = ItemType> {
    pub channel:                  Arc<UniChannelType>,
    pub stream_executors:         Vec<Arc<StreamExecutor<INSTRUMENTS>>>,
    pub finished_executors_count: AtomicU32,
        _phantom:                 PhantomData<(&'static ItemType, &'static DerivedItemType)>,
}

impl<ItemType:          Send + Sync + Debug + 'static,
     UniChannelType:    FullDuplexUniChannel<ItemType=ItemType, DerivedItemType=DerivedItemType> + Send + Sync + 'static,
     const INSTRUMENTS: usize,
     DerivedItemType:   Send + Sync + Debug + 'static>
GenericUni for
Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> {
    const INSTRUMENTS: usize = INSTRUMENTS;
    type ItemType            = ItemType;
    type UniChannelType      = UniChannelType;
    type DerivedItemType     = DerivedItemType;
    type MutinyStreamType    = MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>;


    fn new<IntoString: Into<String>>(uni_name: IntoString) -> Self {
        Uni {
            channel:                  UniChannelType::new(uni_name),
            stream_executors:         vec![],
            finished_executors_count: AtomicU32::new(0),
            _phantom:                 PhantomData,
        }
    }

    #[inline(always)]
    fn send(&self, item: Self::ItemType) -> keen_retry::RetryConsumerResult<(), Self::ItemType, ()> {
        self.channel.send(item)
    }

    #[inline(always)]
    fn send_with<F: FnOnce(&mut Self::ItemType)>(&self, setter: F) -> keen_retry::RetryConsumerResult<(), F, ()> {
        self.channel.send_with(setter)
    }

    #[inline(always)]
    fn send_with_async<F:   FnOnce(&'static mut Self::ItemType) -> Fut + Send,
                       Fut: Future<Output=&'static mut Self::ItemType> + Send>
                      (&'static self,
                       setter: F)
                      -> impl Future<Output=RetryConsumerResult<(), F, ()>> + Send {
        self.channel.send_with_async(setter)
    }

    #[inline(always)]
    fn reserve_slot(&self) -> Option<&mut Self::ItemType> {
        self.channel.reserve_slot()
    }

    #[inline(always)]
    fn try_send_reserved(&self, reserved_slot: &mut Self::ItemType) -> bool {
        self.channel.try_send_reserved(reserved_slot)
    }

    #[inline(always)]
    fn try_cancel_slot_reserve(&self, reserved_slot: &mut Self::ItemType) -> bool {
        self.channel.try_cancel_slot_reserve(reserved_slot)
    }

    fn consumer_stream(self) -> (Arc<Self>, Vec<MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>>) {
        let streams = self.consumer_stream_internal();
        let arc_self = Arc::new(self);
        (arc_self, streams)
    }

    #[inline(always)]
    fn pending_items_count(&self) -> u32 {
        self.channel.pending_items_count()
    }

    #[inline(always)]
    fn buffer_size(&self) -> u32 {
        self.channel.buffer_size()
    }

    async fn flush(&self, duration: Duration) -> u32 {
        self.channel.flush(duration).await
    }

    async fn close(&self, timeout: Duration) -> bool {
        self.channel.gracefully_end_all_streams(timeout).await == 0
    }

    fn spawn_executors<OutItemType:        Send + Debug,
                       OutStreamType:      Stream<Item=OutType> + Send + 'static,
                       OutType:            Future<Output=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send,
                       ErrVoidAsyncType:   Future<Output=()> + Send + 'static,
                       CloseVoidAsyncType: Future<Output=()> + Send + 'static>

                      (mut self,
                       concurrency_limit:         u32,
                       futures_timeout:           Duration,
                       pipeline_builder:          impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
                       on_err_callback:           impl Fn(Box<dyn std::error::Error + Send + Sync>)                                           -> ErrVoidAsyncType   + Send + Sync + 'static,
                       on_close_callback:         impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                                     -> CloseVoidAsyncType + Send + Sync + 'static)

                      -> Arc<Self> {

        let on_close_callback = Arc::new(latch_callback_1p(UniChannelType::MAX_STREAMS as u32, on_close_callback));
        let on_err_callback = Arc::new(on_err_callback);
        let in_streams = self.consumer_stream_internal();
        for i in 0..in_streams.len() {
            let pipeline_name = format!("Consumer #{i} for Uni '{}'", self.channel.name());
            let executor = StreamExecutor::<INSTRUMENTS>::with_futures_timeout(pipeline_name, futures_timeout);
            self.stream_executors.push(executor);
        }
        let arc_self = Arc::new(self);
        let arc_self_ref = Arc::clone(&arc_self);
        arc_self.stream_executors.iter().zip(in_streams)
            .for_each(|(executor, in_stream)| {
                let arc_self = Arc::clone(&arc_self);
                let on_close_callback = Arc::clone(&on_close_callback);
                let on_err_callback = Arc::clone(&on_err_callback);
                let out_stream = pipeline_builder(in_stream);
                Arc::clone(executor)
                    .spawn_executor::<_, _, _, _>(
                        concurrency_limit,
                        move |err| on_err_callback(err),
                        move |executor| {
                            let arc_self = Arc::clone(&arc_self);
                            async move {
                                arc_self.finished_executors_count.fetch_add(1, Relaxed);
                                on_close_callback(executor).await;
                            }
                        },
                        out_stream
                    );
            });
        arc_self_ref
    }

    fn spawn_fallibles_executors<OutItemType:        Send + Debug,
                                 OutStreamType:      Stream<Item=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send + 'static,
                                 CloseVoidAsyncType: Future<Output=()> + Send + 'static>

                                (mut self,
                                 concurrency_limit:         u32,
                                 pipeline_builder:          impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
                                 on_err_callback:           impl Fn(Box<dyn std::error::Error + Send + Sync>)                                                                 + Send + Sync + 'static,
                                 on_close_callback:         impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                                     -> CloseVoidAsyncType + Send + Sync + 'static)

                                -> Arc<Self> {

        let on_close_callback = Arc::new(latch_callback_1p(UniChannelType::MAX_STREAMS as u32, on_close_callback));
        let on_err_callback = Arc::new(on_err_callback);
        let in_streams = self.consumer_stream_internal();
        for i in 0..in_streams.len() {
            let pipeline_name = format!("Consumer #{i} for Uni '{}'", self.channel.name());
            let executor = StreamExecutor::<INSTRUMENTS>::new(pipeline_name);
            self.stream_executors.push(executor);
        }
        let arc_self = Arc::new(self);
        let arc_self_ref = Arc::clone(&arc_self);
        arc_self.stream_executors.iter().zip(in_streams)
            .for_each(|(executor, in_stream)| {
                let arc_self = Arc::clone(&arc_self);
                let on_close_callback = Arc::clone(&on_close_callback);
                let on_err_callback = Arc::clone(&on_err_callback);
                let out_stream = pipeline_builder(in_stream);
                Arc::clone(executor)
                    .spawn_fallibles_executor::<_, _>(
                        concurrency_limit,
                        move |err| on_err_callback(err),
                        move |executor| {
                            let arc_self = Arc::clone(&arc_self);
                            async move {
                                arc_self.finished_executors_count.fetch_add(1, Relaxed);
                                on_close_callback(executor).await;
                            }
                        },
                        out_stream
                    );
            });
        arc_self_ref
    }

    fn spawn_futures_executors<OutItemType:        Send + Debug,
                               OutStreamType:      Stream<Item=OutType>       + Send + 'static,
                               OutType:            Future<Output=OutItemType> + Send,
                               CloseVoidAsyncType: Future<Output=()>          + Send + 'static>

                              (mut self,
                               concurrency_limit:         u32,
                               futures_timeout:           Duration,
                               pipeline_builder:          impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
                               on_close_callback:         impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                                     -> CloseVoidAsyncType + Send + Sync + 'static)

                              -> Arc<Self> {

        let on_close_callback = Arc::new(latch_callback_1p(UniChannelType::MAX_STREAMS as u32, on_close_callback));
        let in_streams = self.consumer_stream_internal();
        for i in 0..in_streams.len() {
            let pipeline_name = format!("Consumer #{i} for Uni '{}'", self.channel.name());
            let executor = StreamExecutor::<INSTRUMENTS>::with_futures_timeout(pipeline_name, futures_timeout);
            self.stream_executors.push(executor);
        }
        let arc_self = Arc::new(self);
        let arc_self_ref = Arc::clone(&arc_self);
        arc_self.stream_executors.iter().zip(in_streams)
            .for_each(|(executor, in_stream)| {
                let arc_self = Arc::clone(&arc_self);
                let on_close_callback = Arc::clone(&on_close_callback);
                let out_stream = pipeline_builder(in_stream);
                Arc::clone(executor)
                    .spawn_futures_executor(
                        concurrency_limit,
                        move |executor| {
                            let arc_self = Arc::clone(&arc_self);
                            async move {
                                arc_self.finished_executors_count.fetch_add(1, Relaxed);
                                on_close_callback(executor).await;
                            }
                        },
                        out_stream
                    );
            });
        arc_self_ref
    }

    fn spawn_non_futures_non_fallibles_executors<OutItemType:        Send + Debug,
                                                 OutStreamType:      Stream<Item=OutItemType> + Send + 'static,
                                                 CloseVoidAsyncType: Future<Output=()>        + Send + 'static>

                                                (mut self,
                                                 concurrency_limit:        u32,
                                                 pipeline_builder:         impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
                                                 on_close_callback:        impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                                     -> CloseVoidAsyncType + Send + Sync + 'static)

                                                -> Arc<Self> {

        let on_close_callback = Arc::new(latch_callback_1p(UniChannelType::MAX_STREAMS as u32, on_close_callback));
        let in_streams = self.consumer_stream_internal();
        for i in 0..in_streams.len() {
            let pipeline_name = format!("Consumer #{i} for Uni '{}'", self.channel.name());
            let executor = StreamExecutor::<INSTRUMENTS>::new(pipeline_name);
            self.stream_executors.push(executor);
        }
        let arc_self = Arc::new(self);
        let arc_self_ref = Arc::clone(&arc_self);
        arc_self.stream_executors.iter().zip(in_streams)
            .for_each(|(executor, in_stream)| {
                let arc_self = Arc::clone(&arc_self);
                let on_close_callback = Arc::clone(&on_close_callback);
                let out_stream = pipeline_builder(in_stream);
                Arc::clone(executor)
                    .spawn_non_futures_non_fallibles_executor(
                        concurrency_limit,
                        move |executor| {
                            let arc_self = Arc::clone(&arc_self);
                            async move {
                                arc_self.finished_executors_count.fetch_add(1, Relaxed);
                                on_close_callback(executor).await;
                            }
                        },
                        out_stream
                    );
            });
        arc_self_ref
    }
}

impl<ItemType:          Send + Sync + Debug + 'static,
     UniChannelType:    FullDuplexUniChannel<ItemType=ItemType, DerivedItemType=DerivedItemType> + Send + Sync + 'static,
     const INSTRUMENTS: usize,
     DerivedItemType:   Send + Sync + Debug + 'static>
Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> {

    /// Similar to [Self::consumer_stream()], but without consuming `self`
    #[must_use]
    fn consumer_stream_internal(&self) -> Vec<MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>> {
        (0..UniChannelType::MAX_STREAMS)
            .map(|_| {
                let (stream, _stream_id) = self.channel.create_stream();
                stream
            })
            .collect()
    }
}


/// This trait exists to allow simplifying generic declarations of concrete [Uni] types.
/// See also [GenericMulti].\
/// Usage:
/// ```nocompile
///     struct MyGenericStruct<T: GenericUni> { the_uni: T }
///     let the_uni = Uni<Lots,And,Lots<Of,Generic,Arguments>>::new();
///     let my_struct = MyGenericStruct { the_uni };
///     // see more at `tests/use_cases.rs`
/// ```
pub trait GenericUni {
    /// The instruments this Uni will collect/report
    const INSTRUMENTS: usize;
    /// The payload type this Uni's producers will receive
    type ItemType: Send + Sync + Debug + 'static;
    /// The payload type this [Uni]'s `Stream`s will yield
    type DerivedItemType: Send + Sync + Debug + 'static;
    /// The channel through which payloads will travel from producers to consumers (see [Uni] for more info)
    type UniChannelType: FullDuplexUniChannel<ItemType=Self::ItemType, DerivedItemType=Self::DerivedItemType> + Send + Sync + 'static;
    /// Defined as `MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>`,\
    /// the concrete type for the `Stream` of `DerivedItemType`s to be given to consumers
    type MutinyStreamType;

    /// Creates a [Uni], which implements the `consumer pattern`, capable of:
    ///   - creating `Stream`s;
    ///   - applying a user-provided `processor` to the `Stream`s and executing them to depletion --
    ///     the final `Stream`s may produce a combination of fallible/non-fallible &
    ///     futures/non-futures events;
    ///   - producing events that are sent to those `Stream`s.
    ///
    /// `uni_name` is used for instrumentation purposes, depending on the `INSTRUMENTS` generic
    /// argument passed to the [Uni] struct.
    fn new<IntoString: Into<String>>(uni_name: IntoString) -> Self;

    /// See [ChannelProducer::send()]
    #[must_use = "The return type should be examined in case retrying is needed -- or call map(...).into() to transform it into a `Result<(), ItemType>`"]
    fn send(&self, item: Self::ItemType) -> keen_retry::RetryConsumerResult<(), Self::ItemType, ()>;

    /// See [ChannelProducer::send_with()]
    #[must_use = "The return type should be examined in case retrying is needed -- or call map(...).into() to transform it into a `Result<(), F>`"]
    fn send_with<F: FnOnce(&mut Self::ItemType)>(&self, setter: F) -> keen_retry::RetryConsumerResult<(), F, ()>;

    /// See [ChannelProducer::send_with_async()]
    fn send_with_async<F:   FnOnce(&'static mut Self::ItemType) -> Fut + Send,
                       Fut: Future<Output=&'static mut Self::ItemType> + Send>
                      (&'static self,
                       setter: F)
                      -> impl Future<Output=keen_retry::RetryConsumerResult<(), F, ()>> + Send;

    /// See [ChannelProducer::reserve_slot()]
    fn reserve_slot(&self) -> Option<&mut Self::ItemType>;

    /// See [ChannelProducer::try_send_reserved()]
    fn try_send_reserved(&self, reserved_slot: &mut Self::ItemType) -> bool;

    /// See [ChannelProducer::try_cancel_slot_reserve()]
    fn try_cancel_slot_reserve(&self, reserved_slot: &mut Self::ItemType) -> bool;

    /// Sets this [Uni] to return `Stream`s instead of executing them
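    /// A minimal usage sketch (illustrative only -- `MyUniType` and the event name are hypothetical):
    /// ```nocompile
    ///     let (uni, streams) = MyUniType::new("raw events").consumer_stream();
    ///     // drive `streams` with your own executors; keep `uni` around to send events & to close the channel
    /// ```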
    #[must_use = "By calling this method, the Uni gets converted into only providing Streams (rather than executing them) -- so the returned values of (self, Streams) must be used"]
    fn consumer_stream(self) -> (Arc<Self>, Vec<MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>>);

    /// Tells the maximum number of events that may be, at any given time, awaiting consumption from the active `Stream`s
    /// -- when exceeded, [Self::send()] & [Self::send_with()] will fail until consumption progresses
    fn buffer_size(&self) -> u32;

    /// Tells how many events (sent through [Self::send()] or [Self::send_with()]) are still waiting to be
    /// consumed by the active `Stream`s
    fn pending_items_count(&self) -> u32;

    /// Waits (up to `timeout`) until [Self::pending_items_count()] is zero -- possibly waking some tasks awaiting on the active `Stream`s.\
    /// Returns the number of still-pending items -- which will be non-zero if `timeout` expired.
    fn flush(&self, timeout: Duration) -> impl Future<Output=u32> + Send;

    /// Closes this Uni, in isolation -- flushing pending events, closing the producers,
    /// waiting for all events to be fully processed and calling the "on close" callback.\
    /// Returns `false` if the timeout kicked in before it could be attested that the closing was complete.\
    /// If this Uni shares resources with another one (which will get dumped by the "on close"
    /// callback), you most probably want to close them atomically -- see [unis_close_async!()]
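    /// A minimal sketch of a graceful shutdown (illustrative only -- `uni` is a handle to an already-created [Uni]):
    /// ```nocompile
    ///     if !uni.close(Duration::from_secs(5)).await {
    ///         eprintln!("Uni didn't close within 5s -- some events may still be in flight");
    ///     }
    /// ```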
    #[must_use = "Returns true if the Uni could be closed within the given time"]
    fn close(&self, timeout: Duration) -> impl Future<Output=bool> + Send;

    /// Spawns an optimized executor for the `Stream` returned by `pipeline_builder()`, provided it produces elements which are `Future` & fallible
    /// (Actually, as many consumers as `MAX_STREAMS` will be spawned).\
    /// `on_close_callback(stats)` is called when this [Uni] (and all `Stream`s) are closed.\
    /// `on_err_callback(error)` is called whenever the `Stream` returns an `Err` element.
    #[must_use = "`Arc<self>` is returned back, so the return value must be used to send data to this `Uni` and to close it"]
    fn spawn_executors<OutItemType:        Send + Debug,
                       OutStreamType:      Stream<Item=OutType> + Send + 'static,
                       OutType:            Future<Output=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send,
                       ErrVoidAsyncType:   Future<Output=()> + Send + 'static,
                       CloseVoidAsyncType: Future<Output=()> + Send + 'static>

                      (self,
                       concurrency_limit:         u32,
                       futures_timeout:           Duration,
                       pipeline_builder:          impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
                       on_err_callback:           impl Fn(Box<dyn std::error::Error + Send + Sync>)                                           -> ErrVoidAsyncType   + Send + Sync + 'static,
                       on_close_callback:         impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                                     -> CloseVoidAsyncType + Send + Sync + 'static)

                      -> Arc<Self>;

    /// Spawns an optimized executor for the `Stream` returned by `pipeline_builder()`, provided it produces elements which are fallible & non-future
    /// (Actually, as many consumers as `MAX_STREAMS` will be spawned).\
    /// `on_close_callback(stats)` is called when this [Uni] (and all `Stream`s) are closed.\
    /// `on_err_callback(error)` is called whenever the `Stream` returns an `Err` element.
    #[must_use = "`Arc<self>` is returned back, so the return value must be used to send data to this `Uni` and to close it"]
    fn spawn_fallibles_executors<OutItemType:        Send + Debug,
                                 OutStreamType:      Stream<Item=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send + 'static,
                                 CloseVoidAsyncType: Future<Output=()> + Send + 'static>

                                (self,
                                 concurrency_limit:         u32,
                                 pipeline_builder:          impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
                                 on_err_callback:           impl Fn(Box<dyn std::error::Error + Send + Sync>)                                                                 + Send + Sync + 'static,
                                 on_close_callback:         impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                                     -> CloseVoidAsyncType + Send + Sync + 'static)

                                -> Arc<Self>;

    /// Spawns an optimized executor for the `Stream` returned by `pipeline_builder()`, provided it produces elements which are `Future` & non-fallible
    /// (Actually, as many consumers as `MAX_STREAMS` will be spawned).\
    /// `on_close_callback(stats)` is called when this [Uni] (and all `Stream`s) are closed.
    #[must_use = "`Arc<self>` is returned back, so the return value must be used to send data to this `Uni` and to close it"]
    fn spawn_futures_executors<OutItemType:        Send + Debug,
                               OutStreamType:      Stream<Item=OutType>       + Send + 'static,
                               OutType:            Future<Output=OutItemType> + Send,
                               CloseVoidAsyncType: Future<Output=()>          + Send + 'static>

                              (self,
                               concurrency_limit:         u32,
                               futures_timeout:           Duration,
                               pipeline_builder:          impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
                               on_close_callback:         impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                                     -> CloseVoidAsyncType + Send + Sync + 'static)

                              -> Arc<Self>;

    /// Spawns an optimized executor for the `Stream` returned by `pipeline_builder()`, provided it produces elements which are non-future & non-fallible
    /// (Actually, as many consumers as `MAX_STREAMS` will be spawned).\
    /// `on_close_callback(stats)` is called when this [Uni] (and all `Stream`s) are closed.
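    /// A minimal, illustrative sketch (the `MyUniType` alias, the `my_uni` handle and the event name are hypothetical, not from this crate):
    /// ```nocompile
    ///     let my_uni = MyUniType::new("example events")
    ///         .spawn_non_futures_non_fallibles_executors(
    ///             1,                                                              // concurrency_limit
    ///             |stream| stream.map(|event| format!("processed {:?}", event)),  // pipeline_builder
    ///             |_stats| async {});                                             // on_close_callback
    /// ```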
    #[must_use = "`Arc<self>` is returned back, so the return value must be used to send data to this `Uni` and to close it"]
    fn spawn_non_futures_non_fallibles_executors<OutItemType:        Send + Debug,
                                                 OutStreamType:      Stream<Item=OutItemType> + Send + 'static,
                                                 CloseVoidAsyncType: Future<Output=()>        + Send + 'static>

                                                (self,
                                                 concurrency_limit:        u32,
                                                 pipeline_builder:         impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
                                                 on_close_callback:        impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                                     -> CloseVoidAsyncType + Send + Sync + 'static)

                                                -> Arc<Self>;
}


/// Macro to close, atomically-ish, all [Uni]s passed as parameters
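/// A minimal usage sketch (`uni_a` & `uni_b` are illustrative handles, not defined in this crate):
/// ```nocompile
///     unis_close_async!(Duration::from_secs(5), uni_a, uni_b);
/// ```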
#[macro_export]
macro_rules! unis_close_async {
    ($timeout: expr,
     $($uni: tt),+) => {
        {
            tokio::join!( $( $uni.channel.flush($timeout), )+ );
            tokio::join!( $( $uni.channel.gracefully_end_all_streams($timeout), )+ );
        }
    }
}
pub use unis_close_async;


/// Returns a closure (receiving 1 parameter) that must be called `latch_count` times before `async_callback` (also receiving 1 parameter) is finally invoked
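/// A behavioral sketch (illustrative only -- `stats_1`/`stats_2` are hypothetical values):
/// ```nocompile
///     let latched = latch_callback_1p(2, |stats| async move { println!("all executors done: {stats:?}") });
///     latched(stats_1).await;   // latch goes 2 -> 1: nothing happens yet
///     latched(stats_2).await;   // latch goes 1 -> 0: the wrapped callback finally runs
/// ```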
fn latch_callback_1p<CallbackParameterType: Send + 'static,
                     CallbackAsyncType:     Send + Future<Output=()>>
                    (latch_count:    u32,
                     async_callback: impl FnOnce(CallbackParameterType) -> CallbackAsyncType + Send + Sync + 'static)
                    -> impl Fn(CallbackParameterType) -> BoxFuture<'static, ()> {
    let async_callback = Arc::new(Mutex::new(Some(async_callback)));
    let latch_counter = Arc::new(AtomicU32::new(latch_count));
    move |p1| {
        let async_callback = Arc::clone(&async_callback);
        let latch_counter = Arc::clone(&latch_counter);
        Box::pin(async move {
            if latch_counter.fetch_sub(1, Relaxed) == 1 {
                let mut async_callback = async_callback.lock().await;
                (async_callback.take().expect("Uni::latch_callback_1p(): BUG! FnOnce() not honored by the algorithm"))(p1).await;
            }
        })
    }
}