1mod uni;
23pub use uni::*;
24
25pub mod channels;
26
27
28#[cfg(any(test,doc))]
30mod tests {
31 use super::*;
32 use crate::{
33 prelude::MutinyStream,
34 instruments::Instruments,
35 types::{ChannelCommon, FullDuplexUniChannel},
36 };
37 use std::{sync::{
38 Arc,
39 atomic::{AtomicBool, AtomicU32, Ordering::Relaxed},
40 }, time::Duration, future::Future, io::Write};
41 use futures::stream::{self, Stream, StreamExt};
42 use minstant::Instant;
43 use log::error;
44
45
46 type TestUni<InType,
48 const BUFFER_SIZE: usize,
49 const MAX_STREAMS: usize,
50 const INSTRUMENTS: usize = {Instruments::LogsWithMetrics.into()}>
51 = crate::uni::Uni<InType,
52 channels::movable::full_sync::FullSync<'static, InType, BUFFER_SIZE, MAX_STREAMS>,
53 INSTRUMENTS,
54 InType>;
55
56
57
58 #[ctor::ctor]
59 fn suite_setup() {
60 simple_logger::SimpleLogger::new().with_utc_timestamps().init().unwrap_or_else(|_| eprintln!("--> LOGGER WAS ALREADY STARTED"));
61 }
62
63 #[cfg_attr(not(doc),tokio::test)]
65 async fn doc_tests() {
66 fn on_event<'r>(stream: impl Stream<Item=&'r str>) -> impl Stream<Item=&'r str> {
67 stream
68 .inspect(|message| println!("To Zeta: '{}'", message))
69 .inspect(|sneak_peeked_message| println!("EARTH: Sneak peeked a message to Zeta Reticuli: '{}'", sneak_peeked_message))
70 .inspect(|message| println!("ZETA: Received a message: '{}'", message))
71 }
72 let uni = TestUni::<&str, 1024, 1>::new("doc_test()")
73 .spawn_non_futures_non_fallibles_executors(1, on_event, |_| async {});
74 let producer = |item| uni.send_with(move |slot| *slot = item).expect_ok("couldn't send");
75 producer("I've just arrived!");
76 producer("Nothing really interesting here... heading back home!");
77 assert!(uni.close(Duration::from_secs(10)).await, "Uni wasn't properly closed");
78 }
79
80 #[cfg_attr(not(doc),tokio::test)]
82 async fn simple_pipeline() {
83 const EXPECTED_SUM: u32 = 17;
84 const PARTS: &[u32] = &[9, 8];
85
86 let observed_sum = Arc::new(AtomicU32::new(0));
88
89 let uni = TestUni::<u32, 1024, 1>::new("simple_pipeline()")
91 .spawn_non_futures_non_fallibles_executors(1,
92 |stream| {
93 let observed_sum = Arc::clone(&observed_sum);
94 stream
95 .map(move |number| observed_sum.fetch_add(number, Relaxed))
96 },
97 |_| async {});
98 let producer = |item| uni.send_with(move |slot| *slot = item).expect_ok("couldn't send");
99
100 let shared_producer = &producer;
103 stream::iter(PARTS)
104 .for_each_concurrent(1, |number| async move {
105 shared_producer(*number);
106 }).await;
107
108 assert!(uni.close(Duration::ZERO).await, "Uni wasn't properly closed");
109 assert_eq!(observed_sum.load(Relaxed), EXPECTED_SUM, "not all events passed through our pipeline");
110 }
111
112 #[cfg_attr(not(doc),tokio::test)]
115 async fn async_elements() {
116 const EXPECTED_SUM: u32 = 30;
117 const PARTS: &[u32] = &[9, 8, 7, 6];
118 let observed_sum = Arc::new(AtomicU32::new(0));
119 let properly_closed = Arc::new(AtomicBool::new(false));
120 let properly_closed_ref = Arc::clone(&properly_closed);
121
122 let on_event = |stream: MutinyStream<'static, u32, _, u32>| {
126 let observed_sum = Arc::clone(&observed_sum);
127 stream
128 .map(|number| async move {
129 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
130 number
131 })
132 .map(move |number| {
133 let observed_sum = Arc::clone(&observed_sum);
134 async move {
135 let number = number.await;
136 observed_sum.fetch_add(number, Relaxed);
137 number
138 }
139 })
140 .map(|number| async move {
141 let number = number.await;
142 println!("Just added # {}", number);
143 Ok(number)
144 })
145 };
150
151 let uni = TestUni::<u32, 1024, 1>::new("async_elements()")
152 .spawn_executors(PARTS.len() as u32, Duration::from_secs(2), on_event,
153 |err| async move { error!("on_err_callback(): cought error '{:?}'", err) },
154 |_executor| async move { properly_closed_ref.store(true, Relaxed); });
155
156 let producer = |item| uni.send_with(move |slot| *slot = item).expect_ok("couldn't send");
157
158 let shared_producer = &producer;
159 stream::iter(PARTS)
160 .for_each_concurrent(1, |number| async move {
161 shared_producer(*number);
162 }).await;
163
164 assert!(uni.close(Duration::ZERO).await, "Uni wasn't properly closed");
165 assert!(properly_closed.load(Relaxed), "the `on_close_callback()` wasn't called!");
166 assert_eq!(observed_sum.load(Relaxed), EXPECTED_SUM, "not all events passed through our async pipeline");
167 }
168
169 #[cfg_attr(not(doc),tokio::test)]
172 #[ignore] async fn stats() {
174
175 let event_name = "non_future/non_fallible event";
178 let uni = TestUni::<String, 1024, 1>::new(event_name)
179 .spawn_non_futures_non_fallibles_executors(1, |stream| stream, |_| async {});
180 let producer = |item| uni.send_with(|slot| *slot = item).expect_ok("couldn't send");
181 producer("'only count successes' payload".to_string());
182 assert!(uni.close(Duration::ZERO).await, "Uni wasn't properly closed");
183 let (ok_counter, ok_avg_futures_resolution_duration) = uni.stream_executors[0].ok_events_avg_future_duration.lightweight_probe();
184 assert_eq!(ok_counter, 1, "counter of successful '{}' events is wrong", event_name);
185 assert_eq!(ok_avg_futures_resolution_duration, -1.0, "avg futures resolution time of successful '{}' events is wrong -- since it is a non-future, avg times should be always -1.0", event_name);
186 let (failures_counter, failures_avg_futures_resolution_duration) = uni.stream_executors[0].failed_events_avg_future_duration.lightweight_probe();
187 assert_eq!(failures_counter, 0, "counter of unsuccessful '{}' events is wrong -- since it is a non-fallible event, failures should always be 0", event_name);
188 assert_eq!(failures_avg_futures_resolution_duration, 0.0, "avg futures resolution time of unsuccessful '{}' events is wrong -- since it is a non-fallible event,, avg times should be always 0.0", event_name);
189 let (timeouts_counter, timeouts_avg_futures_resolution_duration) = uni.stream_executors[0].timed_out_events_avg_future_duration.lightweight_probe();
190 assert_eq!(timeouts_counter, 0, "counter of timed out '{}' events is wrong -- since it is a non-future event, timeouts should always be 0", event_name);
191 assert_eq!(timeouts_avg_futures_resolution_duration, 0.0, "avg futures resolution time of timed out '{}' events is wrong -- since it is a non-future event,, avg timeouts should be always 0.0", event_name);
192
193 let event_name = "future & fallible event";
196 let uni = TestUni::<String, 1024, 1>::new(event_name)
197 .spawn_executors(1,
198 Duration::from_millis(150),
199 |stream| {
200 stream.map(|payload: String| async move {
201 if payload.contains("unsuccessful") {
202 tokio::time::sleep(Duration::from_millis(50)).await;
203 Err(Box::from(String::from("failing the pipeline, as requested")))
204 } else if payload.contains("timeout") {
205 tokio::time::sleep(Duration::from_millis(200)).await;
206 Ok("this answer will never make it -- stream executor times out after 100ms".to_string())
207 } else {
208 tokio::time::sleep(Duration::from_millis(100)).await;
209 Ok(payload)
210 }
211 })
212 },
213 |_| async {},
214 |_| async {}
215 );
216 let producer = |item| uni.send_with(|slot| *slot = item).expect_ok("couldn't send");
217 for _i in 0..2 {
219 producer("'successful' payload".to_string());
220 producer("'unsuccessful' payload".to_string());
221 producer("'timeout' payload".to_string());
222 }
223 assert!(uni.close(Duration::ZERO).await, "Uni wasn't properly closed");
224 let (ok_counter, ok_avg_futures_resolution_duration) = uni.stream_executors[0].ok_events_avg_future_duration.lightweight_probe();
225 assert_eq!(ok_counter, 2, "counter of successful '{}' events is wrong", event_name);
226 assert!((ok_avg_futures_resolution_duration-0.100).abs() < 15e-2, "avg futures resolution time of successful '{}' events is wrong -- it should be 0.1s", event_name);
227 let (failures_counter, failures_avg_futures_resolution_duration) = uni.stream_executors[0].failed_events_avg_future_duration.lightweight_probe();
228 assert_eq!(failures_counter, 2, "counter of unsuccessful '{}' events is wrong", event_name);
229 assert!((failures_avg_futures_resolution_duration-0.050).abs() < 15e-2, "avg futures resolution time of unsuccessful '{}' events is wrong -- it should be 0.05s, but was {}", event_name, failures_avg_futures_resolution_duration);
230 let (timeouts_counter, timeouts_avg_futures_resolution_duration) = uni.stream_executors[0].timed_out_events_avg_future_duration.lightweight_probe();
231 assert_eq!(timeouts_counter, 2, "counter of timed out '{}' events is wrong", event_name);
232 assert!((timeouts_avg_futures_resolution_duration-0.150).abs() < 15e-2, "avg futures resolution time of timed out '{}' events is wrong -- it should be 0.150s", event_name);
233
234 }
235
236
237 #[cfg_attr(not(doc),tokio::test)]
240 async fn demux() {
241 let shared_state = Arc::new(AtomicU32::new(0));
242 let two_fire_count = Arc::new(AtomicU32::new(0));
243 let four_fire_count = Arc::new(AtomicU32::new(0));
244 let six_fire_count = Arc::new(AtomicU32::new(0));
245
246 let six_fire_count_ref = Arc::clone(&six_fire_count);
248 let on_six_event = move |stream: MutinyStream<'static, (), _, ()>| {
249 let six_fire_count_ref = Arc::clone(&six_fire_count_ref);
250 stream.inspect(move |_| {
251 six_fire_count_ref.fetch_add(1, Relaxed);
252 })
253 };
254 let six_uni = TestUni::<(), 1024, 1>::new("SIX event")
255 .spawn_non_futures_non_fallibles_executors(1, on_six_event, |_| async {});
256 let can_six_be_closed = Arc::new(AtomicBool::new(true));
258 let six_uni_ref = Arc::clone(&six_uni);
259 let six_closer = Arc::new(move || {
260 let can_six_be_closed = Arc::clone(&can_six_be_closed);
261 let six_uni = Arc::clone(&six_uni_ref);
262 async move {
263 if can_six_be_closed.swap(false, Relaxed) {
264 assert!(six_uni.close(Duration::ZERO).await, "`six_uni` wasn't properly closed");
265 }
266 }
267 });
268
269 let on_two_event = |stream: MutinyStream<'static, u32, _, u32>| {
271 let two_fire_count = Arc::clone(&two_fire_count);
272 let shared_state = Arc::clone(&shared_state);
273 let six_uni = Arc::clone(&six_uni);
274 stream
275 .map(move |event| {
276 let two_fire_count = Arc::clone(&two_fire_count);
277 let shared_state = Arc::clone(&shared_state);
278 let six_uni = Arc::clone(&six_uni);
279 async move {
280 two_fire_count.fetch_add(1, Relaxed);
281 if event & 2 == 2 {
282 let previous_state = shared_state.fetch_or(2, Relaxed);
283 if previous_state & 6 == 6 {
284 shared_state.store(0, Relaxed); assert!(six_uni.send_with(|slot| *slot = ()).is_ok(), "couldn't send");
286 }
287 } else if event == 97 {
288 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
289 }
290 event
291 }
292 })
293 .buffer_unordered(1)
294 };
295 let six_closer_for_two = Arc::clone(&six_closer);
296 let on_two_close = move |_| {
297 let six_closer_for_two = Arc::clone(&six_closer_for_two);
298 async move {
299 six_closer_for_two().await;
300 }
301 };
302 let two_uni = TestUni::<u32, 1024, 1>::new("TWO event")
303 .spawn_non_futures_non_fallibles_executors(1, on_two_event, on_two_close);
304 let two_producer = |item| two_uni.send_with(move |slot| *slot = item).expect_ok("couldn't send `two`");
305
306 let on_four_event = |stream: MutinyStream<'static, u32, _, u32>| {
308 let four_fire_count = Arc::clone(&four_fire_count);
309 let shared_state = Arc::clone(&shared_state);
310 let six_uni = Arc::clone(&six_uni);
311 stream
312 .map(move |event| {
313 let four_fire_count = Arc::clone(&four_fire_count);
314 let shared_state = Arc::clone(&shared_state);
315 let six_uni = Arc::clone(&six_uni);
316 async move {
317 four_fire_count.fetch_add(1, Relaxed);
318 if event & 4 == 4 {
319 let previous_state = shared_state.fetch_or(4, Relaxed);
320 if previous_state & 6 == 6 {
321 shared_state.store(0, Relaxed); assert!(six_uni.send_with(|slot| *slot = ()).is_ok(), "couldn't send");
323 }
324 } else if event == 97 {
325 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
326 }
327 event
328 }
329 })
330 .buffer_unordered(1)
331 };
332 let six_closer_for_four = Arc::clone(&six_closer);
333 let on_four_close = move |_| {
334 let six_closer_for_four = Arc::clone(&six_closer_for_four);
335 async move {
336 six_closer_for_four().await;
337 }
338 };
339 let four_uni = TestUni::<u32, 1024, 1>::new("FOUR event")
340 .spawn_non_futures_non_fallibles_executors(1, on_four_event, on_four_close);
341 let four_producer = |item| four_uni.send_with(move |slot| *slot = item).expect_ok("couldn't send `four`");
342
343 two_producer(1);
346 two_producer(2);
347 four_producer(97); four_producer(1);
349 four_producer(2);
350 four_producer(3);
351 four_producer(4);
352 two_producer(3);
353 two_producer(4);
354 four_producer(5);
355 tokio::time::sleep(Duration::from_millis(100)).await; unis_close_async!(Duration::ZERO, two_uni, four_uni); assert_eq!(two_fire_count.load(Relaxed), 4, "Wrong number of events processed for TWO");
361 assert_eq!(four_fire_count.load(Relaxed), 6, "Wrong number of events processed for FOUR");
362 assert_eq!(six_fire_count.load(Relaxed), 1, "Wrong number of events processed for SIX");
363
364 }
365
366 #[cfg_attr(not(doc),tokio::test)]
370 async fn error_handling() {
371
372 let on_err_count = Arc::new(AtomicU32::new(0));
373
374 fn on_fail_when_odd_event(stream: impl Stream<Item=u32>) -> impl Stream<Item = impl Future<Output = Result<u32, Box<dyn std::error::Error + Send + Sync>> > + Send> {
375 stream
376 .map(|payload| async move {
377 if payload % 2 == 0 {
378 Ok(payload)
379 } else if payload % 79 == 0 {
380 Err(format!("BLOW CODE received: {}", payload))
381 } else {
382 Err(format!("ODD payload received: {}", payload))
383 }
384 })
385 .filter_map(|payload| async {
387 let payload = payload.await;
388 match payload {
389 Ok(ok_payload ) => {
390 println!("Payload {} ACCURATELY PROCESSED!", ok_payload);
391 Some(payload)
392 },
393 Err(ref err) => {
394 if err.contains("ODD") {
395 println!("Payload {} ERROR LOG -- this error is tolerable and this event will be skipped for the rest of the pipeline", err);
396 None
397 } else {
398 Some(payload)
400 }
401 }
402 }
404 })
405 .map(|payload| async {
406 let payload = payload?;
407 println!("Payload {} continued down the pipe ", payload);
409 Ok(payload)
410 })
411 }
412 let on_err_count_clone = Arc::clone(&on_err_count);
413 let uni = TestUni::<u32, 1024, 1>::new("fallible event")
414 .spawn_executors(1,
415 Duration::from_millis(100),
416 on_fail_when_odd_event,
417 move |err| {
418 let on_err_count_clone = Arc::clone(&on_err_count_clone);
419 async move {
420 on_err_count_clone.fetch_add(1, Relaxed);
421 println!("ERROR CALLBACK WAS CALLED: '{:?}'", err);
422 }
423 },
424 |_| async {}
425 );
426 let producer = |item| uni.send_with(move |slot| *slot = item).expect_ok("couldn't send");
427 producer(0);
428 producer(1);
429 producer(2);
430 producer(79);
431 producer(80);
432 assert!(uni.close(Duration::ZERO).await, "Uni wasn't properly closed");
433
434 assert_eq!(on_err_count.load(Relaxed), 1, "'on_err()' callback contract broken: events with handled errors should not call on_err(), the ones not 'caught', should")
435 }
436
437 #[cfg_attr(not(doc),tokio::test(flavor="multi_thread", worker_threads=2))]
439 #[ignore] async fn performance_measurements() {
441
442 #[cfg(not(debug_assertions))]
443 const FACTOR: u32 = 8192;
444 #[cfg(debug_assertions)]
445 const FACTOR: u32 = 40;
446
447 async fn profile_uni<UniChannelType: FullDuplexUniChannel<ItemType=u32, DerivedItemType=u32> + Sync + Send + 'static,
449 const INSTRUMENTS: usize>
450 (uni: Arc<Uni<u32, UniChannelType, INSTRUMENTS>>,
451 profiling_name: &str,
452 count: u32) {
453 print!("{profiling_name} "); std::io::stdout().flush().unwrap();
454 let mut full_count = 0u32;
455 let start = Instant::now();
456 for e in 0..count {
457 uni.send_with(|slot| *slot = e)
458 .retry_with(|setter| {
459 full_count += 1;
460 if full_count % (1<<26) == 0 {
461 print!("(stuck at e #{e}?)"); std::io::stdout().flush().unwrap();
462 } else if full_count % (1<<20) == 0 {
463 print!("."); std::io::stdout().flush().unwrap();
464 }
465 uni.send_with(setter)
466 })
467 .spinning_forever();
468 }
469 assert!(uni.close(Duration::from_secs(5)).await, "Uni wasn't properly closed");
470 let elapsed = start.elapsed();
471 println!("{:10.2}/s -- {} items processed in {:?}",
472 count as f64 / elapsed.as_secs_f64(),
473 count,
474 elapsed);
475 }
476
477 println!();
478
479 let profiling_name = "metricfull_non_futures_non_fallible_uni: ";
480 let uni = TestUni::<u32, 8192, 1, {Instruments::MetricsWithoutLogs.into()}>::new(profiling_name)
481 .spawn_non_futures_non_fallibles_executors(1, |stream| stream, |_| async {});
482 profile_uni(uni, profiling_name, 1024*FACTOR).await;
483
484 let profiling_name = "metricless_non_futures_non_fallible_uni: ";
485 let uni = TestUni::<u32, 8192, 1, {Instruments::NoInstruments.into()}>::new(profiling_name)
486 .spawn_non_futures_non_fallibles_executors(1, |stream| stream, |_| async {});
487 profile_uni(uni, profiling_name, 1024*FACTOR).await;
488
489 let profiling_name = "par_metricless_non_futures_non_fallible_uni:";
490 let uni = TestUni::<u32, 8192, 1, {Instruments::NoInstruments.into()}>::new(profiling_name)
491 .spawn_non_futures_non_fallibles_executors(12, |stream| stream, |_| async {});
492 profile_uni(uni, profiling_name, 1024*FACTOR).await;
493
494 let profiling_name = "metricfull_futures_fallible_uni: ";
495 let uni = TestUni::<u32, 8192, 1, {Instruments::MetricsWithoutLogs.into()}>::new(profiling_name)
496 .spawn_executors(1,
497 Duration::ZERO,
498 |stream| {
499 stream.map(|number| async move {
500 Ok(number)
501 })
502 },
503 |_err| async {},
504 |_| async {});
505 profile_uni(uni, profiling_name, 1024*FACTOR).await;
506
507 let profiling_name = "metricless_futures_fallible_uni: ";
508 let uni = TestUni::<u32, 8192, 1, {Instruments::NoInstruments.into()}>::new(profiling_name)
509 .spawn_executors(1,
510 Duration::ZERO,
511 |stream| {
512 stream.map(|number| async move {
513 Ok(number)
514 })
515 },
516 |_err| async {},
517 |_| async {});
518 profile_uni(uni, profiling_name, 1024*FACTOR).await;
519
520 let profiling_name = "timeoutable_metricfull_futures_fallible_uni:";
521 let uni = TestUni::<u32, 8192, 1, {Instruments::MetricsWithoutLogs.into()}>::new(profiling_name)
522 .spawn_executors(1,
523 Duration::from_millis(100),
524 |stream| {
525 stream.map(|number| async move {
526 Ok(number)
527 })
528 },
529 |_err| async {},
530 |_| async {});
531 profile_uni(uni, profiling_name, 768*FACTOR).await;
532
533 let profiling_name = "timeoutable_metricless_futures_fallible_uni:";
534 let uni = TestUni::<u32, 8192, 1, {Instruments::NoInstruments.into()}>::new(profiling_name)
535 .spawn_executors(1,
536 Duration::from_millis(100),
537 |stream| {
538 stream.map(|number| async move {
539 Ok(number)
540 })
541 },
542 |_err| async {},
543 |_| async {});
544 profile_uni(uni, profiling_name, 768*FACTOR).await;
545
546 }
563
564}