mod multi;
pub use multi::*;

pub mod channels;


#[cfg(any(test, doc))]
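/// Unit tests & usage examples for the [Multi] event pipelines declared in [multi] and for the [channels] they run on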
mod tests {
    use super::*;
    use super::super::{
        instruments::Instruments,
        mutiny_stream::MutinyStream,
        types::FullDuplexMultiChannel,
    };
    use std::{
        future::Future,
        sync::{
            Arc,
            atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering::Relaxed},
        },
        time::{
            Duration,
            SystemTime,
        },
        io::Write,
    };
    use futures::stream::{self, Stream, StreamExt};
    use minstant::Instant;
    use tokio::sync::Mutex;


    type MultiChannelType<ItemType, const BUFFER_SIZE: usize, const MAX_STREAMS: usize> = channels::arc::crossbeam::Crossbeam<'static, ItemType, BUFFER_SIZE, MAX_STREAMS>;


    #[ctor::ctor]
    fn suite_setup() {
        simple_logger::SimpleLogger::new().with_utc_timestamps().init().unwrap_or_else(|_| eprintln!("--> LOGGER WAS ALREADY STARTED"));
    }

    #[cfg_attr(not(doc), tokio::test)]
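    /// Runs the documentation example: three executors ("local screen", "zeta receiver" & "earth snapper") all observing the same events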
    async fn doc_tests() -> Result<(), Box<dyn std::error::Error>> {
        fn local_on_event(stream: impl Stream<Item=Arc<String>>) -> impl Stream<Item=Arc<String>> {
            stream.inspect(|message| println!("To Zeta: '{}'", message))
        }
        fn zeta_on_event(stream: impl Stream<Item=Arc<String>>) -> impl Stream<Item=Arc<String>> {
            stream.inspect(|message| println!("ZETA: Received a message: '{}'", message))
        }
        fn earth_on_event(stream: impl Stream<Item=Arc<String>>) -> impl Stream<Item=Arc<String>> {
            stream.inspect(|sneak_peeked_message| println!("EARTH: Sneak peeked a message to Zeta Reticuli: '{}'", sneak_peeked_message))
        }
        let multi: Multi<String, MultiChannelType<String, 1024, 4>, {Instruments::LogsWithMetrics.into()}, Arc<String>> = Multi::new("doc_test() event");
        multi.spawn_non_futures_non_fallible_executor(1, "local screen", local_on_event, |_| async {}).await?;
        multi.spawn_non_futures_non_fallible_executor(1, "zeta receiver", zeta_on_event, |_| async {}).await?;
        multi.spawn_non_futures_non_fallible_executor(1, "earth snapper", earth_on_event, |_| async {}).await?;
        let producer = |item: &str| multi.send(item.to_string());
        producer("I've just arrived!");
        producer("Nothing really interesting here... heading back home!");
        multi.close(Duration::ZERO).await;
        Ok(())
    }

    #[cfg_attr(not(doc), tokio::test)]
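    /// Guarantees that every event sent to a [Multi] is seen by all executors: two summing pipelines must both observe every part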
    async fn simple_pipelines() -> Result<(), Box<dyn std::error::Error>> {
        const EXPECTED_SUM: u32 = 17;
        const PARTS: &[u32] = &[9, 8];

        let observed_sum_1 = Arc::new(AtomicU32::new(0));
        let observed_sum_2 = Arc::new(AtomicU32::new(0));

        let multi: Multi<u32, MultiChannelType<u32, 1024, 2>, {Instruments::LogsWithMetrics.into()}, Arc<u32>> = Multi::new("Simple Event");
        multi.spawn_non_futures_non_fallible_executor(1, "Pipeline #1",
            |stream| {
                let observed_sum = Arc::clone(&observed_sum_1);
                stream.map(move |number: Arc<u32>| observed_sum.fetch_add(*number, Relaxed))
            },
            |_| async {}).await?;
        multi.spawn_non_futures_non_fallible_executor(1, "Pipeline #2",
            |stream| {
                let observed_sum = Arc::clone(&observed_sum_2);
                stream.map(move |number| observed_sum.fetch_add(*number, Relaxed))
            },
            |_| async {}).await?;
        let producer = |item| multi.send_with(move |slot| *slot = item).expect_ok("couldn't send");

        let shared_producer = &producer;
        stream::iter(PARTS)
            .for_each_concurrent(1, |number| async move {
                shared_producer(*number);
            }).await;

        multi.close(Duration::ZERO).await;
        assert_eq!(observed_sum_1.load(Relaxed), EXPECTED_SUM, "not all events passed through our pipeline #1");
        assert_eq!(observed_sum_2.load(Relaxed), EXPECTED_SUM, "not all events passed through our pipeline #2");
        Ok(())
    }

    #[cfg_attr(not(doc), tokio::test)]
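    /// Asserts that pipelines may be added & removed from a [Multi] at will -- and that no two live pipelines may share the same name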
    async fn delete_pipelines() {
        const PIPELINE_1: &str = "Pipeline #1";
        const PIPELINE_2: &str = "Pipeline #2";
        const TIMEOUT: Duration = Duration::ZERO;

        let multi: Multi<u32, MultiChannelType<u32, 1024, 2>, {Instruments::LogsWithMetrics.into()}, Arc<u32>> = Multi::new("Event with come and go pipelines");

        multi.spawn_non_futures_non_fallible_executor(1, PIPELINE_1, |s| s, |_| async {}).await
            .expect("Single instance of PIPELINE_1 should have been created");
        multi.spawn_non_futures_non_fallible_executor(1, PIPELINE_2, |s| s, |_| async {}).await
            .expect("Single instance of PIPELINE_2 should have been created");
        assert!(multi.flush_and_cancel_executor(PIPELINE_1, TIMEOUT).await, "'{}' was spawned, therefore it should have been cancelled", PIPELINE_1);
        assert!(multi.flush_and_cancel_executor(PIPELINE_2, TIMEOUT).await, "'{}' was spawned, therefore it should have been cancelled", PIPELINE_2);

        multi.spawn_non_futures_non_fallible_executor(1, PIPELINE_1, |s| s, |_| async {}).await
            .expect("Single instance of PIPELINE_1 should have been created");
        let result = multi.spawn_non_futures_non_fallible_executor(1, PIPELINE_1, |s| s, |_| async {}).await;
        assert!(result.is_err(), "Second attempt to insert PIPELINE_1 should have failed");

        assert!(multi.flush_and_cancel_executor(PIPELINE_1, TIMEOUT).await, "'{}' was spawned once, therefore it should have been cancelled", PIPELINE_1);
        assert!(!multi.flush_and_cancel_executor(PIPELINE_1, TIMEOUT).await, "A pipeline cannot be cancelled twice");

        assert!(!multi.flush_and_cancel_executor(PIPELINE_2, TIMEOUT).await, "A nonexistent pipeline cannot be reported as having been cancelled");

        // stresses adding & removing pipelines
        for _ in 0..128 {
            multi.spawn_non_futures_non_fallible_executor(1, PIPELINE_1, |s| s, |_| async {}).await
                .expect("Single instance of PIPELINE_1 should have been created");
            multi.spawn_non_futures_non_fallible_executor(1, PIPELINE_2, |s| s, |_| async {}).await
                .expect("Single instance of PIPELINE_2 should have been created");
            assert!(multi.flush_and_cancel_executor(PIPELINE_1, TIMEOUT).await, "'{}' was spawned, therefore it should have been cancelled", PIPELINE_1);
            assert!(multi.flush_and_cancel_executor(PIPELINE_2, TIMEOUT).await, "'{}' was spawned, therefore it should have been cancelled", PIPELINE_2);
        }

        // a freshly re-added pipeline must still receive events
        let last_message = Arc::new(Mutex::new(0u32));
        let last_message_ref = Arc::clone(&last_message);
        multi.spawn_non_futures_non_fallible_executor(1, PIPELINE_2,
            |s| s.inspect(move |n| *last_message_ref.try_lock().unwrap() = *(n as &u32)),
            |_| async {}).await
            .expect("Single instance of PIPELINE_2 should have been created");

        multi.send_with(|slot| *slot = 97).expect_ok("couldn't send");
        multi.close(Duration::ZERO).await;

        assert_eq!(*last_message.try_lock().unwrap(), 97, "event didn't complete");
    }

    #[cfg_attr(not(doc), tokio::test)]
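    /// Like [simple_pipelines()], but each pipeline resolves async futures (with sleeps) before accounting for the events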
    async fn async_elements() -> Result<(), Box<dyn std::error::Error>> {
        const EXPECTED_SUM: u32 = 30;
        const PARTS: &[u32] = &[9, 8, 7, 6];

        let observed_sum_1 = Arc::new(AtomicU32::new(0));
        let observed_sum_2 = Arc::new(AtomicU32::new(0));

        // each builder chains futures: an artificial delay, the sum accounting and, finally, logging the processed event
        let pipeline1_builder = |stream: MutinyStream<'static, u32, _, Arc<u32>>| {
            let observed_sum = Arc::clone(&observed_sum_1);
            stream
                .map(|number| async move {
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    number
                })
                .map(move |number| {
                    let observed_sum = Arc::clone(&observed_sum);
                    async move {
                        let number = number.await;
                        observed_sum.fetch_add(*number, Relaxed);
                        number
                    }
                })
                .map(|number| async move {
                    let number = number.await;
                    println!("PIPELINE 1: Just added # {}", number);
                    Ok(number)
                })
        };
        let pipeline2_builder = |stream: MutinyStream<'static, u32, _, Arc<u32>>| {
            let observed_sum = Arc::clone(&observed_sum_2);
            stream
                .map(|number| async move {
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    number
                })
                .map(move |number| {
                    let observed_sum = Arc::clone(&observed_sum);
                    async move {
                        let number = number.await;
                        observed_sum.fetch_add(*number, Relaxed);
                        number
                    }
                })
                .map(|number| async move {
                    let number = number.await;
                    println!("PIPELINE 2: Just added # {}", number);
                    Ok(number)
                })
        };

        let multi: Multi<u32, MultiChannelType<u32, 1024, 2>, {Instruments::LogsWithMetrics.into()}, Arc<u32>> = Multi::new("async event");
        multi.spawn_executor(PARTS.len() as u32, Duration::from_secs(2), "Stream Pipeline #1", pipeline1_builder, |_| async {}, |_| async {}).await?;
        multi.spawn_executor(PARTS.len() as u32, Duration::from_secs(2), "Stream Pipeline #2", pipeline2_builder, |_| async {}, |_| async {}).await?;

        let producer = |item| multi.send_with(move |slot| *slot = item).expect_ok("couldn't send");

        let shared_producer = &producer;
        stream::iter(PARTS)
            .for_each_concurrent(1, |number| async move {
                shared_producer(*number);
            }).await;

        multi.close(Duration::ZERO).await;
        assert_eq!(observed_sum_1.load(Relaxed), EXPECTED_SUM, "not all events passed through our async pipeline #1");
        assert_eq!(observed_sum_2.load(Relaxed), EXPECTED_SUM, "not all events passed through our async pipeline #2");
        Ok(())
    }

    #[cfg_attr(not(doc), tokio::test(flavor = "multi_thread", worker_threads = 2))]
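    /// Checks that the per-pipeline counters & average futures resolution times are correctly collected -- both for non-futures/non-fallible and for futures & fallible events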
    #[ignore]
    async fn stats() -> Result<(), Box<dyn std::error::Error>> {

        #[cfg(not(debug_assertions))]
        const N_PIPELINES: usize = 256;
        #[cfg(debug_assertions)]
        const N_PIPELINES: usize = 128;

        // non-futures & non-fallible events
        let event_name = "non_future/non_fallible event";
        let multi: Multi<String, MultiChannelType<String, 256, N_PIPELINES>, {Instruments::LogsWithMetrics.into()}, Arc<String>> = Multi::new(event_name);
        for i in 0..N_PIPELINES {
            multi.spawn_non_futures_non_fallible_executor(1, format!("Pipeline #{} for {}", i, event_name), |stream| stream, |_| async {}).await?;
        }
        let producer = |item: &str| multi.send(item.to_string()).expect_ok("couldn't send");
        producer("'only count successes' payload");
        multi.close(Duration::from_secs(5)).await;
        assert_eq!(N_PIPELINES, multi.executor_infos.read().await.len(), "Number of created pipelines doesn't match");
        for (i, executor_info) in multi.executor_infos.read().await.values().enumerate() {
            let (ok_counter, ok_avg_futures_resolution_duration) = executor_info.executor_stats.ok_events_avg_future_duration().lightweight_probe();
            assert_eq!(ok_counter, 1, "counter of successful '{event_name}' events is wrong for pipeline #{i}");
            assert_eq!(ok_avg_futures_resolution_duration, -1.0, "avg futures resolution time of successful '{event_name}' events is wrong for pipeline #{i} -- since it is a non-future, avg times should always be -1.0");
            let (failures_counter, failures_avg_futures_resolution_duration) = executor_info.executor_stats.failed_events_avg_future_duration().lightweight_probe();
            assert_eq!(failures_counter, 0, "counter of unsuccessful '{event_name}' events is wrong for pipeline #{i} -- since it is a non-fallible event, failures should always be 0");
            assert_eq!(failures_avg_futures_resolution_duration, 0.0, "avg futures resolution time of unsuccessful '{event_name}' events is wrong for pipeline #{i} -- since it is a non-fallible event, avg times should always be 0.0");
            let (timeouts_counter, timeouts_avg_futures_resolution_duration) = executor_info.executor_stats.timed_out_events_avg_future_duration().lightweight_probe();
            assert_eq!(timeouts_counter, 0, "counter of timed out '{event_name}' events is wrong for pipeline #{i} -- since it is a non-future event, timeouts should always be 0");
            assert_eq!(timeouts_avg_futures_resolution_duration, 0.0, "avg futures resolution time of timed out '{event_name}' events is wrong for pipeline #{i} -- since it is a non-future event, avg timeouts should always be 0.0");
        }

        // futures & fallible events
        let event_name = "future & fallible event";
        let multi: Multi<String, MultiChannelType<String, 256, N_PIPELINES>, {Instruments::LogsWithMetrics.into()}, Arc<String>> = Multi::new(event_name);
        for i in 0..N_PIPELINES {
            multi.spawn_executor(1, Duration::from_millis(150), format!("Pipeline #{} for {}", i, event_name),
                |stream| {
                    stream.map(|payload| async move {
                        if payload.contains("unsuccessful") {
                            tokio::time::sleep(Duration::from_millis(50)).await;
                            Err(Box::from(String::from("failing the pipeline, as requested")))
                        } else if payload.contains("timeout") {
                            tokio::time::sleep(Duration::from_millis(200)).await;
                            Ok(Arc::new(String::from("this answer will never make it -- the stream executor times out after 150ms")))
                        } else {
                            tokio::time::sleep(Duration::from_millis(100)).await;
                            Ok(payload)
                        }
                    })
                },
                |_| async {},
                |_| async {}
            ).await?;
        }
        let producer = |item: &str| multi.send(item.to_string()).expect_ok("couldn't send");
        for _ in 0..2 {
            producer("'successful' payload");
            producer("'unsuccessful' payload");
            producer("'timeout' payload");
        }
        multi.close(Duration::from_secs(5)).await;
        assert_eq!(N_PIPELINES, multi.executor_infos.read().await.len(), "Number of created pipelines doesn't match");
        for (i, executor_info) in multi.executor_infos.read().await.values().enumerate() {
            let (ok_counter, ok_avg_futures_resolution_duration) = executor_info.executor_stats.ok_events_avg_future_duration().lightweight_probe();
            assert_eq!(ok_counter, 2, "counter of successful '{event_name}' events is wrong for pipeline #{i}");
            assert!((ok_avg_futures_resolution_duration-0.100).abs() < 15e-2, "avg futures resolution time of successful '{event_name}' events is wrong for pipeline #{i} -- it should be 0.1s, but was {ok_avg_futures_resolution_duration}s");
            let (failures_counter, failures_avg_futures_resolution_duration) = executor_info.executor_stats.failed_events_avg_future_duration().lightweight_probe();
            assert_eq!(failures_counter, 2, "counter of unsuccessful '{event_name}' events is wrong for pipeline #{i}");
            assert!((failures_avg_futures_resolution_duration-0.050).abs() < 15e-2, "avg futures resolution time of unsuccessful '{event_name}' events is wrong for pipeline #{i} -- it should be 0.05s, but was {failures_avg_futures_resolution_duration}s");
            let (timeouts_counter, timeouts_avg_futures_resolution_duration) = executor_info.executor_stats.timed_out_events_avg_future_duration().lightweight_probe();
            assert_eq!(timeouts_counter, 2, "counter of timed out '{event_name}' events is wrong for pipeline #{i}");
            assert!((timeouts_avg_futures_resolution_duration-0.150).abs() < 15e-2, "avg futures resolution time of timed out '{event_name}' events is wrong for pipeline #{i} -- it should be 0.150s, but was {timeouts_avg_futures_resolution_duration}s");
        }
        Ok(())
    }

    #[cfg_attr(not(doc), tokio::test)]
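    /// Demonstrates "demultiplexing": events from the TWO and FOUR [Multi]s are combined (through a shared state) and fire a derived event on the SIX [Multi] once both have been seen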
    async fn demux() -> Result<(), Box<dyn std::error::Error>> {
        let shared_state = Arc::new(AtomicU32::new(0));
        let two_fire_count = Arc::new(AtomicU32::new(0));
        let four_fire_count = Arc::new(AtomicU32::new(0));
        let six_fire_count = Arc::new(AtomicU32::new(0));

        // the SIX event: fired whenever both TWO and FOUR have set their bits
        let on_six_event = |stream: MutinyStream<'static, bool, _, Arc<bool>>| {
            let six_fire_count = Arc::clone(&six_fire_count);
            stream.inspect(move |_| {
                six_fire_count.fetch_add(1, Relaxed);
            })
        };
        let six_multi = Multi::<bool, MultiChannelType<bool, 1024, 2>, {Instruments::LogsWithMetrics.into()}, Arc<bool>>::new("SIX");
        six_multi.spawn_non_futures_non_fallible_executor(1, "Pipeline #1", on_six_event, |_| async {}).await?;
        six_multi.spawn_non_futures_non_fallible_executor(1, "Pipeline #2", on_six_event, |_| async {}).await?;
        let six_multi = Arc::new(six_multi);
        let can_six_be_closed = Arc::new(AtomicBool::new(true));
        let six_multi_ref = Arc::clone(&six_multi);
        let six_closer = Arc::new(move || {
            let can_six_be_closed = Arc::clone(&can_six_be_closed);
            let six_multi = Arc::clone(&six_multi_ref);
            async move {
                if can_six_be_closed.swap(false, Relaxed) {
                    six_multi.close(Duration::ZERO).await;
                }
            }
        });

        // the TWO event: sets bit 2 on the shared state and fires SIX when both bits are set
        let on_two_event = |stream: MutinyStream<'static, u32, _, Arc<u32>>| {
            let two_fire_count = Arc::clone(&two_fire_count);
            let shared_state = Arc::clone(&shared_state);
            let six_multi = Arc::clone(&six_multi);
            stream
                .map(move |payload| {
                    let two_fire_count = Arc::clone(&two_fire_count);
                    let shared_state = Arc::clone(&shared_state);
                    let six_multi = Arc::clone(&six_multi);
                    async move {
                        two_fire_count.fetch_add(1, Relaxed);
                        if *payload & 2 == 2 {
                            let previous_state = shared_state.fetch_or(2, Relaxed);
                            if previous_state & 6 == 6 {
                                shared_state.store(0, Relaxed);
                                six_multi.send_with(|slot| *slot = true).expect_ok("couldn't send");
                            }
                        } else if *payload == 97 {
                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                        }
                        payload
                    }
                })
                .buffer_unordered(1)
        };
        let six_closer_for_two = Arc::clone(&six_closer);
        let on_two_close_builder = || {
            let six_closer_for_two = Arc::clone(&six_closer_for_two);
            move |_| {
                let six_closer_for_two = Arc::clone(&six_closer_for_two);
                async move {
                    six_closer_for_two().await;
                }
            }
        };
        let two_multi: Multi<u32, MultiChannelType<u32, 1024, 2>, {Instruments::LogsWithMetrics.into()}, Arc<u32>> = Multi::new("TWO");
        two_multi.spawn_non_futures_non_fallible_executor(1, "Pipeline #1", on_two_event, on_two_close_builder()).await?;
        two_multi.spawn_non_futures_non_fallible_executor(1, "Pipeline #2", on_two_event, on_two_close_builder()).await?;
        let two_multi = Arc::new(two_multi);
        let two_producer = |item| two_multi.send_with(move |slot| *slot = item).expect_ok("couldn't send `two`");

        // the FOUR event: sets bit 4 on the shared state and fires SIX when both bits are set
        let on_four_event = |stream: MutinyStream<'static, u32, _, Arc<u32>>| {
            let four_fire_count = Arc::clone(&four_fire_count);
            let shared_state = Arc::clone(&shared_state);
            let six_multi = Arc::clone(&six_multi);
            stream
                .map(move |payload| {
                    let four_fire_count = Arc::clone(&four_fire_count);
                    let shared_state = Arc::clone(&shared_state);
                    let six_multi = Arc::clone(&six_multi);
                    async move {
                        four_fire_count.fetch_add(1, Relaxed);
                        if *payload & 4 == 4 {
                            let previous_state = shared_state.fetch_or(4, Relaxed);
                            if previous_state & 6 == 6 {
                                shared_state.store(0, Relaxed);
                                six_multi.send_with(|slot| *slot = true).expect_ok("couldn't send `six`");
                            }
                        } else if *payload == 97 {
                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                        }
                        payload
                    }
                })
                .buffer_unordered(1)
        };
        let six_closer_for_four = Arc::clone(&six_closer);
        let on_four_close_builder = || {
            let six_closer_for_four = Arc::clone(&six_closer_for_four);
            move |_| {
                let six_closer_for_four = Arc::clone(&six_closer_for_four);
                async move {
                    six_closer_for_four().await;
                }
            }
        };
        let four_multi: Multi<u32, MultiChannelType<u32, 1024, 2>, {Instruments::LogsWithMetrics.into()}, Arc<u32>> = Multi::new("FOUR");
        four_multi.spawn_non_futures_non_fallible_executor(1, "Pipeline #1", on_four_event, on_four_close_builder()).await?;
        four_multi.spawn_non_futures_non_fallible_executor(1, "Pipeline #2", on_four_event, on_four_close_builder()).await?;
        let four_multi = Arc::new(four_multi);
        let four_producer = |item| four_multi.send_with(move |slot| *slot = item).expect_ok("couldn't send `four`");

        two_producer(1);
        two_producer(2);
        four_producer(97);    // 97 makes the FOUR pipelines sleep for 100ms
        four_producer(1);
        four_producer(2);
        four_producer(3);
        four_producer(4);
        two_producer(3);
        two_producer(4);
        four_producer(5);
        multis_close_async!(Duration::ZERO, two_multi, four_multi);
        assert_eq!(two_fire_count.load(Relaxed), 4 * 2, "Wrong number of events processed for TWO");
        assert_eq!(four_fire_count.load(Relaxed), 6 * 2, "Wrong number of events processed for FOUR");
        assert_eq!(six_fire_count.load(Relaxed), 1 * 2, "Wrong number of events processed for SIX");
        Ok(())
    }

    #[cfg_attr(not(doc), tokio::test)]
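    /// Shows how a fallible pipeline may handle errors: tolerable failures are logged & filtered out mid-pipeline, while uncaught ones reach the executor's `on_err` callback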
    async fn error_handling() -> Result<(), Box<dyn std::error::Error>> {

        let on_err_count = Arc::new(AtomicU32::new(0));

        fn on_fail_when_odd_event(stream: impl Stream<Item=Arc<u32>>) -> impl Stream<Item = impl Future<Output = Result<u32, Box<dyn std::error::Error + Send + Sync>> > + Send> {
            stream
                .map(|payload| async move {
                    if *payload % 2 == 0 {
                        Ok(*payload)
                    } else if *payload % 79 == 0 {
                        Err(format!("BLOW CODE received: {}", payload))
                    } else {
                        Err(format!("ODD payload received: {}", payload))
                    }
                })
                // treats tolerable errors: ODD payloads are logged & dropped; any other error is passed along
                .filter_map(|payload| async {
                    let payload = payload.await;
                    match &payload {
                        Ok(ok_payload) => {
                            println!("Payload {} ACCURATELY PROCESSED!", ok_payload);
                            Some(payload)
                        },
                        Err(ref err) => {
                            if err.contains("ODD") {
                                println!("Payload {} ERROR LOG -- this error is tolerable and this event will be skipped for the rest of the pipeline", err);
                                None
                            } else {
                                Some(payload)
                            }
                        }
                    }
                })
                .map(|payload| async {
                    let payload = payload?;
                    println!("Payload {} continued down the pipe", payload);
                    Ok(payload)
                })
        }
        let on_err_count_clone_1 = Arc::clone(&on_err_count);
        let on_err_count_clone_2 = Arc::clone(&on_err_count);
        let multi: Multi<u32, MultiChannelType<u32, 1024, 2>, {Instruments::LogsWithMetrics.into()}, Arc<u32>> = Multi::new("Event with error handling");
        multi.spawn_executor(1,
            Duration::from_millis(100),
            "Pipeline #1",
            on_fail_when_odd_event,
            move |err| {
                let on_err_count_clone = Arc::clone(&on_err_count_clone_1);
                async move {
                    on_err_count_clone.fetch_add(1, Relaxed);
                    println!("Pipeline #1: ERROR CALLBACK WAS CALLED: '{:?}'", err);
                }
            },
            |_| async {}
        ).await?;
        multi.spawn_executor(1,
            Duration::from_millis(100),
            "Pipeline #2",
            on_fail_when_odd_event,
            move |err| {
                let on_err_count_clone = Arc::clone(&on_err_count_clone_2);
                async move {
                    on_err_count_clone.fetch_add(1, Relaxed);
                    println!("Pipeline #2: ERROR CALLBACK WAS CALLED: '{:?}'", err);
                }
            },
            |_| async {}
        ).await?;
        let producer = |item| multi.send_with(move |slot| *slot = item).expect_ok("couldn't send");
        producer(0);
        producer(1);
        producer(2);
        producer(79);
        producer(80);
        multi.close(Duration::ZERO).await;

        assert_eq!(on_err_count.load(Relaxed), 1 * 2, "'on_err()' callback contract broken: events with handled errors should not call on_err(); the ones not 'caught' should -- twice, since we have 2 pipelines");
        Ok(())
    }

    #[cfg_attr(not(doc), tokio::test)]
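    /// Ensures the latency of producing & consuming an event is not degraded by a large number of idle [Multi] executor tasks sitting in the Tokio runtime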
    async fn undegradable_latencies() -> Result<(), Box<dyn std::error::Error>> {
        const BLOATED_PIPELINES_COUNT: usize = 256;

        let simple_count = Arc::new(AtomicU32::new(0));
        let simple_last_elapsed_nanos = Arc::new(AtomicU64::new(0));
        let bloated_count = Arc::new(AtomicU32::new(0));
        let bloated_last_elapsed_nanos = Arc::new(AtomicU64::new(0));

        let simple_multi: Multi<SystemTime, MultiChannelType<SystemTime, 1024, 1>, {Instruments::LogsWithMetrics.into()}, Arc<SystemTime>> = Multi::new("SIMPLE");
        simple_multi.spawn_non_futures_non_fallible_executor(1, "solo pipeline",
            |stream| {
                let simple_count = Arc::clone(&simple_count);
                let simple_last_elapsed_nanos = Arc::clone(&simple_last_elapsed_nanos);
                stream.map(move |start: Arc<SystemTime>| {
                    simple_last_elapsed_nanos.store(start.elapsed().unwrap().as_nanos() as u64, Relaxed);
                    simple_count.fetch_add(1, Relaxed)
                })
            },
            |_| async {}).await?;
        let simple_producer = |item| simple_multi.send_with(move |slot| *slot = item).expect_ok("couldn't send");

        // measures the baseline latency, with no other Multi tasks around
        tokio::time::sleep(Duration::from_millis(10)).await;
        simple_producer(SystemTime::now());
        while simple_count.load(Relaxed) != 1 {
            tokio::task::yield_now().await;
        }
        let first_simple_duration = Duration::from_nanos(simple_last_elapsed_nanos.load(Relaxed));
        println!("1. Time to produce & consume a SIMPLE event (no other Multi Tokio tasks): {:?}", first_simple_duration);

        let bloated_multi: Multi<SystemTime, MultiChannelType<SystemTime, 16, BLOATED_PIPELINES_COUNT>, {Instruments::LogsWithMetrics.into()}, Arc<SystemTime>> = Multi::new("BLOATED");
        for i in 0..BLOATED_PIPELINES_COUNT {
            bloated_multi.spawn_non_futures_non_fallible_executor(1, format!("#{i}"),
                |stream| {
                    let bloated_count = Arc::clone(&bloated_count);
                    let bloated_last_elapsed_nanos = Arc::clone(&bloated_last_elapsed_nanos);
                    stream.map(move |start| {
                        bloated_last_elapsed_nanos.store(start.elapsed().unwrap().as_nanos() as u64, Relaxed);
                        bloated_count.fetch_add(1, Relaxed)
                    })
                },
                |_| async {}).await?;
        }
        let bloated_producer = |item| bloated_multi.send_with(move |slot| *slot = item).expect_ok("couldn't send");

        tokio::time::sleep(Duration::from_millis(10)).await;
        bloated_producer(SystemTime::now());
        while bloated_count.load(Relaxed) != BLOATED_PIPELINES_COUNT as u32 {
            tokio::task::yield_now().await;
        }
        let bloated_duration = Duration::from_nanos(bloated_last_elapsed_nanos.load(Relaxed));
        println!("2. Tokio Runtime is now BLOATED with {BLOATED_PIPELINES_COUNT} tasks -- all of them are multi executors. Time to produce + time for all pipelines to consume it: {:?}", bloated_duration);

        // measures the latency again, now with all the (sleeping) bloated executors in place
        tokio::time::sleep(Duration::from_millis(10)).await;
        simple_producer(SystemTime::now());
        while simple_count.load(Relaxed) != 2 {
            tokio::task::yield_now().await;
        }
        let second_simple_duration = Duration::from_nanos(simple_last_elapsed_nanos.load(Relaxed));
        println!("3. Time to produce & consume another SIMPLE event (with lots of -- {BLOATED_PIPELINES_COUNT} -- sleeping Multi Tokio tasks): {:?}", second_simple_duration);

        const TOLERANCE_MICROS: u128 = 10;
        assert!(second_simple_duration < first_simple_duration || second_simple_duration.as_micros() - first_simple_duration.as_micros() < TOLERANCE_MICROS,
                "Tokio tasks' latency must be unaffected by whatever number of sleeping tasks there are (tasks executing our multi stream pipelines) -- task execution latencies exceeded the {TOLERANCE_MICROS}µs tolerance: with 0 sleeping: {:?}; with {BLOATED_PIPELINES_COUNT} sleeping: {:?}",
                first_simple_duration,
                second_simple_duration);

        simple_multi.close(Duration::ZERO).await;
        bloated_multi.close(Duration::ZERO).await;
        Ok(())
    }

    #[cfg_attr(not(doc), tokio::test)]
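    /// Demonstrates chaining: events received by `first_multi` are forwarded to `second_multi` through `send_derived()`, with no copying and no additional Arc cloning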
    async fn chained_multis() -> Result<(), Box<dyn std::error::Error>> {
        let expected_msgs = vec![
            "Hello, beautiful world!",
            "Oh me, oh my, this will never do... Goodbye, cruel world!"
        ];
        let first_multi_msgs = Arc::new(std::sync::Mutex::new(vec![]));
        let first_multi_msgs_ref = Arc::clone(&first_multi_msgs);
        let second_multi_msgs = Arc::new(std::sync::Mutex::new(vec![]));
        let second_multi_msgs_ref = Arc::clone(&second_multi_msgs);

        let second_multi: Multi<String, MultiChannelType<String, 1024, 4>, {Instruments::LogsWithMetrics.into()}, Arc<String>> = Multi::new("second chained multi, receiving the Arc-wrapped event -- with no copying (and no additional Arc cloning)");
        second_multi.spawn_non_futures_non_fallible_executor(1, "second executor", move |stream| {
            stream.map(move |message| {
                println!("`second_multi` received '{:?}'", message);
                second_multi_msgs_ref
                    .lock().unwrap()
                    .push(message);
            })
        }, |_| async {}).await?;
        let second_multi = Arc::new(second_multi);
        let second_multi_ref = Arc::clone(&second_multi);
        let first_multi: Multi<String, MultiChannelType<String, 1024, 4>, {Instruments::LogsWithMetrics.into()}, Arc<String>> = Multi::new("first chained multi, receiving the original events");
        first_multi.spawn_non_futures_non_fallible_executor(1, "first executor", move |stream| {
            stream.map(move |message: Arc<String>| {
                println!("`first_multi` received '{:?}'", message);
                assert!(second_multi_ref.send_derived(&message), "couldn't send derived");
                first_multi_msgs_ref
                    .lock().unwrap()
                    .push(message);
            })
        }, |_| async {}).await?;
        let producer = |item: &str| first_multi.send(item.to_string()).expect_ok("couldn't send");
        expected_msgs.iter().for_each(|&msg| { producer(msg); });
        multis_close_async!(Duration::ZERO, first_multi, second_multi);
        let expected_msgs: Vec<Arc<String>> = expected_msgs.into_iter()
            .map(|msg| Arc::new(msg.to_string()))
            .collect();
        assert_eq!(*first_multi_msgs.lock().unwrap(), expected_msgs, "First multi didn't receive the expected messages");
        assert_eq!(*second_multi_msgs.lock().unwrap(), expected_msgs, "Second multi didn't receive the expected messages");
        Ok(())
    }

    #[cfg_attr(not(doc), tokio::test(flavor = "multi_thread", worker_threads = 2))]
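    /// Measures the throughput of several [Multi] configurations: metrics on/off, futures & fallible pipelines or not, with & without timeouts. Ignored by default: meant to be run on demand, preferably in release mode.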
    #[ignore]
    async fn performance_measurements() -> Result<(), Box<dyn std::error::Error>> {

        #[cfg(not(debug_assertions))]
        const FACTOR: u32 = 4096;
        #[cfg(debug_assertions)]
        const FACTOR: u32 = 32;

        /// Sends `count` events through `multi` as fast as the channel accepts them, then closes it and reports the attained rate
        async fn profile_multi<MultiChannelType: FullDuplexMultiChannel<ItemType=u32, DerivedItemType=Arc<u32>> + Sync + Send + 'static,
                               const INSTRUMENTS: usize>
                              (multi:          &Multi<u32, MultiChannelType, INSTRUMENTS, Arc<u32>>,
                               profiling_name: &str,
                               count:          u32) {
            print!("{profiling_name} "); std::io::stdout().flush().unwrap();
            let start = Instant::now();
            let mut e = 0;
            while e < count {
                // fills whatever space is left in the channel's buffer...
                let buffer_entries_left = multi.buffer_size() - multi.pending_items_count();
                for _ in 0..buffer_entries_left {
                    multi.send_with(|slot| *slot = e)
                        .retry_with(|setter| multi.send_with(setter))
                        .spinning_forever();
                    e += 1;
                }
                // ...then backs off briefly, giving the executors a chance to drain it
                for _ in 0..32 {
                    std::hint::spin_loop();
                }
            }
            multi.close(Duration::from_secs(5)).await;
            let elapsed = start.elapsed();
            println!("{:10.2}/s -- {} items processed in {:?}",
                     count as f64 / elapsed.as_secs_f64(),
                     count,
                     elapsed);
        }

        println!();

        type MultiChannelType = channels::arc::crossbeam::Crossbeam<'static, u32, 8192, 1>;
        type DerivedType = Arc<u32>;

        let profiling_name = "metricfull_non_futures_non_fallible_multi: ";
        let multi: Multi<u32, MultiChannelType, {Instruments::MetricsWithoutLogs.into()}, DerivedType> = Multi::new(profiling_name.trim());
        multi.spawn_non_futures_non_fallible_executor(1, "", |stream| stream, |_| async {}).await?;
        profile_multi(&multi, profiling_name, 1024*FACTOR).await;

        let profiling_name = "metricless_non_futures_non_fallible_multi: ";
        let multi: Multi<u32, MultiChannelType, {Instruments::NoInstruments.into()}, Arc<u32>> = Multi::new(profiling_name.trim());
        multi.spawn_non_futures_non_fallible_executor(1, "", |stream| stream, |_| async {}).await?;
        profile_multi(&multi, profiling_name, 1024*FACTOR).await;

        let profiling_name = "par_metricless_non_futures_non_fallible_multi:";
        let multi: Multi<u32, MultiChannelType, {Instruments::NoInstruments.into()}, Arc<u32>> = Multi::new(profiling_name.trim());
        multi.spawn_non_futures_non_fallible_executor(12, "", |stream| stream, |_| async {}).await?;
        profile_multi(&multi, profiling_name, 1024*FACTOR).await;

        let profiling_name = "metricfull_futures_fallible_multi: ";
        let multi: Multi<u32, MultiChannelType, {Instruments::MetricsWithoutLogs.into()}, Arc<u32>> = Multi::new(profiling_name.trim());
        multi.spawn_executor(1, Duration::ZERO, "",
            |stream| {
                stream.map(|number| async move {
                    Ok(number)
                })
            },
            |_err| async {},
            |_| async {}).await?;
        profile_multi(&multi, profiling_name, 1024*FACTOR).await;

        let profiling_name = "metricless_futures_fallible_multi: ";
        let multi: Multi<u32, MultiChannelType, {Instruments::NoInstruments.into()}, Arc<u32>> = Multi::new(profiling_name.trim());
        multi.spawn_executor(1, Duration::ZERO, "",
            |stream| {
                stream.map(|number| async move {
                    Ok(number)
                })
            },
            |_err| async {},
            |_| async {}).await?;
        profile_multi(&multi, profiling_name, 1024*FACTOR).await;

        let profiling_name = "timeoutable_metricfull_futures_fallible_multi:";
        let multi: Multi<u32, MultiChannelType, {Instruments::MetricsWithoutLogs.into()}, Arc<u32>> = Multi::new(profiling_name.trim());
        multi.spawn_executor(1, Duration::from_millis(100), "",
            |stream| {
                stream.map(|number| async move {
                    Ok(number)
                })
            },
            |_err| async {},
            |_| async {}).await?;
        profile_multi(&multi, profiling_name, 768*FACTOR).await;

        let profiling_name = "timeoutable_metricless_futures_fallible_multi:";
        let multi: Multi<u32, MultiChannelType, {Instruments::NoInstruments.into()}, Arc<u32>> = Multi::new(profiling_name.trim());
        multi.spawn_executor(1, Duration::from_millis(100), "",
            |stream| {
                stream.map(|number| async move {
                    Ok(number)
                })
            },
            |_err| async {},
            |_| async {}).await?;
        profile_multi(&multi, profiling_name, 768*FACTOR).await;

        Ok(())
    }

}