reactive_mutiny/uni/
mod.rs

1//! Allows creating `uni`s, which represent pairs of (`producer`, `event pipeline`) that may be used to
2//! `produce()` asynchronous payloads to be processed by a single `event pipeline` Stream -- and executed
3//! by one or more async tasks.
4//!
5//! Usage:
6//! ```nocompile
7//!    fn on_event(stream: impl Stream<Item=String>) -> impl Stream<Item=String> {
8//!        stream
9//!            .inspect(|message| println!("To Zeta: '{}'", message))
10//!            .inspect(|sneak_peeked_message| println!("EARTH: Sneak peeked a message to Zeta Reticuli: '{}'", sneak_peeked_message))
11//!            .inspect(|message| println!("ZETA: Received a message: '{}'", message))
12//!    }
13//!    let uni = UniBuilder::new()
14//!        .on_stream_close(|_| async {})
15//!        .spawn_non_futures_non_fallible_executor("doc_test() Event", on_event);
16//!    let producer = uni.producer_closure();
17//!    producer("I've just arrived!".to_string()).await;
18//!    producer("Nothing really interesting here... heading back home!".to_string()).await;
19//!    uni.close().await;
20//! ```
21
22mod uni;
23pub use uni::*;
24
25pub mod channels;
26
27
28/// Tests & enforces the requisites & expose good practices & exercises the API of of the [uni](self) module
29#[cfg(any(test,doc))]
30mod tests {
31    use super::*;
32    use crate::{
33        prelude::MutinyStream,
34        instruments::Instruments,
35        types::{ChannelCommon, FullDuplexUniChannel},
36    };
37    use std::{sync::{
38        Arc,
39        atomic::{AtomicBool, AtomicU32, Ordering::Relaxed},
40    }, time::Duration, future::Future, io::Write};
41    use futures::stream::{self, Stream, StreamExt};
42    use minstant::Instant;
43    use log::error;
44
45
46    /// The `UniBuilder` specialization used for the tests to follow
47    type TestUni<InType,
48                 const BUFFER_SIZE: usize,
49                 const MAX_STREAMS: usize,
50                 const INSTRUMENTS: usize = {Instruments::LogsWithMetrics.into()}>
51        = crate::uni::Uni<InType,
52                          channels::movable::full_sync::FullSync<'static, InType, BUFFER_SIZE, MAX_STREAMS>,
53                          INSTRUMENTS,
54                          InType>;
55
56
57
58    #[ctor::ctor]
59    fn suite_setup() {
60        simple_logger::SimpleLogger::new().with_utc_timestamps().init().unwrap_or_else(|_| eprintln!("--> LOGGER WAS ALREADY STARTED"));
61    }
62
63    /// exercises the code present on the documentation
64    #[cfg_attr(not(doc),tokio::test)]
65    async fn doc_tests() {
66        fn on_event<'r>(stream: impl Stream<Item=&'r str>) -> impl Stream<Item=&'r str> {
67            stream
68                .inspect(|message| println!("To Zeta: '{}'", message))
69                .inspect(|sneak_peeked_message| println!("EARTH: Sneak peeked a message to Zeta Reticuli: '{}'", sneak_peeked_message))
70                .inspect(|message| println!("ZETA: Received a message: '{}'", message))
71        }
72        let uni = TestUni::<&str, 1024, 1>::new("doc_test()")
73            .spawn_non_futures_non_fallibles_executors(1, on_event, |_| async {});
74        let producer = |item| uni.send_with(move |slot| *slot = item).expect_ok("couldn't send");
75        producer("I've just arrived!");
76        producer("Nothing really interesting here... heading back home!");
77        assert!(uni.close(Duration::from_secs(10)).await, "Uni wasn't properly closed");
78    }
79
80    /// guarantees that one of the simplest possible testable 'uni' pipelines will get executed all the way through
81    #[cfg_attr(not(doc),tokio::test)]
82    async fn simple_pipeline() {
83        const EXPECTED_SUM: u32 = 17;
84        const PARTS: &[u32] = &[9, 8];
85
86        // consumers may run at any time, so they should have a `static lifetime. Arc help us here.
87        let observed_sum = Arc::new(AtomicU32::new(0));
88
89        // this is the uni to work with our local variable
90        let uni = TestUni::<u32, 1024, 1>::new("simple_pipeline()")
91            .spawn_non_futures_non_fallibles_executors(1,
92                                                       |stream| {
93                                                          let observed_sum = Arc::clone(&observed_sum);
94                                                          stream
95                                                              .map(move |number| observed_sum.fetch_add(number, Relaxed))
96                                                      },
97                                                       |_| async {});
98        let producer = |item| uni.send_with(move |slot| *slot = item).expect_ok("couldn't send");
99
100        // now the consumer: lets suppose we share it among several different tasks -- sharing a reference is one way to do it
101        // (in this case, wrapping it in an Arc is not needed)
102        let shared_producer = &producer;
103        stream::iter(PARTS)
104            .for_each_concurrent(1, |number| async move {
105                shared_producer(*number);
106            }).await;
107
108        assert!(uni.close(Duration::ZERO).await, "Uni wasn't properly closed");
109        assert_eq!(observed_sum.load(Relaxed), EXPECTED_SUM, "not all events passed through our pipeline");
110    }
111
112    /// shows how we may call async functions inside a `Uni` pipeline
113    /// and work with "future" elements
114    #[cfg_attr(not(doc),tokio::test)]
115    async fn async_elements() {
116        const EXPECTED_SUM: u32 = 30;
117        const PARTS: &[u32] = &[9, 8, 7, 6];
118        let observed_sum = Arc::new(AtomicU32::new(0));
119        let properly_closed = Arc::new(AtomicBool::new(false));
120        let properly_closed_ref = Arc::clone(&properly_closed);
121
122        // notice how to transform a regular event into a future event &
123        // how to pass it down the pipeline. Also notice the required (as of Rust 1.63)
124        // moving of Arc local variables so they will be accessible
125        let on_event = |stream: MutinyStream<'static, u32, _, u32>| {
126            let observed_sum = Arc::clone(&observed_sum);
127            stream
128                .map(|number| async move {
129                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
130                    number
131                })
132                .map(move |number| {
133                    let observed_sum = Arc::clone(&observed_sum);
134                    async move {
135                        let number = number.await;
136                        observed_sum.fetch_add(number, Relaxed);
137                        number
138                    }
139                })
140                .map(|number| async move {
141                    let number = number.await;
142                    println!("Just added # {}", number);
143                    Ok(number)
144                })
145                // the line bellow is commented out, since the default executor, `spawn_executor()`, expects Results of Futures
146                // -- the code bellow would remove the Future, making the Stream yield Results of numbers, which, then, could be executed
147                // by the executors from the test cases above.
148                // .buffer_unordered(4)
149        };
150
151        let uni = TestUni::<u32, 1024, 1>::new("async_elements()")
152            .spawn_executors(PARTS.len() as u32, Duration::from_secs(2), on_event,
153                             |err| async move { error!("on_err_callback(): cought error '{:?}'", err) },
154                             |_executor| async move { properly_closed_ref.store(true, Relaxed); });
155
156        let producer = |item| uni.send_with(move |slot| *slot = item).expect_ok("couldn't send");
157
158        let shared_producer = &producer;
159        stream::iter(PARTS)
160            .for_each_concurrent(1, |number| async move {
161                shared_producer(*number);
162            }).await;
163
164        assert!(uni.close(Duration::ZERO).await, "Uni wasn't properly closed");
165        assert!(properly_closed.load(Relaxed), "the `on_close_callback()` wasn't called!");
166        assert_eq!(observed_sum.load(Relaxed), EXPECTED_SUM, "not all events passed through our async pipeline");
167    }
168
169    /// assures stats are computed appropriately for every executor,
170    /// according to the right instrumentation specifications
171    #[cfg_attr(not(doc),tokio::test)]
172    #[ignore]   // flaky if ran in multi-thread: timeout measurements go south
173    async fn stats() {
174
175        // asserts spawn_non_futures_non_fallible_executor() register statistics appropriately:
176        // with counters, but without average futures resolution time measurements
177        let event_name = "non_future/non_fallible event";
178        let uni = TestUni::<String, 1024, 1>::new(event_name)
179            .spawn_non_futures_non_fallibles_executors(1, |stream| stream, |_| async {});
180        let producer = |item| uni.send_with(|slot| *slot = item).expect_ok("couldn't send");
181        producer("'only count successes' payload".to_string());
182        assert!(uni.close(Duration::ZERO).await, "Uni wasn't properly closed");
183        let (ok_counter, ok_avg_futures_resolution_duration) = uni.stream_executors[0].ok_events_avg_future_duration.lightweight_probe();
184        assert_eq!(ok_counter,                               1,    "counter of successful '{}' events is wrong", event_name);
185        assert_eq!(ok_avg_futures_resolution_duration,       -1.0, "avg futures resolution time of successful '{}' events is wrong -- since it is a non-future, avg times should be always -1.0", event_name);
186        let (failures_counter, failures_avg_futures_resolution_duration) = uni.stream_executors[0].failed_events_avg_future_duration.lightweight_probe();
187        assert_eq!(failures_counter,                         0,    "counter of unsuccessful '{}' events is wrong -- since it is a non-fallible event, failures should always be 0", event_name);
188        assert_eq!(failures_avg_futures_resolution_duration, 0.0,  "avg futures resolution time of unsuccessful '{}' events is wrong -- since it is a non-fallible event,, avg times should be always 0.0", event_name);
189        let (timeouts_counter, timeouts_avg_futures_resolution_duration) = uni.stream_executors[0].timed_out_events_avg_future_duration.lightweight_probe();
190        assert_eq!(timeouts_counter,                         0,    "counter of timed out '{}' events is wrong -- since it is a non-future event, timeouts should always be 0", event_name);
191        assert_eq!(timeouts_avg_futures_resolution_duration, 0.0,  "avg futures resolution time of timed out '{}' events is wrong -- since it is a non-future event,, avg timeouts should be always 0.0", event_name);
192
193        // asserts spawn_executor() register statistics appropriately:
194        // with counters & with average futures resolution time measurements
195        let event_name = "future & fallible event";
196        let uni = TestUni::<String, 1024, 1>::new(event_name)
197            .spawn_executors(1,
198                             Duration::from_millis(150),
199                             |stream| {
200                                stream.map(|payload: String| async move {
201                                    if payload.contains("unsuccessful") {
202                                        tokio::time::sleep(Duration::from_millis(50)).await;
203                                        Err(Box::from(String::from("failing the pipeline, as requested")))
204                                    } else if payload.contains("timeout") {
205                                        tokio::time::sleep(Duration::from_millis(200)).await;
206                                        Ok("this answer will never make it -- stream executor times out after 100ms".to_string())
207                                    } else {
208                                        tokio::time::sleep(Duration::from_millis(100)).await;
209                                        Ok(payload)
210                                    }
211                                })
212                            },
213                             |_| async {},
214                             |_| async {}
215            );
216        let producer = |item| uni.send_with(|slot| *slot = item).expect_ok("couldn't send");
217        // for this test, produce each event twice
218        for _i in 0..2 {
219            producer("'successful' payload".to_string());
220            producer("'unsuccessful' payload".to_string());
221            producer("'timeout' payload".to_string());
222        }
223        assert!(uni.close(Duration::ZERO).await, "Uni wasn't properly closed");
224        let (ok_counter, ok_avg_futures_resolution_duration) = uni.stream_executors[0].ok_events_avg_future_duration.lightweight_probe();
225        assert_eq!(ok_counter,                                              2,   "counter of successful '{}' events is wrong", event_name);
226        assert!((ok_avg_futures_resolution_duration-0.100).abs()        < 15e-2, "avg futures resolution time of successful '{}' events is wrong -- it should be 0.1s", event_name);
227        let (failures_counter, failures_avg_futures_resolution_duration) = uni.stream_executors[0].failed_events_avg_future_duration.lightweight_probe();
228        assert_eq!(failures_counter,                                       2,   "counter of unsuccessful '{}' events is wrong", event_name);
229        assert!((failures_avg_futures_resolution_duration-0.050).abs() < 15e-2, "avg futures resolution time of unsuccessful '{}' events is wrong -- it should be 0.05s, but was {}", event_name, failures_avg_futures_resolution_duration);
230        let (timeouts_counter, timeouts_avg_futures_resolution_duration) = uni.stream_executors[0].timed_out_events_avg_future_duration.lightweight_probe();
231        assert_eq!(timeouts_counter,                                       2,   "counter of timed out '{}' events is wrong", event_name);
232        assert!((timeouts_avg_futures_resolution_duration-0.150).abs() < 15e-2, "avg futures resolution time of timed out '{}' events is wrong -- it should be 0.150s", event_name);
233
234    }
235
236
237    /// shows how to fuse multiple `uni`s, triggering payloads for another uni when certain conditions are met:
238    /// events TWO and FOUR will set a shared state between them, firing SIX.
239    #[cfg_attr(not(doc),tokio::test)]
240    async fn demux() {
241        let shared_state = Arc::new(AtomicU32::new(0));
242        let two_fire_count = Arc::new(AtomicU32::new(0));
243        let four_fire_count = Arc::new(AtomicU32::new(0));
244        let six_fire_count = Arc::new(AtomicU32::new(0));
245
246        // SIX event
247        let six_fire_count_ref = Arc::clone(&six_fire_count);
248        let on_six_event = move |stream: MutinyStream<'static, (), _, ()>| {
249            let six_fire_count_ref = Arc::clone(&six_fire_count_ref);
250            stream.inspect(move |_| {
251                six_fire_count_ref.fetch_add(1, Relaxed);
252            })
253        };
254        let six_uni = TestUni::<(), 1024, 1>::new("SIX event")
255            .spawn_non_futures_non_fallibles_executors(1, on_six_event, |_| async {});
256        // assures we'll close SIX only once
257        let can_six_be_closed = Arc::new(AtomicBool::new(true));
258        let six_uni_ref = Arc::clone(&six_uni);
259        let six_closer = Arc::new(move || {
260            let can_six_be_closed = Arc::clone(&can_six_be_closed);
261            let six_uni = Arc::clone(&six_uni_ref);
262            async move {
263                if can_six_be_closed.swap(false, Relaxed) {
264                    assert!(six_uni.close(Duration::ZERO).await, "`six_uni` wasn't properly closed");
265                }
266            }
267        });
268
269        // TWO event
270        let on_two_event = |stream: MutinyStream<'static, u32, _, u32>| {
271            let two_fire_count = Arc::clone(&two_fire_count);
272            let shared_state = Arc::clone(&shared_state);
273            let six_uni = Arc::clone(&six_uni);
274            stream
275                .map(move |event| {
276                    let two_fire_count = Arc::clone(&two_fire_count);
277                    let shared_state = Arc::clone(&shared_state);
278                    let six_uni = Arc::clone(&six_uni);
279                    async move {
280                        two_fire_count.fetch_add(1, Relaxed);
281                        if event & 2 == 2 {
282                            let previous_state = shared_state.fetch_or(2, Relaxed);
283                            if previous_state & 6 == 6 {
284                                shared_state.store(0, Relaxed); // reset the triggering state
285                                assert!(six_uni.send_with(|slot| *slot = ()).is_ok(), "couldn't send");
286                            }
287                        } else if event == 97 {
288                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
289                        }
290                        event
291                    }
292            })
293            .buffer_unordered(1)
294        };
295        let six_closer_for_two = Arc::clone(&six_closer);
296        let on_two_close = move |_| {
297            let six_closer_for_two = Arc::clone(&six_closer_for_two);
298            async move {
299                six_closer_for_two().await;
300            }
301        };
302        let two_uni = TestUni::<u32, 1024, 1>::new("TWO event")
303            .spawn_non_futures_non_fallibles_executors(1, on_two_event, on_two_close);
304        let two_producer = |item| two_uni.send_with(move |slot| *slot = item).expect_ok("couldn't send `two`");
305
306        // FOUR event
307        let on_four_event = |stream: MutinyStream<'static, u32, _, u32>| {
308            let four_fire_count = Arc::clone(&four_fire_count);
309            let shared_state = Arc::clone(&shared_state);
310            let six_uni = Arc::clone(&six_uni);
311            stream
312                .map(move |event| {
313                    let four_fire_count = Arc::clone(&four_fire_count);
314                    let shared_state = Arc::clone(&shared_state);
315                    let six_uni = Arc::clone(&six_uni);
316                    async move {
317                        four_fire_count.fetch_add(1, Relaxed);
318                        if event & 4 == 4 {
319                            let previous_state = shared_state.fetch_or(4, Relaxed);
320                            if previous_state & 6 == 6 {
321                                shared_state.store(0, Relaxed); // reset the triggering state
322                                assert!(six_uni.send_with(|slot| *slot = ()).is_ok(), "couldn't send");
323                            }
324                        } else if event == 97 {
325                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
326                        }
327                        event
328                    }
329                })
330                .buffer_unordered(1)
331        };
332        let six_closer_for_four = Arc::clone(&six_closer);
333        let on_four_close = move |_| {
334            let six_closer_for_four = Arc::clone(&six_closer_for_four);
335            async move {
336                six_closer_for_four().await;
337            }
338        };
339        let four_uni = TestUni::<u32, 1024, 1>::new("FOUR event")
340            .spawn_non_futures_non_fallibles_executors(1, on_four_event, on_four_close);
341        let four_producer = |item| four_uni.send_with(move |slot| *slot = item).expect_ok("couldn't send `four`");
342
343        // NOTE: the special value of 97 causes a sleep on both TWO and FOUR pipelines
344        //       so we can test race conditions for the 'close producer' functions
345        two_producer(1);
346        two_producer(2);
347        four_producer(97);    // sleeps, forcing any bugs regarding racing conditions to blow up
348        four_producer(1);
349        four_producer(2);
350        four_producer(3);
351        four_producer(4);
352        two_producer(3);
353        two_producer(4);
354        four_producer(5);
355        tokio::time::sleep(Duration::from_millis(100)).await;     // flakiness protection: wait a tad before atomically closing `two` and `four` -- if not, `six` might be closed before the `six` event is sent, causing this test to fail.
356        unis_close_async!(Duration::ZERO, two_uni, four_uni);  // notice SIX is closed here as well
357                                                               // closing TWO (and, therefore, SIX) before all elements of FOUR are processed would cause the later consumer to try to publish to SIX (when it is already closed) --
358                                                               // this is why both events should be closed atomically in this case -- both share the closeable resource SIX -- which happens to be another uni, but could be any other resource
359
360        assert_eq!(two_fire_count.load(Relaxed),  4, "Wrong number of events processed for TWO");
361        assert_eq!(four_fire_count.load(Relaxed), 6, "Wrong number of events processed for FOUR");
362        assert_eq!(six_fire_count.load(Relaxed),  1, "Wrong number of events processed for SIX");
363
364    }
365
366    /// shows how to handle errors when they happen anywhere down the pipeline
367    /// -- and what happens when they are not handled.
368    /// + tests meaningful messages are produced
369    #[cfg_attr(not(doc),tokio::test)]
370    async fn error_handling() {
371
372        let on_err_count = Arc::new(AtomicU32::new(0));
373
374        fn on_fail_when_odd_event(stream: impl Stream<Item=u32>) -> impl Stream<Item = impl Future<Output = Result<u32, Box<dyn std::error::Error + Send + Sync>> > + Send> {
375            stream
376                .map(|payload| async move {
377                    if payload % 2 == 0 {
378                        Ok(payload)
379                    } else if payload % 79 == 0 {
380                        Err(format!("BLOW CODE received: {}", payload))
381                    } else {
382                        Err(format!("ODD payload received: {}", payload))
383                    }
384                })
385                // treat known errors
386                .filter_map(|payload| async {
387                    let payload = payload.await;
388                    match payload {
389                        Ok(ok_payload ) => {
390                            println!("Payload {} ACCURATELY PROCESSED!", ok_payload);
391                            Some(payload)
392                        },
393                        Err(ref err) => {
394                            if err.contains("ODD") {
395                                println!("Payload {} ERROR LOG -- this error is tolerable and this event will be skipped for the rest of the pipeline", err);
396                                None
397                            } else {
398                                // other errors are "unknown" -- therefore, not tolerable nor treated nor recovered from... and will explode down the pipeline, causing the stream to close
399                                Some(payload)
400                            }
401                        }
402                        //unknown_error => Some(unknown_error),
403                    }
404                })
405                .map(|payload| async {
406                    let payload = payload?;
407                    // if this is executed, the payload had no errors OR the error was handled and the failed event was filtered out
408                    println!("Payload {} continued down the pipe ", payload);
409                    Ok(payload)
410                })
411        }
412        let on_err_count_clone = Arc::clone(&on_err_count);
413        let uni = TestUni::<u32, 1024, 1>::new("fallible event")
414            .spawn_executors(1,
415                             Duration::from_millis(100),
416                             on_fail_when_odd_event,
417                             move |err| {
418                                let on_err_count_clone = Arc::clone(&on_err_count_clone);
419                                async move {
420                                    on_err_count_clone.fetch_add(1, Relaxed);
421                                    println!("ERROR CALLBACK WAS CALLED: '{:?}'", err);
422                                }
423                            },
424                             |_| async {}
425            );
426        let producer = |item| uni.send_with(move |slot| *slot = item).expect_ok("couldn't send");
427        producer(0);
428        producer(1);
429        producer(2);
430        producer(79);
431        producer(80);
432        assert!(uni.close(Duration::ZERO).await, "Uni wasn't properly closed");
433
434        assert_eq!(on_err_count.load(Relaxed), 1, "'on_err()' callback contract broken: events with handled errors should not call on_err(), the ones not 'caught', should")
435    }
436
437    /// assures performance won't be degraded when we make changes
438    #[cfg_attr(not(doc),tokio::test(flavor="multi_thread", worker_threads=2))]
439    #[ignore]   // must run in a single thread for accurate measurements
440    async fn performance_measurements() {
441
442        #[cfg(not(debug_assertions))]
443        const FACTOR: u32 = 8192;
444        #[cfg(debug_assertions)]
445        const FACTOR: u32 = 40;
446
447        /// measure how long it takes to stream a certain number of elements through the given `uni`
448        async fn profile_uni<UniChannelType:    FullDuplexUniChannel<ItemType=u32, DerivedItemType=u32> + Sync + Send + 'static,
449                             const INSTRUMENTS: usize>
450                            (uni:            Arc<Uni<u32, UniChannelType, INSTRUMENTS>>,
451                             profiling_name: &str,
452                             count:          u32) {
453            print!("{profiling_name} "); std::io::stdout().flush().unwrap();
454            let mut full_count = 0u32;
455            let start = Instant::now();
456            for e in 0..count {
457                uni.send_with(|slot| *slot = e)
458                    .retry_with(|setter| {
459                        full_count += 1;
460                        if full_count % (1<<26) == 0 {
461                            print!("(stuck at e #{e}?)"); std::io::stdout().flush().unwrap();
462                        } else if full_count % (1<<20) == 0 {
463                            print!("."); std::io::stdout().flush().unwrap();
464                        }
465                        uni.send_with(setter)
466                    })
467                    .spinning_forever();
468            }
469            assert!(uni.close(Duration::from_secs(5)).await, "Uni wasn't properly closed");
470            let elapsed = start.elapsed();
471            println!("{:10.2}/s -- {} items processed in {:?}",
472                     count as f64 / elapsed.as_secs_f64(),
473                     count,
474                     elapsed);
475        }
476
477        println!();
478
479        let profiling_name = "metricfull_non_futures_non_fallible_uni:    ";
480        let uni = TestUni::<u32, 8192, 1, {Instruments::MetricsWithoutLogs.into()}>::new(profiling_name)
481            .spawn_non_futures_non_fallibles_executors(1, |stream| stream, |_| async {});
482        profile_uni(uni, profiling_name, 1024*FACTOR).await;
483
484        let profiling_name = "metricless_non_futures_non_fallible_uni:    ";
485        let uni = TestUni::<u32, 8192, 1, {Instruments::NoInstruments.into()}>::new(profiling_name)
486            .spawn_non_futures_non_fallibles_executors(1, |stream| stream, |_| async {});
487        profile_uni(uni, profiling_name, 1024*FACTOR).await;
488
489        let profiling_name = "par_metricless_non_futures_non_fallible_uni:";
490        let uni = TestUni::<u32, 8192, 1, {Instruments::NoInstruments.into()}>::new(profiling_name)
491            .spawn_non_futures_non_fallibles_executors(12, |stream| stream, |_| async {});
492        profile_uni(uni, profiling_name, 1024*FACTOR).await;
493
494        let profiling_name = "metricfull_futures_fallible_uni:            ";
495        let uni = TestUni::<u32, 8192, 1, {Instruments::MetricsWithoutLogs.into()}>::new(profiling_name)
496            .spawn_executors(1,
497                             Duration::ZERO,
498                             |stream| {
499                                stream.map(|number| async move {
500                                        Ok(number)
501                                    })
502                            },
503                             |_err| async {},
504                             |_| async {});
505        profile_uni(uni, profiling_name, 1024*FACTOR).await;
506
507        let profiling_name = "metricless_futures_fallible_uni:            ";
508        let uni = TestUni::<u32, 8192, 1, {Instruments::NoInstruments.into()}>::new(profiling_name)
509            .spawn_executors(1,
510                             Duration::ZERO,
511                             |stream| {
512                                stream.map(|number| async move {
513                                        Ok(number)
514                                    })
515                            },
516                             |_err| async {},
517                             |_| async {});
518        profile_uni(uni, profiling_name, 1024*FACTOR).await;
519
520        let profiling_name = "timeoutable_metricfull_futures_fallible_uni:";
521        let uni = TestUni::<u32, 8192, 1, {Instruments::MetricsWithoutLogs.into()}>::new(profiling_name)
522            .spawn_executors(1,
523                             Duration::from_millis(100),
524                             |stream| {
525                                stream.map(|number| async move {
526                                        Ok(number)
527                                    })
528                            },
529                             |_err| async {},
530                             |_| async {});
531        profile_uni(uni, profiling_name, 768*FACTOR).await;
532
533        let profiling_name = "timeoutable_metricless_futures_fallible_uni:";
534        let uni = TestUni::<u32, 8192, 1, {Instruments::NoInstruments.into()}>::new(profiling_name)
535            .spawn_executors(1,
536                             Duration::from_millis(100),
537                             |stream| {
538                                stream.map(|number| async move {
539                                        Ok(number)
540                                    })
541                            },
542                             |_err| async {},
543                             |_| async {});
544        profile_uni(uni, profiling_name, 768*FACTOR).await;
545
546        /*
547
548        As of Sept, 22th (after using multi-threaded tokio tests):
549
550        RUSTFLAGS="-C target-cpu=native" cargo test --release performance_measurements -- --test-threads 1 --nocapture
551
552        test mutiny::uni::tests::performance_measurements ...
553        metricfull_non_futures_non_fallible_uni:      511739.18/s -- 1048576 items processed in 2.049043793s
554        metricless_non_futures_non_fallible_uni:      570036.96/s -- 1048576 items processed in 1.839487733s
555        par_metricless_non_futures_non_fallible_uni:  479614.17/s -- 1048576 items processed in 2.18629069s
556        metricfull_futures_fallible_uni:              428879.60/s -- 1048576 items processed in 2.444919271s
557        metricless_futures_fallible_uni:              659091.97/s -- 1048576 items processed in 1.590940328s
558        timeoutable_metricfull_futures_fallible_uni:  469629.46/s -- 786432 items processed in 1.674579774s
559        timeoutable_metricless_futures_fallible_uni:  949109.14/s -- 786432 items processed in 828.600172ms
560
561        */
562    }
563
564}