1use std::{
4 collections::{BTreeMap, HashMap, VecDeque},
5 fmt,
6 future::Future,
7 hash::Hash,
8 marker::PhantomData,
9 panic::{AssertUnwindSafe, catch_unwind},
10 pin::Pin,
11 sync::{
12 Arc, Condvar, Mutex, OnceLock,
13 atomic::{AtomicBool, AtomicUsize, Ordering},
14 },
15 task::{Context, Poll},
16 thread,
17 time::Duration,
18};
19
20use futures::{channel::oneshot, executor::block_on};
21use thiserror::Error;
22use tokio::{
23 runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime},
24 task::{JoinError, JoinHandle},
25};
26
27pub(crate) type BoxStream<T> = Box<dyn Iterator<Item = StreamResult<T>> + Send>;
28pub(crate) type PureTransform<In, Out> = Arc<dyn Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync>;
29pub(crate) type RuntimeTransform<In, Out> =
30 Arc<dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync>;
31type SinkRunner<In, Mat> = dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync;
32type HintedSinkRunner<In, Mat> =
33 dyn Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat> + Send + Sync;
34type RunnableGraphRunner<Mat> = dyn Fn(&Materializer) -> StreamResult<Mat> + Send + Sync;
35const STREAM_READY_SPINS: usize = 256;
36const STREAM_SPIN_BACKOFF: usize = 8;
41const STREAM_MAX_PARK: Duration = Duration::from_millis(1);
42
43#[derive(Clone, Copy, Debug, PartialEq, Eq)]
47struct InlineMicroSourceHint {
48 max_success_items: usize,
49}
50
51#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
52struct SourceHints {
53 inline_head_terminal: bool,
54 inline_micro: Option<InlineMicroSourceHint>,
57 terminal_consumer_batch: bool,
61}
62
63#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
64pub(crate) struct SourceRuntimeHints {
65 pub(crate) inline_micro_max_success_items: Option<usize>,
66 pub(crate) terminal_consumer_batch: bool,
67}
68
69impl SourceHints {
70 const fn with_inline_micro(max_success_items: usize) -> Self {
71 Self {
72 inline_head_terminal: true,
73 inline_micro: Some(InlineMicroSourceHint { max_success_items }),
74 terminal_consumer_batch: true,
75 }
76 }
77
78 const fn with_terminal_consumer_batch() -> Self {
79 Self {
80 inline_head_terminal: false,
81 inline_micro: None,
82 terminal_consumer_batch: true,
83 }
84 }
85
86 fn after_flow(self, flow: FlowHints) -> Self {
87 if flow.preserves_inline_head_terminal {
88 Self {
91 inline_head_terminal: true,
92 inline_micro: None,
93 terminal_consumer_batch: self.terminal_consumer_batch
94 && flow.preserves_terminal_consumer_batch,
95 }
96 } else {
97 Self {
98 inline_head_terminal: false,
99 inline_micro: None,
100 terminal_consumer_batch: self.terminal_consumer_batch
101 && flow.preserves_terminal_consumer_batch,
102 }
103 }
104 }
105
106 fn without_inline_micro(self) -> Self {
107 Self {
108 inline_head_terminal: self.inline_head_terminal,
109 inline_micro: None,
110 terminal_consumer_batch: self.terminal_consumer_batch,
111 }
112 }
113
114 fn runtime(self) -> SourceRuntimeHints {
115 SourceRuntimeHints {
116 inline_micro_max_success_items: self.inline_micro.map(|hint| hint.max_success_items),
117 terminal_consumer_batch: self.terminal_consumer_batch,
118 }
119 }
120}
121
122#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
123struct FlowHints {
124 preserves_inline_head_terminal: bool,
125 preserves_terminal_consumer_batch: bool,
126}
127
128impl FlowHints {
129 const PRESERVES_INLINE_HEAD_TERMINAL: Self = Self {
130 preserves_inline_head_terminal: true,
131 preserves_terminal_consumer_batch: true,
132 };
133
134 const PRESERVES_TERMINAL_CONSUMER_BATCH: Self = Self {
135 preserves_inline_head_terminal: false,
136 preserves_terminal_consumer_batch: true,
137 };
138
139 fn then(self, next: Self) -> Self {
140 Self {
141 preserves_inline_head_terminal: self.preserves_inline_head_terminal
142 && next.preserves_inline_head_terminal,
143 preserves_terminal_consumer_batch: self.preserves_terminal_consumer_batch
144 && next.preserves_terminal_consumer_batch,
145 }
146 }
147}
148
149struct PartitionSlot<Key, Out> {
150 key: Option<Key>,
151 active: usize,
152 queued: VecDeque<(usize, Out)>,
153 in_ready_queue: bool,
154}
155
156struct AbortOnDropHandle<T> {
157 handle: JoinHandle<T>,
158}
159
160impl<T> AbortOnDropHandle<T> {
161 fn new(handle: JoinHandle<T>) -> Self {
162 Self { handle }
163 }
164}
165
166impl<T> Drop for AbortOnDropHandle<T> {
167 fn drop(&mut self) {
168 self.handle.abort();
169 }
170}
171
172impl<T> Future for AbortOnDropHandle<T> {
173 type Output = Result<T, JoinError>;
174
175 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
176 Pin::new(&mut self.handle).poll(cx)
177 }
178}
179
180impl<T> Unpin for AbortOnDropHandle<T> {}
181
182pub(crate) fn stream_tokio_runtime() -> &'static TokioRuntime {
183 static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
184 RUNTIME.get_or_init(|| {
185 TokioRuntimeBuilder::new_multi_thread()
186 .enable_all()
187 .thread_name("datum-stream-tokio")
188 .build()
189 .expect("stream tokio runtime")
190 })
191}
192
193fn spawn_tokio_task<Fut, T>(future: Fut) -> AbortOnDropHandle<T>
194where
195 Fut: Future<Output = T> + Send + 'static,
196 T: Send + 'static,
197{
198 AbortOnDropHandle::new(stream_tokio_runtime().spawn(future))
199}
200
201pub(crate) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
202 runtime::current_stream_cancelled()
203}
204
205pub(super) fn catch_unwind_failed<T, F>(context: &'static str, f: F) -> StreamResult<T>
206where
207 F: FnOnce() -> T,
208{
209 catch_unwind(AssertUnwindSafe(f))
210 .map_err(|_| StreamError::Failed(format!("{context} panicked")))
211}
212
213impl<Key, Out> PartitionSlot<Key, Out> {
214 fn new(key: Key) -> Self {
215 Self {
216 key: Some(key),
217 active: 0,
218 queued: VecDeque::new(),
219 in_ready_queue: false,
220 }
221 }
222}
223
224#[inline(always)]
225fn partition_slot_for<Key, Out>(
226 key: Key,
227 slots_by_key: &mut HashMap<Key, usize>,
228 slots: &mut Vec<PartitionSlot<Key, Out>>,
229 free_slots: &mut Vec<usize>,
230) -> usize
231where
232 Key: Clone + Eq + Hash,
233{
234 if let Some(slot) = slots_by_key.get(&key) {
235 return *slot;
236 }
237
238 let slot = if let Some(slot) = free_slots.pop() {
239 let state = &mut slots[slot];
240 state.key = Some(key.clone());
241 state.active = 0;
242 state.queued.clear();
243 state.in_ready_queue = false;
244 slot
245 } else {
246 slots.push(PartitionSlot::new(key.clone()));
247 slots.len() - 1
248 };
249 slots_by_key.insert(key, slot);
250 slot
251}
252
253#[inline(always)]
254fn retire_partition_slot<Key, Out>(
255 slot: usize,
256 slots_by_key: &mut HashMap<Key, usize>,
257 slots: &mut [PartitionSlot<Key, Out>],
258 free_slots: &mut Vec<usize>,
259) where
260 Key: Eq + Hash,
261{
262 let state = &mut slots[slot];
263 if let Some(key) = state.key.take() {
264 slots_by_key.remove(&key);
265 }
266 state.active = 0;
267 state.queued.clear();
268 state.in_ready_queue = false;
269 free_slots.push(slot);
270}
271
272#[inline(always)]
273fn ready_partition_slot<Key, Out>(
274 slots: &mut [PartitionSlot<Key, Out>],
275 ready_slots: &mut VecDeque<usize>,
276 slot: usize,
277 per_partition: usize,
278) {
279 if let Some(state) = slots.get_mut(slot)
280 && state.key.is_some()
281 && !state.in_ready_queue
282 && state.active < per_partition
283 && !state.queued.is_empty()
284 {
285 state.in_ready_queue = true;
286 ready_slots.push_back(slot);
287 }
288}
289
290#[inline(always)]
291fn pop_ready_partition_slot<Key, Out>(
292 slots: &mut [PartitionSlot<Key, Out>],
293 ready_slots: &mut VecDeque<usize>,
294 per_partition: usize,
295) -> Option<(usize, usize, Out)> {
296 while let Some(slot) = ready_slots.pop_front() {
297 let mut requeue = false;
298 let item = if let Some(state) = slots.get_mut(slot) {
299 state.in_ready_queue = false;
300 if state.key.is_some() && state.active < per_partition {
301 let item = state.queued.pop_front().map(|(index, item)| {
302 state.active += 1;
303 (index, slot, item)
304 });
305 if !state.queued.is_empty() && state.active < per_partition {
306 state.in_ready_queue = true;
307 requeue = true;
308 }
309 item
310 } else {
311 None
312 }
313 } else {
314 None
315 };
316
317 if requeue {
318 ready_slots.push_back(slot);
319 }
320 if item.is_some() {
321 return item;
322 }
323 }
324 None
325}
326
327pub(crate) trait SourceFactory<Out, Mat>: Send + Sync {
328 fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)>;
329}
330
331struct FnSourceFactory<F>(F);
332
333impl<Out, Mat, F> SourceFactory<Out, Mat> for FnSourceFactory<F>
334where
335 F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync,
336{
337 fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
338 (self.0)(materializer)
339 }
340}
341
342struct MapSourceFactory<In, Out, Mat, F> {
343 source: Arc<dyn SourceFactory<In, Mat>>,
344 stage: F,
345 _marker: PhantomData<fn(In) -> Out>,
346}
347
348impl<In, Out, Mat, F> SourceFactory<Out, Mat> for MapSourceFactory<In, Out, Mat, F>
349where
350 In: Send + 'static,
351 Out: Send + 'static,
352 Mat: Send + 'static,
353 F: Fn(In) -> Out + Send + Sync + 'static,
354{
355 fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
356 let (stream, mat) = Arc::clone(&self.source).create(materializer)?;
357 Ok((
358 Box::new(MapSourceStream {
359 input: stream,
360 factory: self,
361 }),
362 mat,
363 ))
364 }
365}
366
367struct MapSourceStream<In, Out, Mat, F> {
368 input: BoxStream<In>,
369 factory: Arc<MapSourceFactory<In, Out, Mat, F>>,
370}
371
372impl<In, Out, Mat, F> Iterator for MapSourceStream<In, Out, Mat, F>
373where
374 F: Fn(In) -> Out,
375{
376 type Item = StreamResult<Out>;
377
378 fn next(&mut self) -> Option<Self::Item> {
379 self.input
380 .next()
381 .map(|item| item.map(|item| (self.factory.stage)(item)))
382 }
383}
384
385fn merge_streams<Out>(streams: Vec<BoxStream<Out>>, eager_complete: bool) -> BoxStream<Out>
386where
387 Out: Send + 'static,
388{
389 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
390 let mut current = 0usize;
391 Box::new(std::iter::from_fn(move || {
392 loop {
393 let index = next_active_optional_stream(&streams, current, |_| true)?;
394 current = (index + 1) % streams.len().max(1);
395 let Some(stream) = streams[index].as_mut() else {
396 continue;
397 };
398 match stream.next() {
399 Some(item) => return Some(item),
400 None => {
401 streams[index] = None;
402 if eager_complete {
403 return None;
404 }
405 }
406 }
407 }
408 }))
409}
410
411fn merge_prioritized_streams<Out>(
412 streams: Vec<BoxStream<Out>>,
413 priorities: Vec<usize>,
414 eager_complete: bool,
415) -> BoxStream<Out>
416where
417 Out: Send + 'static,
418{
419 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
420 let schedule: Vec<usize> = priorities
421 .into_iter()
422 .enumerate()
423 .flat_map(|(index, weight)| std::iter::repeat_n(index, weight))
424 .collect();
425 let mut schedule_index = 0usize;
426 Box::new(std::iter::from_fn(move || {
427 loop {
428 if streams.iter().all(Option::is_none) {
429 return None;
430 }
431 let index = next_weighted_stream(&streams, &schedule, &mut schedule_index)?;
432 let Some(stream) = streams[index].as_mut() else {
433 continue;
434 };
435 match stream.next() {
436 Some(item) => return Some(item),
437 None => {
438 streams[index] = None;
439 if eager_complete {
440 return None;
441 }
442 }
443 }
444 }
445 }))
446}
447
448fn merge_sorted_stream<Out>(mut left: BoxStream<Out>, mut right: BoxStream<Out>) -> BoxStream<Out>
449where
450 Out: Ord + Send + 'static,
451{
452 let mut left_next: Option<Out> = None;
453 let mut right_next: Option<Out> = None;
454 let mut left_done = false;
455 let mut right_done = false;
456 Box::new(std::iter::from_fn(move || {
457 loop {
458 if left_next.is_none() && !left_done {
459 match left.next() {
460 Some(Ok(item)) => left_next = Some(item),
461 Some(Err(error)) => return Some(Err(error)),
462 None => left_done = true,
463 }
464 }
465 if right_next.is_none() && !right_done {
466 match right.next() {
467 Some(Ok(item)) => right_next = Some(item),
468 Some(Err(error)) => return Some(Err(error)),
469 None => right_done = true,
470 }
471 }
472
473 let next = match (&left_next, &right_next) {
474 (Some(left_item), Some(right_item)) => {
475 if left_item <= right_item {
476 left_next.take()
477 } else {
478 right_next.take()
479 }
480 }
481 (Some(_), None) if right_done => left_next.take(),
482 (None, Some(_)) if left_done => right_next.take(),
483 (None, None) if left_done && right_done => return None,
484 _ => continue,
485 };
486 if let Some(item) = next {
487 return Some(Ok(item));
488 }
489 }
490 }))
491}
492
493fn merge_latest_streams<Out>(
494 streams: Vec<BoxStream<Out>>,
495 eager_complete: bool,
496) -> BoxStream<Vec<Out>>
497where
498 Out: Clone + Send + 'static,
499{
500 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
501 let mut latest = vec![None; streams.len()];
502 let mut seen = 0usize;
503 let mut current = 0usize;
504 let mut pending = VecDeque::<Vec<Out>>::new();
505 Box::new(std::iter::from_fn(move || {
506 loop {
507 if let Some(output) = pending.pop_front() {
508 return Some(Ok(output));
509 }
510 if streams.iter().all(Option::is_none) {
511 return None;
512 }
513 let index = next_active_optional_stream(&streams, current, |_| true)?;
514 current = (index + 1) % streams.len().max(1);
515 let Some(stream) = streams[index].as_mut() else {
516 continue;
517 };
518 match stream.next() {
519 Some(Ok(item)) => {
520 if latest[index].is_none() {
521 seen += 1;
522 }
523 latest[index] = Some(item);
524 if seen == latest.len() {
525 pending.push_back(
526 latest
527 .iter()
528 .map(|item| item.clone().expect("merge-latest initialized"))
529 .collect(),
530 );
531 }
532 }
533 Some(Err(error)) => return Some(Err(error)),
534 None => {
535 streams[index] = None;
536 if eager_complete {
537 return None;
538 }
539 }
540 }
541 }
542 }))
543}
544
545fn zip_streams<Left, Right>(
546 mut left: BoxStream<Left>,
547 mut right: BoxStream<Right>,
548) -> BoxStream<(Left, Right)>
549where
550 Left: Send + 'static,
551 Right: Send + 'static,
552{
553 let mut left_next: Option<Left> = None;
554 let mut right_next: Option<Right> = None;
555 let mut left_done = false;
556 let mut right_done = false;
557 Box::new(std::iter::from_fn(move || {
558 loop {
559 if left_next.is_none() && !left_done {
560 match left.next() {
561 Some(Ok(item)) => left_next = Some(item),
562 Some(Err(error)) => return Some(Err(error)),
563 None => left_done = true,
564 }
565 }
566 if right_next.is_none() && !right_done {
567 match right.next() {
568 Some(Ok(item)) => right_next = Some(item),
569 Some(Err(error)) => return Some(Err(error)),
570 None => right_done = true,
571 }
572 }
573 match (left_next.take(), right_next.take()) {
574 (Some(left_item), Some(right_item)) => return Some(Ok((left_item, right_item))),
575 (left_item, right_item) => {
576 left_next = left_item;
577 right_next = right_item;
578 if (left_done && left_next.is_none()) || (right_done && right_next.is_none()) {
579 return None;
580 }
581 }
582 }
583 }
584 }))
585}
586
587fn zip_latest_with_stream<Left, Right, Out, F>(
588 mut left: BoxStream<Left>,
589 mut right: BoxStream<Right>,
590 eager_complete: bool,
591 combine: Arc<F>,
592) -> BoxStream<Out>
593where
594 Left: Clone + Send + 'static,
595 Right: Clone + Send + 'static,
596 Out: Send + 'static,
597 F: Fn(Left, Right) -> Out + Send + Sync + 'static,
598{
599 let mut left_latest: Option<Left> = None;
600 let mut right_latest: Option<Right> = None;
601 let mut left_done = false;
602 let mut right_done = false;
603 let mut turn_left = true;
604 let mut pending = VecDeque::<Out>::new();
605
606 Box::new(std::iter::from_fn(move || {
607 loop {
608 if let Some(output) = pending.pop_front() {
609 return Some(Ok(output));
610 }
611 if eager_complete && (left_done || right_done) {
612 return None;
613 }
614 if left_done && right_done {
615 return None;
616 }
617 if (left_done && left_latest.is_none()) || (right_done && right_latest.is_none()) {
621 return None;
622 }
623
624 let pull_left = if left_done {
625 false
626 } else if right_done {
627 true
628 } else {
629 let value = turn_left;
630 turn_left = !turn_left;
631 value
632 };
633
634 if pull_left {
635 match left.next() {
636 Some(Ok(item)) => {
637 left_latest = Some(item);
638 if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
639 pending.push_back(combine(left_item.clone(), right_item.clone()));
640 }
641 }
642 Some(Err(error)) => return Some(Err(error)),
643 None => {
644 left_done = true;
645 if eager_complete {
646 return None;
647 }
648 }
649 }
650 } else {
651 match right.next() {
652 Some(Ok(item)) => {
653 right_latest = Some(item);
654 if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
655 pending.push_back(combine(left_item.clone(), right_item.clone()));
656 }
657 }
658 Some(Err(error)) => return Some(Err(error)),
659 None => {
660 right_done = true;
661 if eager_complete {
662 return None;
663 }
664 }
665 }
666 }
667 }
668 }))
669}
670
671fn zip_all_stream<Left, Right>(
672 mut left: BoxStream<Left>,
673 mut right: BoxStream<Right>,
674 left_fill: Left,
675 right_fill: Right,
676) -> BoxStream<(Left, Right)>
677where
678 Left: Clone + Send + 'static,
679 Right: Clone + Send + 'static,
680{
681 let mut left_done = false;
682 let mut right_done = false;
683 Box::new(std::iter::from_fn(move || {
684 if left_done && right_done {
685 return None;
686 }
687
688 let left_item = if left_done {
689 None
690 } else {
691 match left.next() {
692 Some(Ok(item)) => Some(item),
693 Some(Err(error)) => return Some(Err(error)),
694 None => {
695 left_done = true;
696 None
697 }
698 }
699 };
700 let right_item = if right_done {
701 None
702 } else {
703 match right.next() {
704 Some(Ok(item)) => Some(item),
705 Some(Err(error)) => return Some(Err(error)),
706 None => {
707 right_done = true;
708 None
709 }
710 }
711 };
712
713 match (left_item, right_item) {
714 (None, None) if left_done && right_done => None,
715 (Some(left_value), Some(right_value)) => Some(Ok((left_value, right_value))),
716 (Some(left_value), None) => Some(Ok((left_value, right_fill.clone()))),
717 (None, Some(right_value)) => Some(Ok((left_fill.clone(), right_value))),
718 (None, None) => None,
719 }
720 }))
721}
722
723fn zip_n_streams<Out, Next, F>(streams: Vec<BoxStream<Out>>, zipper: Arc<F>) -> BoxStream<Next>
724where
725 Out: Send + 'static,
726 Next: Send + 'static,
727 F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
728{
729 let count = streams.len();
730 if count == 0 {
731 return Box::new(std::iter::empty());
732 }
733 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
734 let mut slots: Vec<Option<Out>> = (0..count).map(|_| None).collect();
735 let mut current = 0usize;
736 Box::new(std::iter::from_fn(move || {
737 loop {
738 if slots.iter().all(Option::is_some) {
739 let values = slots
740 .iter_mut()
741 .map(|slot| slot.take().expect("zip-n slot filled"))
742 .collect();
743 return Some(Ok(zipper(values)));
744 }
745
746 let index = next_active_optional_stream(&streams, current, |idx| slots[idx].is_none())?;
747 current = (index + 1) % count.max(1);
748 let Some(stream) = streams[index].as_mut() else {
749 continue;
750 };
751 match stream.next() {
752 Some(Ok(item)) => slots[index] = Some(item),
753 Some(Err(error)) => return Some(Err(error)),
754 None => {
755 streams[index] = None;
756 slots[index].as_ref()?;
757 }
758 }
759 }
760 }))
761}
762
763fn next_active_optional_stream<T, F>(
764 streams: &[Option<BoxStream<T>>],
765 current: usize,
766 predicate: F,
767) -> Option<usize>
768where
769 T: Send + 'static,
770 F: Fn(usize) -> bool,
771{
772 if streams.is_empty() {
773 return None;
774 }
775 for offset in 0..streams.len() {
776 let index = (current + offset) % streams.len();
777 if streams[index].is_some() && predicate(index) {
778 return Some(index);
779 }
780 }
781 None
782}
783
784fn next_weighted_stream<T>(
785 streams: &[Option<BoxStream<T>>],
786 schedule: &[usize],
787 schedule_index: &mut usize,
788) -> Option<usize>
789where
790 T: Send + 'static,
791{
792 if streams.is_empty() || schedule.is_empty() {
793 return None;
794 }
795 for _ in 0..schedule.len() {
796 let index = schedule[*schedule_index % schedule.len()];
797 *schedule_index = (*schedule_index + 1) % schedule.len();
798 if streams.get(index).is_some_and(Option::is_some) {
799 return Some(index);
800 }
801 }
802 None
803}
804
805pub(crate) mod async_boundary;
806mod completion;
807mod error;
808mod flow;
809mod rate;
810mod restart;
811mod runtime;
812mod sink;
813mod source;
814mod time;
815mod timer;
816
817pub(crate) trait SplitSegmentHookDyn: Send + Sync + 'static {
821 fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync>;
822}
823
824pub(crate) trait TerminalSourceHookDyn<In>: Send + Sync + 'static {
830 fn drain_terminal_batch(
831 &self,
832 materializer: &Materializer,
833 cancelled: &Arc<AtomicBool>,
834 batch: &mut Vec<In>,
835 ) -> StreamResult<TerminalSourceStatus>;
836
837 fn cancel_terminal(&self) {}
838}
839
840#[derive(Clone, Copy, Debug, PartialEq, Eq)]
841pub(crate) enum TerminalSourceStatus {
842 Active,
843 Completed,
844}
845
846pub(crate) trait FoldFastPathDyn<In: Send + 'static>: Send + Sync + 'static {
849 fn try_register(
853 &self,
854 hook: Arc<dyn SplitSegmentHookDyn>,
855 ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>>;
856
857 fn supports_terminal_drain(&self) -> bool {
858 false
859 }
860
861 fn try_register_terminal_drain(
862 &self,
863 _hook: Arc<dyn TerminalSourceHookDyn<In>>,
864 _materializer: &Materializer,
865 ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>> {
866 None
867 }
868}
869
870use self::runtime::{runtime_checked_stream, set_current_stream_cancelled};
871
872pub(crate) use self::completion::StreamCancellation;
873
874pub use self::{
875 completion::{Cancellable, StreamCompletion},
876 error::{StreamError, StreamResult, Supervision, SupervisionDecider, SupervisionDirective},
877 flow::{BidiFlow, Flow},
878 rate::{AggregateTimer, OverflowStrategy},
879 restart::{RestartFlow, RestartSettings, RestartSink, RestartSource, RetryFlow},
880 runtime::{Materializer, Runtime},
881 sink::{RunnableGraph, Sink, SinkCombineStrategy},
882 source::{Demand, Keep, MaybeHandle, NotUsed, PushOutlet, Source, SourceCombineStrategy},
883 time::{DelayOverflowStrategy, ThrottleMode},
884};
885
886#[cfg(test)]
887mod tests {
888 use super::*;
889 use crate::Attributes;
890 use crate::testkit::TestSink;
891 use std::fs;
892 use std::sync::{
893 Arc as StdArc,
894 atomic::{
895 AtomicBool as StdAtomicBool, AtomicUsize as StdAtomicUsize, Ordering as StdOrdering,
896 },
897 mpsc,
898 };
899 use std::time::Duration as StdDuration;
900 use std::time::Instant;
901
902 fn wait<T>(completion: StreamCompletion<T>) -> T {
903 completion.wait().unwrap()
904 }
905
906 fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
907 let deadline = Instant::now() + timeout;
908 while Instant::now() < deadline {
909 if condition() {
910 return true;
911 }
912 thread::sleep(Duration::from_millis(2));
913 }
914 condition()
915 }
916
917 fn linux_thread_count(thread_name: &str) -> usize {
918 fs::read_dir("/proc/self/task")
919 .expect("task directory readable")
920 .filter_map(Result::ok)
921 .filter_map(|entry| fs::read_to_string(entry.path().join("comm")).ok())
922 .filter(|name| name.trim() == thread_name)
923 .count()
924 }
925
926 #[test]
927 fn source_async_boundary_preserves_results() {
928 let expected = Source::from_iter(0_u64..128)
929 .map(|item| item.wrapping_add(1))
930 .filter(|item| item % 3 != 0)
931 .map(|item| item * 2)
932 .run_collect()
933 .unwrap();
934
935 let actual = Source::from_iter(0_u64..128)
936 .map(|item| item.wrapping_add(1))
937 .async_boundary()
938 .filter(|item| item % 3 != 0)
939 .map(|item| item * 2)
940 .run_collect()
941 .unwrap();
942
943 assert_eq!(actual, expected);
944 }
945
946 #[test]
947 fn flow_async_boundary_preserves_results() {
948 let expected = Source::from_iter(0_u64..128)
949 .map(|item| item + 1)
950 .map(|item| item * 3)
951 .run_collect()
952 .unwrap();
953
954 let flow = Flow::identity()
955 .map(|item: u64| item + 1)
956 .r#async()
957 .map(|item| item * 3);
958 let actual = Source::from_iter(0_u64..128)
959 .via(flow)
960 .run_collect()
961 .unwrap();
962
963 assert_eq!(actual, expected);
964 }
965
966 #[test]
967 fn linear_async_boundary_matches_graph_async_boundary_shape() {
968 use crate::{
969 AsyncBoundary, AsyncBoundaryExecutionConfig, FusedExecutionConfig, GraphDsl,
970 GraphFlowShape, MapStage,
971 };
972
973 let graph = GraphDsl::try_create(|builder| {
974 let first = builder.add(MapStage::new(|item: u64| item + 1));
975 let boundary = builder.add(AsyncBoundary::<u64>::new());
976 let second = builder.add(MapStage::new(|item: u64| item * 2));
977
978 builder.connect(first.outlet(), boundary.inlet())?;
979 builder.connect(boundary.outlet(), second.inlet())?;
980
981 Ok(GraphFlowShape::new(first.inlet(), second.outlet()))
982 })
983 .unwrap();
984
985 let linear = Source::from_iter(1_u64..=4)
986 .map(|item| item + 1)
987 .async_boundary_with_buffer(4)
988 .map(|item| item * 2)
989 .run_collect()
990 .unwrap();
991 let graph_output = graph.run_with_input(1_u64..=4).unwrap();
992 let report = graph
993 .run_async_boundary_count_with_input_report(
994 1_u64..=4,
995 AsyncBoundaryExecutionConfig {
996 fused: FusedExecutionConfig { event_limit: 1024 },
997 buffer_size: 4,
998 },
999 )
1000 .unwrap();
1001
1002 assert_eq!(linear, graph_output);
1003 assert_eq!(report.result, linear.len());
1004 assert_eq!(report.async_boundary_crossings, linear.len());
1005 }
1006
1007 #[test]
1008 fn async_boundary_regions_run_concurrently() {
1009 let (upstream_tx, upstream_rx) = mpsc::channel::<u64>();
1010 let (downstream_blocked_tx, downstream_blocked_rx) = mpsc::channel::<()>();
1011 let (release_tx, release_rx) = mpsc::channel::<()>();
1012 let release_rx = StdArc::new(Mutex::new(release_rx));
1013
1014 let completion = Source::from_iter(0_u64..3)
1015 .map(move |item| {
1016 upstream_tx.send(item).expect("upstream probe receives");
1017 item
1018 })
1019 .async_boundary_with_buffer(1)
1020 .map({
1021 let release_rx = StdArc::clone(&release_rx);
1022 move |item| {
1023 if item == 0 {
1024 downstream_blocked_tx
1025 .send(())
1026 .expect("downstream probe receives");
1027 release_rx
1028 .lock()
1029 .expect("release receiver lock")
1030 .recv_timeout(StdDuration::from_secs(2))
1031 .expect("downstream release arrives");
1032 }
1033 item
1034 }
1035 })
1036 .run_with(Sink::collect())
1037 .unwrap();
1038
1039 assert_eq!(
1040 downstream_blocked_rx.recv_timeout(StdDuration::from_secs(2)),
1041 Ok(())
1042 );
1043 assert_eq!(upstream_rx.recv_timeout(StdDuration::from_secs(2)), Ok(0));
1044 assert_eq!(upstream_rx.recv_timeout(StdDuration::from_secs(2)), Ok(1));
1045
1046 release_tx.send(()).expect("release downstream");
1047 assert_eq!(completion.wait().unwrap(), vec![0, 1, 2]);
1048 }
1049
1050 #[test]
1051 fn async_boundary_backpressures_slow_downstream() {
1052 let (produced_tx, produced_rx) = mpsc::channel::<u64>();
1053 let (release_tx, release_rx) = mpsc::channel::<()>();
1054 let release_rx = StdArc::new(Mutex::new(release_rx));
1055
1056 let completion = Source::from_iter(0_u64..8)
1057 .map(move |item| {
1058 produced_tx.send(item).expect("producer probe receives");
1059 item
1060 })
1061 .async_boundary_with_buffer(1)
1062 .map({
1063 let release_rx = StdArc::clone(&release_rx);
1064 move |item| {
1065 if item == 0 {
1066 release_rx
1067 .lock()
1068 .expect("release receiver lock")
1069 .recv_timeout(StdDuration::from_secs(2))
1070 .expect("downstream release arrives");
1071 }
1072 item
1073 }
1074 })
1075 .run_with(Sink::collect())
1076 .unwrap();
1077
1078 assert_eq!(produced_rx.recv_timeout(StdDuration::from_secs(2)), Ok(0));
1079 assert_eq!(produced_rx.recv_timeout(StdDuration::from_secs(2)), Ok(1));
1080 if let Ok(item) = produced_rx.recv_timeout(StdDuration::from_millis(100)) {
1081 assert_eq!(item, 2);
1082 }
1083 match produced_rx.recv_timeout(StdDuration::from_millis(100)) {
1084 Err(mpsc::RecvTimeoutError::Timeout) => {}
1085 other => panic!("async boundary handoff was not bounded: {other:?}"),
1086 }
1087
1088 release_tx.send(()).expect("release downstream");
1089 assert_eq!(completion.wait().unwrap(), (0_u64..8).collect::<Vec<_>>());
1090 }
1091
1092 #[test]
1093 fn source_blueprints_are_reusable() {
1094 let source = Source::from_iter(0..5).map(|item| item + 1);
1095
1096 assert_eq!(source.clone().run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
1097 assert_eq!(source.run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
1098 }
1099
1100 #[test]
1101 fn source_map_preserves_materialized_value() {
1102 let graph = Source::single(1)
1103 .map_materialized_value(|_| "source")
1104 .map(|item| item + 1)
1105 .to_mat(Sink::head(), Keep::both);
1106
1107 let materialized = graph.run().unwrap();
1108 assert_eq!(materialized.0, "source");
1109 assert_eq!(wait(materialized.1), 2);
1110 }
1111
1112 #[test]
1113 fn source_and_flow_compose() {
1114 let flow = Flow::identity()
1115 .map(|item: i32| item * 2)
1116 .filter(|item| item % 3 == 0);
1117
1118 let result = Source::from_iter(0..8).via(flow).run_collect().unwrap();
1119
1120 assert_eq!(result, vec![0, 6, 12]);
1121 }
1122
1123 #[test]
1124 fn sink_setup_sees_materializer_defaults_and_local_attributes() {
1125 let observed = StdArc::new(Mutex::new(None));
1126 let observed_in_setup = StdArc::clone(&observed);
1127 let sink = Sink::<i32, StreamCompletion<NotUsed>>::setup(move |_materializer, attrs| {
1128 *observed_in_setup.lock().unwrap() = Some((
1129 attrs.name().map(str::to_owned),
1130 attrs.input_buffer_hint(),
1131 attrs.dispatcher_hint().map(str::to_owned),
1132 ));
1133 Sink::ignore()
1134 })
1135 .add_attributes(Attributes::named("sink-inner"))
1136 .add_attributes(Attributes::input_buffer(4, 4))
1137 .add_attributes(Attributes::dispatcher("bench-dispatcher"));
1138
1139 let materializer = Materializer::new().with_attributes(Attributes::named("mat-outer"));
1140 wait(
1141 Source::from_iter([1, 2, 3])
1142 .run_with_materializer(sink, &materializer)
1143 .unwrap(),
1144 );
1145
1146 assert_eq!(
1147 *observed.lock().unwrap(),
1148 Some((
1149 Some("sink-inner".to_owned()),
1150 Some((4, 4)),
1151 Some("bench-dispatcher".to_owned())
1152 ))
1153 );
1154 }
1155
1156 #[test]
1157 fn sink_pre_materialize_feeds_existing_materialization() {
1158 let materializer = Materializer::new();
1159 let (completion, pre) = Sink::<i32, StreamCompletion<Vec<i32>>>::collect()
1160 .pre_materialize(&materializer)
1161 .unwrap();
1162
1163 Source::from_iter([1, 2, 3])
1164 .run_with_materializer(pre, &materializer)
1165 .unwrap();
1166
1167 assert_eq!(wait(completion), vec![1, 2, 3]);
1168 }
1169
1170 #[test]
1171 fn flow_from_sink_and_source_connects_both_sides() {
1172 assert_eq!(
1173 Source::from_iter([1, 2, 3])
1174 .via(Flow::from_sink_and_source(
1175 Sink::foreach(|_item: i32| {}),
1176 Source::from_iter([10, 20, 30]),
1177 ))
1178 .run_collect()
1179 .unwrap(),
1180 vec![10, 20, 30]
1181 );
1182 }
1183
1184 #[test]
1185 fn from_sink_and_source_keeps_sink_running_after_source_side_completes() {
1186 let completed = StdArc::new(StdAtomicBool::new(false));
1187 let on_complete = StdArc::clone(&completed);
1188 let flow = Flow::from_sink_and_source(
1189 Sink::on_complete(move || {
1190 on_complete.store(true, StdOrdering::SeqCst);
1191 }),
1192 Source::single(10),
1193 );
1194
1195 let result = Source::from_iter([1, 2, 3])
1196 .via(flow)
1197 .run_collect()
1198 .unwrap();
1199
1200 assert_eq!(result, vec![10]);
1201 assert!(wait_until(StdDuration::from_secs(1), || {
1202 completed.load(StdOrdering::SeqCst)
1203 }));
1204 }
1205
1206 #[test]
1207 fn from_sink_and_source_coupled_cancels_source_when_sink_finishes_first() {
1208 let cancellable = StdArc::new(Mutex::new(None));
1209 let observed = StdArc::clone(&cancellable);
1210 let flow = Flow::from_sink_and_source_coupled(
1211 Sink::ignore(),
1212 Source::tick(
1213 StdDuration::from_millis(50),
1214 StdDuration::from_millis(50),
1215 10,
1216 )
1217 .map_materialized_value(move |handle| {
1218 *observed.lock().unwrap() = Some(handle.clone());
1219 handle
1220 }),
1221 );
1222
1223 let completion = Source::from_iter(std::iter::empty::<i32>())
1224 .via(flow)
1225 .run_with(Sink::ignore())
1226 .unwrap();
1227 assert!(wait_until(StdDuration::from_secs(1), || {
1228 cancellable
1229 .lock()
1230 .unwrap()
1231 .as_ref()
1232 .is_some_and(Cancellable::is_cancelled)
1233 }));
1234 assert_eq!(wait(completion), NotUsed);
1235 }
1236
1237 #[test]
1238 fn bidi_flow_join_and_atop_compose() {
1239 let codec = BidiFlow::from_flows(
1240 Flow::identity().map(|item: i32| item + 1),
1241 Flow::identity().map(|item: i32| item * 2),
1242 )
1243 .named("codec");
1244 let framing = BidiFlow::from_flows(
1245 Flow::identity().map(|item: i32| item * 3),
1246 Flow::identity().map(|item: i32| item - 4),
1247 );
1248
1249 let joined = codec
1250 .clone()
1251 .join(Flow::identity().map(|item: i32| item - 5));
1252 let stacked = codec.atop(framing).join(Flow::identity());
1253
1254 assert_eq!(
1255 Source::single(10).via(joined).run_collect().unwrap(),
1256 vec![12]
1257 );
1258 assert_eq!(
1259 Source::single(10).via(stacked).run_collect().unwrap(),
1260 vec![58]
1261 );
1262 }
1263
1264 #[test]
1265 fn flow_buffer_then_map_runs_end_to_end() {
1266 let flow = Flow::identity()
1267 .buffer(8, OverflowStrategy::Backpressure)
1268 .map(|item: i32| item + 1);
1269
1270 let result = Source::from_iter(0..4).via(flow).run_collect().unwrap();
1271
1272 assert_eq!(result, vec![1, 2, 3, 4]);
1273 }
1274
1275 #[test]
1276 fn public_flow_combinators_preserve_runtime_transform_after_buffer() {
1277 fn buffered_flow() -> Flow<i32, i32> {
1278 Flow::identity().buffer(8, OverflowStrategy::Backpressure)
1279 }
1280
1281 assert_eq!(
1282 Source::from_iter(0..4)
1283 .via(buffered_flow().filter(|item| *item % 2 == 0))
1284 .run_collect()
1285 .unwrap(),
1286 vec![0, 2]
1287 );
1288 assert_eq!(
1289 Source::from_iter(0..4)
1290 .via(buffered_flow().filter_not(|item| *item % 2 == 0))
1291 .run_collect()
1292 .unwrap(),
1293 vec![1, 3]
1294 );
1295 assert_eq!(
1296 Source::from_iter(0..4)
1297 .via(buffered_flow().filter_map(|item| (item % 2 == 0).then_some(item + 10)))
1298 .run_collect()
1299 .unwrap(),
1300 vec![10, 12]
1301 );
1302 assert_eq!(
1303 Source::from_iter(0..3)
1304 .via(buffered_flow().map_concat(|item| [item, item + 10]))
1305 .run_collect()
1306 .unwrap(),
1307 vec![0, 10, 1, 11, 2, 12]
1308 );
1309 assert_eq!(
1310 Source::from_iter(0..3)
1311 .via(buffered_flow().stateful_map(5, |state, item| {
1312 *state += item;
1313 *state
1314 }))
1315 .run_collect()
1316 .unwrap(),
1317 vec![5, 6, 8]
1318 );
1319 assert_eq!(
1320 Source::from_iter(0..3)
1321 .via(buffered_flow().stateful_map_concat(0, |state, item| {
1322 *state += item;
1323 [*state, item]
1324 }))
1325 .run_collect()
1326 .unwrap(),
1327 vec![0, 0, 1, 1, 3, 2]
1328 );
1329 assert_eq!(
1330 Source::from_iter(0..4)
1331 .via(buffered_flow().map_async(2, |item| async move { Ok(item + 1) }))
1332 .run_collect()
1333 .unwrap(),
1334 vec![1, 2, 3, 4]
1335 );
1336 assert_eq!(
1337 Source::from_iter(0..4)
1338 .via(buffered_flow().map_async_unordered(2, |item| async move { Ok(item + 1) }))
1339 .run_collect()
1340 .unwrap(),
1341 vec![1, 2, 3, 4]
1342 );
1343 assert_eq!(
1344 Source::from_iter(0..4)
1345 .via(buffered_flow().map_async_partitioned(
1346 2,
1347 1,
1348 |item| item % 2,
1349 |item| async move { Ok(item + 1) },
1350 ))
1351 .run_collect()
1352 .unwrap(),
1353 vec![1, 2, 3, 4]
1354 );
1355 assert_eq!(
1356 Source::from_iter(0..5)
1357 .via(buffered_flow().take(3))
1358 .run_collect()
1359 .unwrap(),
1360 vec![0, 1, 2]
1361 );
1362 assert_eq!(
1363 Source::from_iter(0..5)
1364 .via(buffered_flow().drop(2))
1365 .run_collect()
1366 .unwrap(),
1367 vec![2, 3, 4]
1368 );
1369 assert_eq!(
1370 Source::from_iter(0..5)
1371 .via(buffered_flow().take_while(|item| *item < 3))
1372 .run_collect()
1373 .unwrap(),
1374 vec![0, 1, 2]
1375 );
1376 assert_eq!(
1377 Source::from_iter(0..5)
1378 .via(buffered_flow().drop_while(|item| *item < 3))
1379 .run_collect()
1380 .unwrap(),
1381 vec![3, 4]
1382 );
1383 assert_eq!(
1384 Source::from_iter(0..3)
1385 .via(buffered_flow().limit(5))
1386 .run_collect()
1387 .unwrap(),
1388 vec![0, 1, 2]
1389 );
1390 assert_eq!(
1391 Source::from_iter(0..5)
1392 .via(buffered_flow().grouped(2))
1393 .run_collect()
1394 .unwrap(),
1395 vec![vec![0, 1], vec![2, 3], vec![4]]
1396 );
1397 assert_eq!(
1398 Source::from_iter(1..=3)
1399 .via(buffered_flow().scan(0, |acc, item| acc + item))
1400 .run_collect()
1401 .unwrap(),
1402 vec![0, 1, 3, 6]
1403 );
1404 assert_eq!(
1405 Source::from_iter(1..=4)
1406 .via(buffered_flow().sliding(2, 1))
1407 .run_collect()
1408 .unwrap(),
1409 vec![vec![1, 2], vec![2, 3], vec![3, 4]]
1410 );
1411 assert_eq!(
1412 Source::from_iter(1..=4)
1413 .via(buffered_flow().fold(0, |acc, item| acc + item))
1414 .run_collect()
1415 .unwrap(),
1416 vec![10]
1417 );
1418 assert_eq!(
1419 Source::from_iter(1..=4)
1420 .via(buffered_flow().reduce(|acc, item| acc + item))
1421 .run_collect()
1422 .unwrap(),
1423 vec![10]
1424 );
1425 assert_eq!(
1426 Source::from_factory(|| {
1427 Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
1428 })
1429 .via(buffered_flow().map_error(|_| StreamError::Failed("mapped".into())))
1430 .run_collect(),
1431 Err(StreamError::Failed("mapped".into()))
1432 );
1433 assert_eq!(
1434 Source::<i32>::failed(StreamError::Failed("boom".into()))
1435 .via(buffered_flow().recover(|_| Some(42)))
1436 .run_collect()
1437 .unwrap(),
1438 vec![42]
1439 );
1440 assert_eq!(
1441 Source::<i32>::failed(StreamError::Failed("boom".into()))
1442 .via(buffered_flow().recover_with(|_| Some(Source::from_iter([7, 8]))))
1443 .run_collect()
1444 .unwrap(),
1445 vec![7, 8]
1446 );
1447 assert_eq!(
1448 Source::<i32>::failed(StreamError::Failed("boom".into()))
1449 .via(buffered_flow().recover_with_retries(1, |_| Some(Source::from_iter([9]))))
1450 .run_collect()
1451 .unwrap(),
1452 vec![9]
1453 );
1454 assert_eq!(
1455 Source::from_factory(|| {
1456 Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
1457 })
1458 .via(buffered_flow().on_error_complete())
1459 .run_collect()
1460 .unwrap(),
1461 vec![1]
1462 );
1463
1464 let materialized = Source::from_iter([1, 2, 3])
1465 .run_with(
1466 buffered_flow()
1467 .via(Flow::identity().map(|item| item + 1))
1468 .map_materialized_value(|_| "buffered-flow")
1469 .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
1470 )
1471 .unwrap();
1472 assert_eq!(materialized.0, "buffered-flow");
1473 assert_eq!(wait(materialized.1), 9);
1474
1475 let kept = Source::from_iter([1, 2, 3])
1476 .run_with(
1477 buffered_flow()
1478 .via_mat_with(Flow::identity().map(|item| item + 1), |_, _| "combined")
1479 .to(Sink::fold(0, |acc, item| acc + item)),
1480 )
1481 .unwrap();
1482 assert_eq!(kept, "combined");
1483 }
1484
1485 #[test]
1486 fn runtime_rate_flows_compose_in_flow_form() {
1487 let conflate = Flow::identity()
1488 .conflate(|left: i32, right| left + right)
1489 .map(|item| item + 1);
1490 assert_eq!(
1491 Source::single(4).via(conflate).run_collect().unwrap(),
1492 vec![5]
1493 );
1494
1495 let batch = Flow::identity()
1496 .batch(4, |item: i32| item, |left, right| left + right)
1497 .map(|item| item + 1);
1498 assert_eq!(Source::single(4).via(batch).run_collect().unwrap(), vec![5]);
1499
1500 let expand = Flow::identity()
1501 .expand(std::iter::once::<i32>)
1502 .map(|item| item + 1);
1503 assert_eq!(
1504 Source::from_iter(0..4).via(expand).run_collect().unwrap(),
1505 vec![1, 2, 3, 4]
1506 );
1507
1508 let aggregate = Flow::identity()
1509 .aggregate_with_boundary(
1510 Vec::<i32>::new,
1511 |mut items, item| {
1512 items.push(item);
1513 let ready = !items.is_empty();
1514 (items, ready)
1515 },
1516 |items| items.into_iter().sum::<i32>(),
1517 None,
1518 )
1519 .map(|item| item + 1);
1520 assert_eq!(
1521 Source::from_iter(0..4)
1522 .via(aggregate)
1523 .run_collect()
1524 .unwrap(),
1525 vec![1, 2, 3, 4]
1526 );
1527
1528 let detached = Flow::identity().detach().map(|item: i32| item + 1);
1529 assert_eq!(
1530 Source::from_iter(0..4).via(detached).run_collect().unwrap(),
1531 vec![1, 2, 3, 4]
1532 );
1533 }
1534
1535 #[test]
1536 fn high_use_source_flow_operators_work() {
1537 let result = Source::from_iter(0..8)
1538 .drop(1)
1539 .take(5)
1540 .filter_not(|item| item % 2 == 0)
1541 .map_concat(|item| [item, item + 10])
1542 .grouped(3)
1543 .run_collect()
1544 .unwrap();
1545
1546 assert_eq!(result, vec![vec![1, 11, 3], vec![13, 5, 15]]);
1547 }
1548
1549 #[test]
1550 fn prefix_and_tail_emits_prefix_and_live_tail() {
1551 let mut outer = Source::from_iter(0..5)
1552 .prefix_and_tail(2)
1553 .run_collect()
1554 .unwrap();
1555 assert_eq!(outer.len(), 1);
1556 let (prefix, tail) = outer.pop().unwrap();
1557 assert_eq!(prefix, vec![0, 1]);
1558 assert_eq!(tail.clone().run_collect().unwrap(), vec![2, 3, 4]);
1559 assert_eq!(
1560 tail.run_collect(),
1561 Err(StreamError::Failed(
1562 "substream source cannot be materialized more than once".into()
1563 ))
1564 );
1565 }
1566
1567 #[test]
1568 fn prefix_and_tail_fails_before_prefix_is_ready() {
1569 let result = Source::from_factory(|| {
1570 Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
1571 })
1572 .prefix_and_tail(2)
1573 .run_collect();
1574 assert!(matches!(result, Err(StreamError::Failed(message)) if message == "boom"));
1575 }
1576
1577 #[test]
1578 fn prefix_and_tail_tail_propagates_late_upstream_failure() {
1579 let mut outer = Source::from_factory(|| {
1580 Box::new(vec![Ok(1), Ok(2), Err(StreamError::Failed("boom".into())), Ok(3)].into_iter())
1581 })
1582 .prefix_and_tail(2)
1583 .run_collect()
1584 .unwrap();
1585 let (prefix, tail) = outer.pop().unwrap();
1586 assert_eq!(prefix, vec![1, 2]);
1587 assert_eq!(tail.run_collect(), Err(StreamError::Failed("boom".into())));
1588 }
1589
1590 #[test]
1591 fn prefix_and_tail_accepts_non_clone_elements() {
1592 #[derive(Debug, PartialEq, Eq)]
1593 struct NonClone(u8);
1594
1595 let mut outer = Source::from_factory(|| {
1596 Box::new(vec![Ok(NonClone(1)), Ok(NonClone(2)), Ok(NonClone(3))].into_iter())
1597 })
1598 .prefix_and_tail(2)
1599 .run_collect()
1600 .unwrap();
1601 let (prefix, tail) = outer.pop().unwrap();
1602 assert_eq!(prefix, vec![NonClone(1), NonClone(2)]);
1603 assert_eq!(tail.run_collect().unwrap(), vec![NonClone(3)]);
1604 }
1605
1606 #[test]
1607 fn flat_map_prefix_materializes_on_short_upstream_completion() {
1608 let values = Source::from_iter([1, 2])
1609 .flat_map_prefix(3, |prefix| {
1610 let sum = prefix.into_iter().sum::<i32>();
1611 Flow::identity().prepend(Source::single(sum))
1612 })
1613 .run_collect()
1614 .unwrap();
1615 assert_eq!(values, vec![3]);
1616 }
1617
1618 #[test]
1619 fn flat_map_prefix_does_not_materialize_on_early_upstream_failure() {
1620 let invoked = StdArc::new(StdAtomicBool::new(false));
1621 let invoked_for_stage = StdArc::clone(&invoked);
1622 let result = Source::from_factory(|| {
1623 Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into()))].into_iter())
1624 })
1625 .flat_map_prefix(3, move |_prefix| {
1626 invoked_for_stage.store(true, StdOrdering::SeqCst);
1627 Flow::identity()
1628 })
1629 .run_collect();
1630 assert_eq!(result, Err(StreamError::Failed("boom".into())));
1631 assert!(!invoked.load(StdOrdering::SeqCst));
1632 }
1633
1634 #[test]
1635 fn flat_map_concat_flattens_nested_sources_sequentially() {
1636 let values = Source::from_iter([1, 2, 3])
1637 .flat_map_concat(|item| Source::from_iter(0..item))
1638 .run_collect()
1639 .unwrap();
1640 assert_eq!(values, vec![0, 0, 1, 0, 1, 2]);
1641 }
1642
1643 #[test]
1644 fn flat_map_merge_respects_breadth_bound() {
1645 let active = StdArc::new(StdAtomicUsize::new(0));
1646 let max_active = StdArc::new(StdAtomicUsize::new(0));
1647 let active_for_stage = StdArc::clone(&active);
1648 let max_for_stage = StdArc::clone(&max_active);
1649
1650 let mut values = Source::from_iter(0..6)
1651 .flat_map_merge(2, move |item| {
1652 let active = StdArc::clone(&active_for_stage);
1653 let max_active = StdArc::clone(&max_for_stage);
1654 Source::future(move || {
1655 let active = StdArc::clone(&active);
1656 let max_active = StdArc::clone(&max_active);
1657 async move {
1658 let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
1659 loop {
1660 let seen = max_active.load(StdOrdering::SeqCst);
1661 if now <= seen {
1662 break;
1663 }
1664 if max_active
1665 .compare_exchange(
1666 seen,
1667 now,
1668 StdOrdering::SeqCst,
1669 StdOrdering::SeqCst,
1670 )
1671 .is_ok()
1672 {
1673 break;
1674 }
1675 }
1676 thread::sleep(StdDuration::from_millis(20));
1677 active.fetch_sub(1, StdOrdering::SeqCst);
1678 Ok(item)
1679 }
1680 })
1681 })
1682 .run_collect()
1683 .unwrap();
1684 values.sort_unstable();
1685 assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
1686 assert!(max_active.load(StdOrdering::SeqCst) <= 2);
1687 }
1688
1689 #[test]
1690 fn flat_map_merge_propagates_inner_failures() {
1691 let result = Source::from_iter([0, 1, 2])
1692 .flat_map_merge(2, |item| {
1693 if item == 1 {
1694 Source::failed(StreamError::Failed("boom".into()))
1695 } else {
1696 Source::single(item)
1697 }
1698 })
1699 .run_collect();
1700 assert_eq!(result, Err(StreamError::Failed("boom".into())));
1701 }
1702
1703 #[test]
1704 fn flat_map_merge_emits_ready_inner_output_while_upstream_is_blocked() {
1705 let (release_tx, release_rx) = mpsc::channel();
1706 let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
1707 let queue = Source::from_factory(move || {
1708 let release_rx = StdArc::clone(&release_rx);
1709 let mut step = 0_u8;
1710 Box::new(std::iter::from_fn(move || {
1711 let item = match step {
1712 0 => Some(Ok(0)),
1713 1 => {
1714 release_rx
1715 .lock()
1716 .unwrap()
1717 .as_ref()
1718 .expect("release receiver available")
1719 .recv_timeout(StdDuration::from_secs(1))
1720 .expect("timed out waiting to release second upstream element");
1721 Some(Ok(1))
1722 }
1723 _ => None,
1724 };
1725 step += 1;
1726 item
1727 }))
1728 })
1729 .flat_map_merge(2, |item| Source::single(item + 10))
1730 .run_with(Sink::queue())
1731 .unwrap();
1732
1733 assert_eq!(queue.pull().unwrap(), Some(10));
1734 release_tx.send(()).unwrap();
1735 assert_eq!(queue.pull().unwrap(), Some(11));
1736 assert!(queue.pull().unwrap().is_none());
1737 }
1738
1739 #[test]
1740 fn group_by_routes_keys_and_drops_closed_keys() {
1741 let outer = Source::from_iter([0, 1, 2, 3, 4])
1742 .group_by(4, |item| item % 2, false)
1743 .run_with(Sink::queue())
1744 .unwrap();
1745
1746 let even = outer.pull().unwrap().unwrap();
1747 let even_completion = even.run_with(Sink::ignore()).unwrap();
1748 let odd = outer.pull().unwrap().unwrap();
1749 drop(even_completion);
1750
1751 assert_eq!(odd.run_collect().unwrap(), vec![1, 3]);
1752 assert!(outer.pull().unwrap().is_none());
1753 }
1754
1755 #[test]
1756 fn group_by_fails_when_distinct_key_limit_is_exceeded() {
1757 let outer = Source::from_iter([0, 1, 2])
1758 .group_by(2, |item| *item, false)
1759 .run_with(Sink::queue())
1760 .unwrap();
1761
1762 let _ = outer.pull().unwrap().unwrap();
1763 let _ = outer.pull().unwrap().unwrap();
1764 assert!(matches!(
1765 outer.pull(),
1766 Err(StreamError::Failed(message)) if message == "group_by reached max_substreams (2)"
1767 ));
1768 }
1769
1770 #[test]
1771 fn group_by_can_recreate_closed_substreams_when_enabled() {
1772 let (release_tx, release_rx) = mpsc::channel();
1773 let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
1774 let outer = Source::from_factory(move || {
1775 let release_rx = StdArc::clone(&release_rx);
1776 let mut step = 0_u8;
1777 Box::new(std::iter::from_fn(move || {
1778 let item = match step {
1779 0 => Some(Ok(0)),
1780 1 => Some(Ok(1)),
1781 2 => {
1782 release_rx
1783 .lock()
1784 .unwrap()
1785 .as_ref()
1786 .expect("release receiver available")
1787 .recv_timeout(StdDuration::from_secs(1))
1788 .expect("timed out waiting to release recreated key");
1789 Some(Ok(0))
1790 }
1791 _ => None,
1792 };
1793 step += 1;
1794 item
1795 }))
1796 })
1797 .group_by(4, |item| item % 2, true)
1798 .run_with(Sink::queue())
1799 .unwrap();
1800
1801 let even = outer.pull().unwrap().unwrap();
1802 assert_eq!(wait(even.run_with(Sink::head()).unwrap()), 0);
1803 release_tx.send(()).unwrap();
1804
1805 let odd = outer.pull().unwrap().unwrap();
1806 assert_eq!(odd.run_collect().unwrap(), vec![1]);
1807
1808 let recreated_even = outer.pull().unwrap().unwrap();
1809 assert_eq!(recreated_even.run_collect().unwrap(), vec![0]);
1810 assert!(outer.pull().unwrap().is_none());
1811 }
1812
1813 #[test]
1814 fn group_by_panicking_key_fn_abruptly_terminates_live_substreams() {
1815 let outer = Source::from_iter([0, 1])
1816 .group_by(
1817 4,
1818 |item| {
1819 assert_ne!(*item, 1, "boom");
1820 item % 2
1821 },
1822 false,
1823 )
1824 .run_with(Sink::queue())
1825 .unwrap();
1826
1827 let substream = outer.pull().unwrap().unwrap();
1828 let (result_tx, result_rx) = mpsc::channel();
1829 thread::spawn(move || {
1830 let _ = result_tx.send(substream.run_collect());
1831 });
1832
1833 assert_eq!(
1834 result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1835 Err(StreamError::AbruptTermination)
1836 );
1837 assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
1838 }
1839
1840 #[test]
1841 fn split_when_starts_new_substream_on_boundary_element() {
1842 let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
1843 .split_when(|item| *item == 0)
1844 .run_with(Sink::queue())
1845 .unwrap();
1846
1847 let first = outer.pull().unwrap().unwrap();
1848 assert_eq!(first.run_collect().unwrap(), vec![1, 2]);
1849 let second = outer.pull().unwrap().unwrap();
1850 assert_eq!(second.run_collect().unwrap(), vec![0, 3]);
1851 let third = outer.pull().unwrap().unwrap();
1852 assert_eq!(third.run_collect().unwrap(), vec![0, 4, 5]);
1853 assert!(outer.pull().unwrap().is_none());
1854 }
1855
1856 #[test]
1857 fn split_after_ends_current_substream_on_boundary_element() {
1858 let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
1859 .split_after(|item| *item == 0)
1860 .run_with(Sink::queue())
1861 .unwrap();
1862
1863 let first = outer.pull().unwrap().unwrap();
1864 assert_eq!(first.run_collect().unwrap(), vec![1, 2, 0]);
1865 let second = outer.pull().unwrap().unwrap();
1866 assert_eq!(second.run_collect().unwrap(), vec![3, 0]);
1867 let third = outer.pull().unwrap().unwrap();
1868 assert_eq!(third.run_collect().unwrap(), vec![4, 5]);
1869 assert!(outer.pull().unwrap().is_none());
1870 }
1871
1872 #[test]
1873 fn split_when_panicking_predicate_abruptly_terminates_live_substreams() {
1874 let outer = Source::from_iter([1, 2])
1875 .split_when(|item| {
1876 assert_ne!(*item, 2, "boom");
1877 false
1878 })
1879 .run_with(Sink::queue())
1880 .unwrap();
1881
1882 let substream = outer.pull().unwrap().unwrap();
1883 let (result_tx, result_rx) = mpsc::channel();
1884 thread::spawn(move || {
1885 let _ = result_tx.send(substream.run_collect());
1886 });
1887
1888 assert_eq!(
1889 result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1890 Err(StreamError::AbruptTermination)
1891 );
1892 assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
1893 }
1894
1895 #[test]
1896 fn split_when_pre_buffer_segments_match_expected_count() {
1897 let outer = Source::from_iter(0..100)
1898 .split_when(|item| *item != 0 && *item % 10 == 0)
1899 .run_with(Sink::queue())
1900 .unwrap();
1901 let mut segment_count = 0;
1902 while let Some(substream) = outer.pull().unwrap() {
1903 let items: Vec<i32> = substream.run_collect().unwrap();
1904 assert!(!items.is_empty(), "segment should not be empty");
1905 segment_count += 1;
1906 }
1907 assert_eq!(segment_count, 10, "100 elements in segments of 10");
1908 }
1909
1910 #[test]
1911 fn split_after_pre_buffer_segments_match_expected_count() {
1912 let outer = Source::from_iter(0..100)
1913 .split_after(|item| (*item + 1) % 10 == 0)
1914 .run_with(Sink::queue())
1915 .unwrap();
1916 let mut segment_count = 0;
1917 let mut total = 0_i32;
1918 while let Some(substream) = outer.pull().unwrap() {
1919 let items: Vec<i32> = substream.run_collect().unwrap();
1920 assert!(!items.is_empty(), "segment should not be empty");
1921 total += items.len() as i32;
1922 segment_count += 1;
1923 }
1924 assert_eq!(segment_count, 10);
1925 assert_eq!(total, 100);
1926 }
1927
1928 #[test]
1929 fn group_by_single_key_fused_matches_general_path() {
1930 let outer = Source::from_iter(0..1000i64)
1931 .group_by(1, |_| 0u8, false)
1932 .run_with(Sink::queue())
1933 .unwrap();
1934 let substream = outer.pull().unwrap().unwrap();
1935 let items: Vec<i64> = substream.run_collect().unwrap();
1936 assert_eq!(items.len(), 1000);
1937 assert_eq!(items[0], 0);
1938 assert_eq!(items[999], 999);
1939 assert!(outer.pull().unwrap().is_none());
1940 }
1941
1942 #[test]
1943 fn group_by_single_key_fused_handles_key_change_with_substream_limit() {
1944 let outer = Source::from_iter([0, 1, 0])
1945 .group_by(2, |item| *item, false)
1946 .run_with(Sink::queue())
1947 .unwrap();
1948 let mut sources = vec![];
1949 while let Some(source) = outer.pull().unwrap() {
1950 sources.push(source);
1951 }
1952 assert_eq!(sources.len(), 2);
1953 assert_eq!(sources[0].clone().run_collect().unwrap(), vec![0, 0]);
1954 assert_eq!(sources[1].clone().run_collect().unwrap(), vec![1]);
1955 }
1956
1957 #[test]
1958 fn flat_map_merge_lock_lighter_matches_expected_count() {
1959 let items = Source::from_iter(0..20)
1960 .flat_map_merge(2, |item| Source::single(item + 100))
1961 .run_with(Sink::queue())
1962 .unwrap();
1963 let mut count = 0;
1964 while items.pull().unwrap().is_some() {
1965 count += 1;
1966 }
1967 assert_eq!(count, 20);
1968 }
1969
1970 #[test]
1980 fn group_by_single_key_emits_substream_before_upstream_completes() {
1981 let (tx, rx) = mpsc::sync_channel::<i32>(0);
1984 let rx = StdArc::new(std::sync::Mutex::new(rx));
1985
1986 let outer = Source::from_factory({
1987 let rx = StdArc::clone(&rx);
1988 move || {
1989 let rx = StdArc::clone(&rx);
1990 Box::new(std::iter::from_fn(move || {
1991 rx.lock().unwrap().recv().ok().map(Ok)
1992 })) as BoxStream<i32>
1993 }
1994 })
1995 .group_by(1, |_| 0u8, false)
1996 .run_with(Sink::queue())
1997 .unwrap();
1998
1999 let (sub_tx, sub_rx) = mpsc::channel::<Source<i32>>();
2002 let outer_thread = thread::spawn(move || {
2003 let substream = outer.pull().unwrap().expect("expected a substream");
2004 sub_tx.send(substream).unwrap();
2005 });
2006
2007 tx.send(0).unwrap();
2010
2011 let substream = sub_rx
2013 .recv_timeout(StdDuration::from_secs(5))
2014 .expect("timed out — group_by buffered first element before emitting substream");
2015
2016 for i in 1..100_i32 {
2018 tx.send(i).unwrap();
2019 }
2020 drop(tx);
2021
2022 let items: Vec<i32> = substream.run_collect().unwrap();
2023 assert_eq!(items.len(), 100);
2024 outer_thread.join().unwrap();
2025 }
2026
2027 #[test]
2028 fn group_by_concurrent_live_substreams_do_not_hold_ready_item_stress() {
2029 const STREAMS: usize = 32;
2030 const ROUNDS: usize = 8;
2031 const ITEMS: i64 = 8;
2032
2033 for _ in 0..ROUNDS {
2034 let barrier = StdArc::new(std::sync::Barrier::new(STREAMS));
2035 let mut handles = Vec::with_capacity(STREAMS);
2036
2037 for _ in 0..STREAMS {
2038 let barrier = StdArc::clone(&barrier);
2039 handles.push(thread::spawn(move || {
2040 let (tx, rx) = mpsc::sync_channel::<i64>(0);
2041 let rx = StdArc::new(std::sync::Mutex::new(rx));
2042
2043 let outer = Source::from_factory({
2044 let rx = StdArc::clone(&rx);
2045 move || {
2046 let rx = StdArc::clone(&rx);
2047 Box::new(std::iter::from_fn(move || {
2048 rx.lock().unwrap().recv().ok().map(Ok)
2049 })) as BoxStream<i64>
2050 }
2051 })
2052 .group_by(1, |_| 0_u8, false)
2053 .run_with(Sink::queue())
2054 .unwrap();
2055
2056 barrier.wait();
2057
2058 tx.send(0).unwrap();
2059 let substream = outer.pull().unwrap().expect("expected group_by substream");
2060 let subqueue = substream.run_with(Sink::queue()).unwrap();
2061 assert_eq!(subqueue.pull().unwrap(), Some(0));
2062
2063 for item in 1..ITEMS {
2064 tx.send(item).unwrap();
2065 assert_eq!(subqueue.pull().unwrap(), Some(item));
2066 }
2067 drop(tx);
2068
2069 assert!(subqueue.pull().unwrap().is_none());
2070 assert!(outer.pull().unwrap().is_none());
2071 }));
2072 }
2073
2074 for handle in handles {
2075 handle.join().expect("group_by stress worker panicked");
2076 }
2077 }
2078 }
2079
2080 #[test]
2087 fn split_when_emits_substream_before_segment_ends() {
2088 const SEGMENT_LEN: usize = 300;
2092
2093 let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
2094 for i in 0..SEGMENT_LEN as i32 {
2095 tx.send(i).unwrap();
2096 }
2097 tx.send(-1).unwrap(); tx.send(99).unwrap(); drop(tx);
2100
2101 let outer = Source::from_iter(rx)
2102 .split_when(|item| *item == -1)
2103 .run_with(Sink::queue())
2104 .unwrap();
2105
2106 let (result_tx, result_rx) = mpsc::channel();
2107 thread::spawn(move || {
2108 let first = outer.pull().unwrap().expect("expected first substream");
2109 let items: Vec<i32> = first.run_collect().unwrap();
2110 let second = outer.pull().unwrap().expect("expected second substream");
2111 let items2: Vec<i32> = second.run_collect().unwrap();
2112 let done = outer.pull().unwrap().is_none();
2113 let _ = result_tx.send((items, items2, done));
2114 });
2115
2116 let (items, items2, done) = result_rx
2117 .recv_timeout(StdDuration::from_secs(5))
2118 .expect("timed out — split_when is buffering the whole segment");
2119 assert_eq!(items.len(), SEGMENT_LEN);
2120 assert_eq!(items2, vec![-1, 99]);
2121 assert!(done);
2122 }
2123
2124 #[test]
2125 fn split_after_emits_substream_before_segment_ends() {
2126 const SEGMENT_LEN: usize = 300;
2127
2128 let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
2129 for i in 0..SEGMENT_LEN as i32 {
2130 tx.send(i).unwrap();
2131 }
2132 tx.send(-1).unwrap(); tx.send(99).unwrap();
2134 drop(tx);
2135
2136 let outer = Source::from_iter(rx)
2137 .split_after(|item| *item == -1)
2138 .run_with(Sink::queue())
2139 .unwrap();
2140
2141 let (result_tx, result_rx) = mpsc::channel();
2142 thread::spawn(move || {
2143 let first = outer.pull().unwrap().expect("expected first substream");
2144 let items: Vec<i32> = first.run_collect().unwrap();
2145 let second = outer.pull().unwrap().expect("expected second substream");
2146 let items2: Vec<i32> = second.run_collect().unwrap();
2147 let done = outer.pull().unwrap().is_none();
2148 let _ = result_tx.send((items, items2, done));
2149 });
2150
2151 let (items, items2, done) = result_rx
2152 .recv_timeout(StdDuration::from_secs(5))
2153 .expect("timed out — split_after is buffering the whole segment");
2154 assert_eq!(items.len(), SEGMENT_LEN + 1);
2156 assert_eq!(items2, vec![99]);
2157 assert!(done);
2158 }
2159
2160 #[test]
2164 fn flat_map_merge_coordinator_no_lost_wakeup_stress() {
2165 for _ in 0..20 {
2166 let result = Source::from_iter(0..50_i32)
2167 .flat_map_merge(8, |item| Source::from_iter(item..item + 3))
2168 .run_with(Sink::fold(0i64, |acc, item| acc + item as i64))
2169 .unwrap()
2170 .wait();
2171 assert_eq!(result, Ok(3825), "flat_map_merge produced wrong sum");
2174 }
2175 }
2176
2177 #[test]
2182 fn flat_map_merge_single_mutex_race_stress() {
2183 for _ in 0..20 {
2184 let result = Source::from_iter(0..100_i64)
2185 .flat_map_merge(16, |item| Source::from_iter([item, item + 1000]))
2186 .run_with(Sink::fold(0i64, |acc, v| acc + v))
2187 .unwrap()
2188 .wait();
2189 assert_eq!(result, Ok(109_900), "flat_map_merge single-mutex stress");
2193 }
2194 }
2195
2196 #[test]
2203 fn split_when_bounded_memory_rendezvous() {
2204 const SEGMENT: usize = 100;
2207 let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT * 4);
2208 for i in 0..SEGMENT as i32 {
2209 tx.send(i).unwrap();
2210 }
2211 tx.send(-1).unwrap(); for i in 0..10_i32 {
2214 tx.send(i).unwrap();
2215 }
2216 drop(tx);
2217
2218 let outer = Source::from_iter(rx)
2219 .split_when(|item| *item == -1)
2220 .run_with(Sink::queue())
2221 .unwrap();
2222
2223 let (result_tx, result_rx) = mpsc::channel();
2224 thread::spawn(move || {
2225 let first = outer.pull().unwrap().expect("first segment");
2226 let seg1: Vec<i32> = first.run_collect().unwrap();
2227 let second = outer.pull().unwrap().expect("second segment");
2228 let seg2: Vec<i32> = second.run_collect().unwrap();
2229 let done = outer.pull().unwrap().is_none();
2230 result_tx.send((seg1, seg2, done)).unwrap();
2231 });
2232
2233 let (seg1, seg2, done) = result_rx
2234 .recv_timeout(StdDuration::from_secs(5))
2235 .expect("timed out — split_when writer held items past LIVE_SUBSTREAM_BATCH");
2236 assert_eq!(seg1.len(), SEGMENT, "first segment length");
2237 assert_eq!(seg2[0], -1, "boundary element starts second segment");
2238 assert_eq!(seg2.len(), 11, "second segment: boundary + 10 items");
2239 assert!(done);
2240 }
2241
2242 #[test]
2246 fn group_by_single_key_bounded_memory_rendezvous() {
2247 const N: usize = 200;
2250 let outer = Source::from_iter(0..N as i64)
2251 .group_by(1, |_| 0u8, false)
2252 .run_with(Sink::queue())
2253 .unwrap();
2254
2255 let (result_tx, result_rx) = mpsc::channel();
2256 thread::spawn(move || {
2257 let substream = outer.pull().unwrap().expect("substream");
2258 let items: Vec<i64> = substream.run_collect().unwrap();
2259 let done = outer.pull().unwrap().is_none();
2260 result_tx.send((items, done)).unwrap();
2261 });
2262
2263 let (items, done) = result_rx
2264 .recv_timeout(StdDuration::from_secs(5))
2265 .expect("timed out — group_by write batch held items beyond LIVE_SUBSTREAM_BATCH");
2266 assert_eq!(items.len(), N, "all items delivered");
2267 assert_eq!(items[0], 0);
2268 assert_eq!(items[N - 1], (N - 1) as i64);
2269 assert!(done);
2270 }
2271
2272 #[test]
2273 fn scan_emits_seed_and_accumulated_values() {
2274 let result = Source::from_iter(1..=3)
2275 .scan(0, |acc, item| acc + item)
2276 .run_collect()
2277 .unwrap();
2278
2279 assert_eq!(result, vec![0, 1, 3, 6]);
2280 }
2281
2282 #[test]
2283 fn limit_fails_after_max_elements() {
2284 let result = Source::from_iter(0..3).limit(2).run_collect();
2285
2286 assert_eq!(result, Err(StreamError::LimitExceeded { max: 2 }));
2287 }
2288
2289 #[test]
2290 fn limit_weighted_fails_with_limit_error_like_akka() {
2291 let result = Source::from_iter(["this", "is", "some", "string"])
2292 .via(Flow::identity().limit_weighted(15, |item: &&str| item.len()))
2293 .run_collect();
2294
2295 assert_eq!(result, Err(StreamError::LimitExceeded { max: 15 }));
2296 }
2297
2298 #[test]
2299 fn grouped_weighted_allows_oversized_first_element_like_akka() {
2300 let result = Source::from_iter([10_usize, 1, 2])
2301 .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
2302 .run_collect()
2303 .unwrap();
2304
2305 assert_eq!(result, vec![vec![10], vec![1, 2]]);
2306 }
2307
2308 #[test]
2309 fn grouped_weighted_keeps_oversized_later_element_in_current_group_like_akka() {
2310 let result = Source::from_iter([1_usize, 10, 2])
2311 .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
2312 .run_collect()
2313 .unwrap();
2314
2315 assert_eq!(result, vec![vec![1, 10], vec![2]]);
2316 }
2317
2318 #[test]
2319 fn sink_terminals_materialize_results() {
2320 let sum = Source::from_iter(1..=4)
2321 .run_with(Sink::fold(0, |acc, item| acc + item))
2322 .unwrap();
2323
2324 assert_eq!(wait(sum), 10);
2325 assert_eq!(
2326 wait(Source::from_iter(1..=4).run_with(Sink::head()).unwrap()),
2327 1
2328 );
2329 assert_eq!(
2330 wait(Source::from_iter(1..=4).run_with(Sink::last()).unwrap()),
2331 4
2332 );
2333 }
2334
2335 #[test]
2336 fn all_terminal_sink_variants_complete() {
2337 assert_eq!(
2338 wait(
2339 Source::from_iter([1, 2, 3])
2340 .run_with(Sink::collect())
2341 .unwrap()
2342 ),
2343 vec![1, 2, 3]
2344 );
2345 assert_eq!(
2346 wait(
2347 Source::<i32>::empty()
2348 .run_with(Sink::head_option())
2349 .unwrap()
2350 ),
2351 None
2352 );
2353 assert_eq!(
2354 wait(
2355 Source::from_iter([1, 2, 3])
2356 .run_with(Sink::last_option())
2357 .unwrap()
2358 ),
2359 Some(3)
2360 );
2361 assert_eq!(
2362 wait(
2363 Source::from_iter([1, 2, 3])
2364 .run_with(Sink::reduce(|acc, item| acc + item))
2365 .unwrap()
2366 ),
2367 6
2368 );
2369
2370 let seen = StdArc::new(StdAtomicUsize::new(0));
2371 let seen_by_sink = StdArc::clone(&seen);
2372 assert_eq!(
2373 wait(
2374 Source::from_iter([1_usize, 2, 3])
2375 .run_with(Sink::foreach(move |item| {
2376 seen_by_sink.fetch_add(item, StdOrdering::SeqCst);
2377 }))
2378 .unwrap()
2379 ),
2380 NotUsed
2381 );
2382 assert_eq!(seen.load(StdOrdering::SeqCst), 6);
2383 }
2384
2385 #[test]
2386 fn take_last_zero_returns_empty_vector() {
2387 let result = Source::from_iter([1, 2, 3])
2388 .run_with(Sink::take_last(0))
2389 .unwrap();
2390
2391 assert_eq!(wait(result), Vec::<i32>::new());
2392 }
2393
2394 #[test]
2395 fn bounded_head_terminals_complete_inline() {
2396 let materializer = Materializer::new();
2397
2398 let mut head = Source::from_iter(0_u64..1_000)
2399 .run_with_materializer(Sink::head(), &materializer)
2400 .unwrap();
2401 assert_eq!(materializer.active_streams(), 0);
2402 assert_eq!(head.try_wait(), Some(Ok(0)));
2403
2404 let mut filtered_head = Source::from_iter(0_u64..1_000)
2405 .filter(|item| *item >= 10)
2406 .run_with_materializer(Sink::head(), &materializer)
2407 .unwrap();
2408 assert_eq!(materializer.active_streams(), 0);
2409 assert_eq!(filtered_head.try_wait(), Some(Ok(10)));
2410
2411 let mut head_option = Source::<u64>::empty()
2412 .run_with_materializer(Sink::head_option(), &materializer)
2413 .unwrap();
2414 assert_eq!(materializer.active_streams(), 0);
2415 assert_eq!(head_option.try_wait(), Some(Ok(None)));
2416 }
2417
2418 #[test]
2419 fn bounded_head_fast_path_preserves_terminal_errors() {
2420 let materializer = Materializer::new();
2421
2422 let mut empty = Source::<u64>::empty()
2423 .run_with_materializer(Sink::head(), &materializer)
2424 .unwrap();
2425 assert_eq!(empty.try_wait(), Some(Err(StreamError::EmptyStream)));
2426
2427 let mut failed = Source::<u64>::failed(StreamError::Failed("boom".into()))
2428 .run_with_materializer(Sink::head(), &materializer)
2429 .unwrap();
2430 assert_eq!(
2431 failed.try_wait(),
2432 Some(Err(StreamError::Failed("boom".into())))
2433 );
2434 assert_eq!(materializer.active_streams(), 0);
2435 }
2436
2437 #[test]
2438 fn runnable_graph_composes_source_and_sink() {
2439 let graph = Source::from_iter(1..=4)
2440 .map(|item| item * 2)
2441 .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::right);
2442
2443 assert_eq!(wait(graph.run().unwrap()), 20);
2444
2445 let graph = Source::single(1)
2446 .map_materialized_value(|_| 20)
2447 .to(Sink::ignore())
2448 .map_materialized_value(|value| value + 1);
2449 assert_eq!(graph.run().unwrap(), 21);
2450
2451 let ignored = Source::single(1).to(Sink::ignore()).run().unwrap();
2452 assert_eq!(ignored, NotUsed);
2453 }
2454
2455 #[test]
2456 fn materialized_values_follow_keep_defaults() {
2457 let source = Source::single(1).map_materialized_value(|_| "source");
2458 let flow = Flow::identity().map_materialized_value(|_| "flow");
2459
2460 let source_mat = source.clone().via(flow.clone()).to(Sink::ignore()).run();
2461 assert_eq!(source_mat.unwrap(), "source");
2462
2463 let combined = source
2464 .via_mat(flow, Keep::both)
2465 .to_mat(Sink::ignore(), Keep::both)
2466 .run()
2467 .unwrap();
2468 assert_eq!(combined.0, ("source", "flow"));
2469 assert_eq!(wait(combined.1), NotUsed);
2470
2471 let sink_mat = Source::single(41)
2472 .map_materialized_value(|_| "ignored source")
2473 .run_with(Sink::fold(1, |acc, item| acc + item))
2474 .unwrap();
2475 assert_eq!(wait(sink_mat), 42);
2476 }
2477
2478 #[test]
2479 fn flow_to_sink_preserves_flow_materialized_value_by_default() {
2480 let sink = Flow::identity()
2481 .map(|item: i32| item + 1)
2482 .map_materialized_value(|_| "flow")
2483 .to(Sink::fold(0, |acc, item| acc + item));
2484
2485 let materialized = Source::from_iter([1, 2, 3]).run_with(sink).unwrap();
2486
2487 assert_eq!(materialized, "flow");
2488 let explicit = Flow::identity()
2489 .map(|item: i32| item + 1)
2490 .map_materialized_value(|_| "flow")
2491 .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both)
2492 .run_with(Source::from_iter([1, 2, 3]))
2493 .unwrap();
2494 assert_eq!(explicit, NotUsed);
2495
2496 let explicit = Source::from_iter([1, 2, 3])
2497 .run_with(
2498 Flow::identity()
2499 .map(|item: i32| item + 1)
2500 .map_materialized_value(|_| "flow")
2501 .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
2502 )
2503 .unwrap();
2504 assert_eq!(explicit.0, "flow");
2505 assert_eq!(wait(explicit.1), 9);
2506 }
2507
2508 #[test]
2509 fn materializer_shutdown_fails_materialization() {
2510 let materializer = Materializer::new();
2511 let named = materializer.with_name_prefix("test-stream");
2512 materializer.shutdown();
2513
2514 let graph = Source::single(1).to(Sink::ignore());
2515
2516 assert_eq!(named.name_prefix(), "test-stream");
2517 assert_eq!(
2518 graph.run_with_materializer(&named),
2519 Err(StreamError::AbruptTermination)
2520 );
2521 }
2522
2523 #[test]
2524 fn materializer_shutdown_fails_running_stream_completion() {
2525 let materializer = Materializer::new();
2526 let completion = Source::repeat(1)
2527 .run_with_materializer(Sink::ignore(), &materializer)
2528 .unwrap();
2529
2530 assert_eq!(materializer.active_streams(), 1);
2531 materializer.shutdown();
2532 assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
2533 assert_eq!(materializer.active_streams(), 0);
2534 }
2535
2536 #[test]
2537 fn dropped_stream_completion_cancels_running_stream() {
2538 let materializer = Materializer::new();
2539 let completion = Source::repeat(1)
2540 .run_with_materializer(Sink::ignore(), &materializer)
2541 .unwrap();
2542
2543 assert_eq!(materializer.active_streams(), 1);
2544 drop(completion);
2545 for _ in 0..50 {
2546 if materializer.active_streams() == 0 {
2547 break;
2548 }
2549 thread::sleep(Duration::from_millis(5));
2550 }
2551 assert_eq!(materializer.active_streams(), 0);
2552 }
2553
2554 #[test]
2555 fn runtime_timers_fire_cancel_and_stop_on_shutdown() {
2556 let materializer = Materializer::new();
2557 let (once_tx, once_rx) = mpsc::channel();
2558 let once = materializer.schedule_once(Duration::from_millis(5), move || {
2559 once_tx.send(()).unwrap();
2560 });
2561 once_rx.recv_timeout(Duration::from_millis(250)).unwrap();
2562 assert!(!once.is_cancelled());
2563
2564 let (cancelled_tx, cancelled_rx) = mpsc::channel();
2565 let cancelled = materializer.schedule_once(Duration::from_millis(25), move || {
2566 cancelled_tx.send(()).unwrap();
2567 });
2568 assert!(cancelled.cancel());
2569 assert!(!cancelled.cancel());
2570 assert!(cancelled.is_cancelled());
2571 assert!(
2572 cancelled_rx
2573 .recv_timeout(Duration::from_millis(75))
2574 .is_err()
2575 );
2576
2577 let fixed_delay_count = StdArc::new(StdAtomicUsize::new(0));
2578 let fixed_delay_task_count = StdArc::clone(&fixed_delay_count);
2579 let fixed_delay = materializer.schedule_with_fixed_delay(
2580 Duration::from_millis(1),
2581 Duration::from_millis(5),
2582 move || {
2583 fixed_delay_task_count.fetch_add(1, StdOrdering::SeqCst);
2584 },
2585 );
2586 thread::sleep(Duration::from_millis(25));
2587 assert!(fixed_delay_count.load(StdOrdering::SeqCst) > 0);
2588 fixed_delay.cancel();
2589
2590 let fixed_rate_count = StdArc::new(StdAtomicUsize::new(0));
2591 let fixed_rate_task_count = StdArc::clone(&fixed_rate_count);
2592 let fixed_rate = materializer.schedule_at_fixed_rate(
2593 Duration::from_millis(1),
2594 Duration::from_millis(5),
2595 move || {
2596 fixed_rate_task_count.fetch_add(1, StdOrdering::SeqCst);
2597 },
2598 );
2599 thread::sleep(Duration::from_millis(25));
2600 assert!(fixed_rate_count.load(StdOrdering::SeqCst) > 0);
2601 fixed_rate.cancel();
2602
2603 let shutdown_materializer = Materializer::new();
2604 let (shutdown_tx, shutdown_rx) = mpsc::channel();
2605 shutdown_materializer.schedule_once(Duration::from_millis(25), move || {
2606 shutdown_tx.send(()).unwrap();
2607 });
2608 shutdown_materializer.shutdown();
2609 assert!(shutdown_rx.recv_timeout(Duration::from_millis(75)).is_err());
2610 }
2611
2612 #[test]
2613 fn runtime_timer_driver_preserves_fixed_rate_cadence_under_slow_tasks() {
2614 use std::sync::{Condvar, Mutex};
2615
2616 #[derive(Debug)]
2617 enum TimerEvent {
2618 Started(usize, Instant),
2619 Completed(usize, Instant),
2620 }
2621
2622 let recv_event = |rx: &mpsc::Receiver<TimerEvent>, label: &str| {
2623 rx.recv_timeout(Duration::from_secs(20))
2624 .unwrap_or_else(|err| panic!("{label}: expected timer event within 20 s: {err}"))
2625 };
2626 let release = |gate: &StdArc<(Mutex<bool>, Condvar)>| {
2627 let (released, condvar) = &**gate;
2628 let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2629 *released = true;
2630 condvar.notify_all();
2631 };
2632
2633 let interval = Duration::from_secs(2);
2634 let overrun = interval + Duration::from_millis(250);
2635
2636 let rate_materializer = Materializer::new();
2637 let (rate_tx, rate_rx) = mpsc::channel();
2638 let rate_runs = StdArc::new(StdAtomicUsize::new(0));
2639 let rate_task_runs = StdArc::clone(&rate_runs);
2640 let rate_gate = StdArc::new((Mutex::new(false), Condvar::new()));
2641 let rate_task_gate = StdArc::clone(&rate_gate);
2642 let fixed_rate =
2643 rate_materializer.schedule_at_fixed_rate(Duration::ZERO, interval, move || {
2644 let run = rate_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
2645 rate_tx
2646 .send(TimerEvent::Started(run, Instant::now()))
2647 .unwrap();
2648 if run == 1 {
2649 let (released, condvar) = &*rate_task_gate;
2650 let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2651 while !*released {
2652 released = condvar
2653 .wait(released)
2654 .unwrap_or_else(|poison| poison.into_inner());
2655 }
2656 rate_tx
2657 .send(TimerEvent::Completed(run, Instant::now()))
2658 .unwrap();
2659 }
2660 });
2661 let rate_first_started = match recv_event(&rate_rx, "fixed-rate first task") {
2662 TimerEvent::Started(1, at) => at,
2663 other => panic!("fixed-rate first task: unexpected event {other:?}"),
2664 };
2665 assert!(wait_until(Duration::from_secs(20), || {
2666 rate_first_started.elapsed() >= overrun
2667 }));
2668 release(&rate_gate);
2669 let rate_first_completed = match recv_event(&rate_rx, "fixed-rate first completion") {
2670 TimerEvent::Completed(1, at) => at,
2671 other => panic!("fixed-rate first completion: unexpected event {other:?}"),
2672 };
2673 let rate_second_started = match recv_event(&rate_rx, "fixed-rate second task") {
2674 TimerEvent::Started(2, at) => at,
2675 other => panic!("fixed-rate second task: unexpected event {other:?}"),
2676 };
2677 fixed_rate.cancel();
2678 rate_materializer.shutdown();
2679
2680 let delay_materializer = Materializer::new();
2681 let (delay_tx, delay_rx) = mpsc::channel();
2682 let delay_runs = StdArc::new(StdAtomicUsize::new(0));
2683 let delay_task_runs = StdArc::clone(&delay_runs);
2684 let delay_gate = StdArc::new((Mutex::new(false), Condvar::new()));
2685 let delay_task_gate = StdArc::clone(&delay_gate);
2686 let fixed_delay =
2687 delay_materializer.schedule_with_fixed_delay(Duration::ZERO, interval, move || {
2688 let run = delay_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
2689 delay_tx
2690 .send(TimerEvent::Started(run, Instant::now()))
2691 .unwrap();
2692 if run == 1 {
2693 let (released, condvar) = &*delay_task_gate;
2694 let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2695 while !*released {
2696 released = condvar
2697 .wait(released)
2698 .unwrap_or_else(|poison| poison.into_inner());
2699 }
2700 delay_tx
2701 .send(TimerEvent::Completed(run, Instant::now()))
2702 .unwrap();
2703 }
2704 });
2705 let delay_first_started = match recv_event(&delay_rx, "fixed-delay first task") {
2706 TimerEvent::Started(1, at) => at,
2707 other => panic!("fixed-delay first task: unexpected event {other:?}"),
2708 };
2709 assert!(wait_until(Duration::from_secs(20), || {
2710 delay_first_started.elapsed() >= overrun
2711 }));
2712 release(&delay_gate);
2713 let delay_first_completed = match recv_event(&delay_rx, "fixed-delay first completion") {
2714 TimerEvent::Completed(1, at) => at,
2715 other => panic!("fixed-delay first completion: unexpected event {other:?}"),
2716 };
2717 let delay_second_started = match recv_event(&delay_rx, "fixed-delay second task") {
2718 TimerEvent::Started(2, at) => at,
2719 other => panic!("fixed-delay second task: unexpected event {other:?}"),
2720 };
2721 fixed_delay.cancel();
2722 delay_materializer.shutdown();
2723
2724 let rate_task_time = rate_first_completed.duration_since(rate_first_started);
2725 let rate_catch_up = rate_second_started.duration_since(rate_first_completed);
2726 let delay_task_time = delay_first_completed.duration_since(delay_first_started);
2727 let delay_gap = delay_second_started.duration_since(delay_first_completed);
2728 assert!(
2729 rate_task_time >= interval,
2730 "fixed-rate first task should overrun its interval; ran for {rate_task_time:?}"
2731 );
2732 assert!(
2733 rate_catch_up < interval,
2734 "fixed-rate second task should catch up after an overrun; waited {rate_catch_up:?}"
2735 );
2736 assert!(
2737 delay_task_time >= interval,
2738 "fixed-delay first task should overrun its interval; ran for {delay_task_time:?}"
2739 );
2740 assert!(
2741 delay_gap >= interval,
2742 "fixed-delay second task fired before one full delay elapsed after completion: {delay_gap:?}",
2743 );
2744 }
2745
2746 #[test]
2747 fn runtime_repeating_timer_cancellation_stops_future_fires() {
2748 let materializer = Materializer::new();
2749 let (tx, rx) = mpsc::channel();
2750 let timer = materializer.schedule_at_fixed_rate(
2751 Duration::from_millis(1),
2752 Duration::from_millis(30),
2753 move || {
2754 tx.send(()).unwrap();
2755 },
2756 );
2757
2758 rx.recv_timeout(Duration::from_millis(250)).unwrap();
2759 assert!(timer.cancel());
2760 assert!(rx.recv_timeout(Duration::from_millis(90)).is_err());
2761 materializer.shutdown();
2762 }
2763
2764 #[test]
2765 fn runtime_panicking_once_timer_does_not_kill_driver_or_later_timers() {
2766 let materializer = Materializer::new();
2767 materializer.schedule_once(Duration::from_millis(1), || {
2768 panic!("timer boom");
2769 });
2770
2771 let (tx, rx) = mpsc::channel();
2772 materializer.schedule_once(Duration::from_millis(20), move || {
2773 tx.send(()).unwrap();
2774 });
2775
2776 rx.recv_timeout(Duration::from_millis(250)).unwrap();
2777 materializer.shutdown();
2778 }
2779
2780 #[test]
2781 fn runtime_panicking_fixed_rate_timer_stops_itself_and_leaves_driver_alive() {
2782 let materializer = Materializer::new();
2783 let panic_count = StdArc::new(StdAtomicUsize::new(0));
2784 let panic_count_task = StdArc::clone(&panic_count);
2785 materializer.schedule_at_fixed_rate(Duration::ZERO, Duration::from_millis(20), move || {
2786 panic_count_task.fetch_add(1, StdOrdering::SeqCst);
2787 panic!("fixed-rate boom");
2788 });
2789
2790 assert!(wait_until(Duration::from_millis(150), || {
2791 panic_count.load(StdOrdering::SeqCst) == 1
2792 }));
2793
2794 let (tx, rx) = mpsc::channel();
2795 materializer.schedule_once(Duration::from_millis(30), move || {
2796 tx.send(()).unwrap();
2797 });
2798 rx.recv_timeout(Duration::from_millis(250)).unwrap();
2799
2800 thread::sleep(Duration::from_millis(90));
2801 assert_eq!(panic_count.load(StdOrdering::SeqCst), 1);
2802 materializer.shutdown();
2803 }
2804
2805 #[test]
2806 fn runtime_slow_timer_task_does_not_delay_unrelated_timers() {
2807 let materializer = Materializer::new();
2808 let started = StdArc::new(StdAtomicBool::new(false));
2809 let started_task = StdArc::clone(&started);
2810 let slow_timer = materializer.schedule_at_fixed_rate(
2811 Duration::ZERO,
2812 Duration::from_millis(250),
2813 move || {
2814 started_task.store(true, StdOrdering::SeqCst);
2815 thread::sleep(Duration::from_millis(200));
2816 },
2817 );
2818
2819 assert!(wait_until(Duration::from_millis(100), || {
2820 started.load(StdOrdering::SeqCst)
2821 }));
2822
2823 let start = Instant::now();
2824 let (tx, rx) = mpsc::channel();
2825 materializer.schedule_once(Duration::from_millis(10), move || {
2826 tx.send(Instant::now()).unwrap();
2827 });
2828 let fired_at = rx.recv_timeout(Duration::from_millis(350)).unwrap();
2829 let elapsed = fired_at.duration_since(start);
2830
2831 slow_timer.cancel();
2832 materializer.shutdown();
2833 assert!(
2834 elapsed < Duration::from_millis(150),
2835 "unrelated timer was delayed by a blocking timer task: {elapsed:?}",
2836 );
2837 }
2838
2839 #[test]
2840 fn runtime_shutdown_stops_timer_driver_thread() {
2841 let materializer = Materializer::new();
2842 assert!(wait_until(Duration::from_secs(1), || materializer
2843 .timer_driver_is_live()));
2844
2845 materializer.shutdown();
2846 assert!(wait_until(Duration::from_secs(2), || !materializer
2847 .timer_driver_is_live()));
2848 }
2849
2850 #[test]
2851 fn runtime_timer_driver_orders_many_timers_by_deadline() {
2852 let materializer = Materializer::new();
2853 let (tx, rx) = mpsc::channel();
2854 let schedule = [(450_u64, 4_u8), (50, 1), (350, 3), (150, 2), (550, 5)];
2855
2856 for (delay_ms, value) in schedule {
2857 let tx = tx.clone();
2858 materializer.schedule_once(Duration::from_millis(delay_ms), move || {
2859 tx.send(value).unwrap();
2860 });
2861 }
2862 drop(tx);
2863
2864 let mut received = Vec::new();
2865 for _ in 0..schedule.len() {
2866 received.push(rx.recv_timeout(Duration::from_secs(10)).unwrap());
2867 }
2868 materializer.shutdown();
2869
2870 assert_eq!(received, vec![1, 2, 3, 4, 5]);
2871 }
2872
2873 #[test]
2874 fn runtime_timer_driver_uses_one_thread_per_runtime_regardless_of_timer_count() {
2875 let materializer = Materializer::new();
2876 let thread_name = materializer.timer_thread_name().to_owned();
2877 let linux_thread_name = thread_name.chars().take(15).collect::<String>();
2883 assert!(wait_until(Duration::from_secs(5), || {
2884 materializer.timer_driver_is_live() && linux_thread_count(&linux_thread_name) >= 1
2885 }));
2886 let live_timer_threads = linux_thread_count(&linux_thread_name);
2887
2888 for _ in 0..128 {
2889 materializer.schedule_once(Duration::from_secs(60), || {});
2890 }
2891
2892 assert!(
2893 wait_until(Duration::from_secs(5), || {
2894 materializer.timer_driver_is_live()
2895 && linux_thread_count(&linux_thread_name) == live_timer_threads
2896 }),
2897 "scheduling timers should not create extra timer threads for a runtime",
2898 );
2899 materializer.shutdown();
2900 assert!(wait_until(Duration::from_secs(5), || {
2901 !materializer.timer_driver_is_live()
2902 && linux_thread_count(&linux_thread_name) < live_timer_threads
2903 }));
2904 }
2905
2906 #[test]
2907 fn cancelled_and_never_sinks_have_distinct_materialization_results() {
2908 assert_eq!(
2909 Source::repeat(1)
2910 .run_with(Sink::cancelled())
2911 .expect("cancelled sink materializes"),
2912 NotUsed
2913 );
2914 assert_eq!(
2915 Source::single(1)
2916 .run_with(Sink::never())
2917 .expect("never sink materializes")
2918 .try_wait(),
2919 None
2920 );
2921 }
2922
2923 #[test]
2924 fn never_sink_finishes_on_materializer_shutdown() {
2925 let materializer = Materializer::new();
2926 let completion = Source::single(1)
2927 .run_with_materializer(Sink::never(), &materializer)
2928 .unwrap();
2929
2930 materializer.shutdown();
2931 assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
2932 }
2933
2934 #[test]
2935 fn dropping_source_never_completion_releases_parked_worker() {
2936 let materializer = Materializer::new();
2937 let completion = Source::<i32>::never()
2938 .run_with_materializer(Sink::ignore(), &materializer)
2939 .unwrap();
2940
2941 assert!(wait_until(StdDuration::from_secs(1), || {
2942 materializer.active_streams() == 1
2943 }));
2944 assert_eq!(materializer.active_streams(), 1);
2945
2946 drop(completion);
2947
2948 assert!(wait_until(StdDuration::from_secs(15), || {
2949 materializer.active_streams() == 0
2950 }));
2951 assert_eq!(materializer.active_streams(), 0);
2952 }
2953
2954 #[test]
2955 fn future_and_maybe_sources_emit_values() {
2956 let future_value = Source::future(|| async { Ok(7) }).run_collect().unwrap();
2957 assert_eq!(future_value, vec![7]);
2958
2959 let future_source = Source::future_source(|| async { Ok(Source::from_iter([1, 2, 3])) })
2960 .run_collect()
2961 .unwrap();
2962 assert_eq!(future_source, vec![1, 2, 3]);
2963
2964 let (handle, source) = Source::maybe();
2965 assert_eq!(
2966 source.clone().run_collect(),
2967 Err(StreamError::MaybeIncomplete)
2968 );
2969 handle.complete(9).unwrap();
2970 assert_eq!(source.run_collect().unwrap(), vec![9]);
2971 }
2972
2973 #[test]
2974 fn wp6b_source_generators_emit_and_fail_like_stream_errors() {
2975 assert_eq!(
2976 Source::cycle(|| [1, 2, 3].into_iter())
2977 .take(8)
2978 .run_collect()
2979 .unwrap(),
2980 vec![1, 2, 3, 1, 2, 3, 1, 2]
2981 );
2982 assert_eq!(
2983 Source::<i32>::cycle(std::iter::empty::<i32>).run_collect(),
2984 Err(StreamError::Failed("empty iterator".into()))
2985 );
2986 assert_eq!(
2987 Source::unfold(0, |state| (state < 4).then_some((state + 1, state)))
2988 .run_collect()
2989 .unwrap(),
2990 vec![0, 1, 2, 3]
2991 );
2992 assert_eq!(
2993 Source::unfold_async(0, |state| async move {
2994 Ok((state < 4).then_some((state + 1, state * 2)))
2995 })
2996 .run_collect()
2997 .unwrap(),
2998 vec![0, 2, 4, 6]
2999 );
3000 assert!(matches!(
3001 Source::<i32>::lazy_single(|| panic!("boom")).run_collect(),
3002 Err(StreamError::Failed(message)) if message == "lazy_single factory panicked"
3003 ));
3004 }
3005
3006 #[test]
3007 fn wp6b_lazy_sources_defer_until_first_pull_and_complete_deferred_mat() {
3008 let created = StdArc::new(StdAtomicUsize::new(0));
3009 let created_for_source = StdArc::clone(&created);
3010 let source = Source::<i32>::lazy_source(move || {
3011 created_for_source.fetch_add(1, StdOrdering::SeqCst);
3012 Source::from_iter([7, 8]).map_materialized_value(|_| 99)
3013 });
3014 let materializer = Materializer::new();
3015 let (mut stream, mut mat) = StdArc::clone(&source.factory)
3016 .create(&materializer)
3017 .unwrap();
3018
3019 assert_eq!(created.load(StdOrdering::SeqCst), 0);
3020 assert!(mat.try_wait().is_none());
3021 assert_eq!(stream.next().unwrap().unwrap(), 7);
3022 assert_eq!(mat.wait().unwrap(), 99);
3023 assert_eq!(created.load(StdOrdering::SeqCst), 1);
3024 assert_eq!(stream.next().unwrap().unwrap(), 8);
3025
3026 let never_created = StdArc::new(StdAtomicUsize::new(0));
3027 let never_created_for_source = StdArc::clone(&never_created);
3028 let mat = Source::<i32>::lazy_future_source(move || {
3029 never_created_for_source.fetch_add(1, StdOrdering::SeqCst);
3030 async { Ok(Source::single(1)) }
3031 })
3032 .to(Sink::cancelled())
3033 .run()
3034 .unwrap();
3035 assert!(matches!(mat.wait(), Err(StreamError::Failed(_))));
3036 assert_eq!(never_created.load(StdOrdering::SeqCst), 0);
3037
3038 let lazy_future = StdArc::new(StdAtomicUsize::new(0));
3039 let lazy_future_for_source = StdArc::clone(&lazy_future);
3040 let source = Source::lazy_future(move || {
3041 lazy_future_for_source.fetch_add(1, StdOrdering::SeqCst);
3042 async { Ok(42) }
3043 });
3044 let (mut stream, _) = StdArc::clone(&source.factory)
3045 .create(&Materializer::new())
3046 .unwrap();
3047 assert_eq!(lazy_future.load(StdOrdering::SeqCst), 0);
3048 assert_eq!(stream.next().unwrap().unwrap(), 42);
3049 assert_eq!(lazy_future.load(StdOrdering::SeqCst), 1);
3050 }
3051
3052 #[test]
3053 fn wp6b_unfold_resource_closes_on_completion_failure_and_cancellation() {
3054 let closed = StdArc::new(StdAtomicUsize::new(0));
3055 let closed_on_complete = StdArc::clone(&closed);
3056 let values = Source::unfold_resource(
3057 || Ok(std::collections::VecDeque::from([1, 2, 3])),
3058 |items| Ok(items.pop_front()),
3059 move |_items| {
3060 closed_on_complete.fetch_add(1, StdOrdering::SeqCst);
3061 Ok(())
3062 },
3063 )
3064 .run_collect()
3065 .unwrap();
3066 assert_eq!(values, vec![1, 2, 3]);
3067 assert_eq!(closed.load(StdOrdering::SeqCst), 1);
3068
3069 let closed_on_failure = StdArc::new(StdAtomicUsize::new(0));
3070 let closed_on_failure_for_close = StdArc::clone(&closed_on_failure);
3071 let failed = Source::<i32>::unfold_resource(
3072 || Ok(()),
3073 |_| Err(StreamError::Failed("read".into())),
3074 move |_| {
3075 closed_on_failure_for_close.fetch_add(1, StdOrdering::SeqCst);
3076 Err(StreamError::Failed("close".into()))
3077 },
3078 )
3079 .run_collect();
3080 assert_eq!(failed, Err(StreamError::Failed("read".into())));
3081 assert_eq!(closed_on_failure.load(StdOrdering::SeqCst), 1);
3082
3083 let closed_on_cancel = StdArc::new(StdAtomicUsize::new(0));
3084 let closed_on_cancel_for_close = StdArc::clone(&closed_on_cancel);
3085 let first = Source::unfold_resource(
3086 || Ok(0_usize),
3087 |next| {
3088 let item = *next;
3089 *next += 1;
3090 Ok(Some(item))
3091 },
3092 move |_| {
3093 closed_on_cancel_for_close.fetch_add(1, StdOrdering::SeqCst);
3094 Ok(())
3095 },
3096 )
3097 .run_with(Sink::head())
3098 .unwrap();
3099 assert_eq!(first.wait().unwrap(), 0);
3100 assert!(wait_until(Duration::from_millis(250), || {
3101 closed_on_cancel.load(StdOrdering::SeqCst) == 1
3102 }));
3103 }
3104
3105 #[test]
3106 fn wp6b_async_resource_and_async_accumulators_are_sequential() {
3107 let closed = StdArc::new(StdAtomicUsize::new(0));
3108 let closed_for_close = StdArc::clone(&closed);
3109 let values = Source::unfold_resource_async(
3110 || async { Ok(std::collections::VecDeque::from([1, 2, 3])) },
3111 |items| {
3112 let item = items.pop_front();
3113 async move { Ok(item) }
3114 },
3115 move |_items| {
3116 let closed = StdArc::clone(&closed_for_close);
3117 async move {
3118 closed.fetch_add(1, StdOrdering::SeqCst);
3119 Ok(())
3120 }
3121 },
3122 )
3123 .run_collect()
3124 .unwrap();
3125 assert_eq!(values, vec![1, 2, 3]);
3126 assert_eq!(closed.load(StdOrdering::SeqCst), 1);
3127
3128 let closed_on_failure = StdArc::new(StdAtomicUsize::new(0));
3129 let closed_on_failure_for_close = StdArc::clone(&closed_on_failure);
3130 let failed = Source::<i32>::unfold_resource_async(
3131 || async { Ok(()) },
3132 |_resource| async { Err(StreamError::Failed("read".into())) },
3133 move |_resource| {
3134 let closed_on_failure = StdArc::clone(&closed_on_failure_for_close);
3135 async move {
3136 closed_on_failure.fetch_add(1, StdOrdering::SeqCst);
3137 Err(StreamError::Failed("close".into()))
3138 }
3139 },
3140 )
3141 .run_collect();
3142 assert_eq!(failed, Err(StreamError::Failed("read".into())));
3143 assert_eq!(closed_on_failure.load(StdOrdering::SeqCst), 1);
3144
3145 let active = StdArc::new(StdAtomicUsize::new(0));
3146 let max_active = StdArc::new(StdAtomicUsize::new(0));
3147 let active_for_stage = StdArc::clone(&active);
3148 let max_for_stage = StdArc::clone(&max_active);
3149 let scanned = Source::from_iter(1..=4)
3150 .scan_async(0, move |acc, item| {
3151 let active = StdArc::clone(&active_for_stage);
3152 let max_active = StdArc::clone(&max_for_stage);
3153 async move {
3154 let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
3155 max_active.fetch_max(now, StdOrdering::SeqCst);
3156 tokio::time::sleep(Duration::from_millis(1)).await;
3157 active.fetch_sub(1, StdOrdering::SeqCst);
3158 Ok(acc + item)
3159 }
3160 })
3161 .run_collect()
3162 .unwrap();
3163 assert_eq!(scanned, vec![0, 1, 3, 6, 10]);
3164 assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
3165
3166 let folded = Source::from_iter(1..=4)
3167 .fold_async(0, |acc, item| async move { Ok(acc + item) })
3168 .run_collect()
3169 .unwrap();
3170 assert_eq!(folded, vec![10]);
3171 }
3172
3173 #[test]
3174 fn wp6b_fold_async_materialization_does_not_drain_upstream() {
3175 let release = StdArc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
3176 let started = StdArc::new(StdAtomicBool::new(false));
3177 let source = {
3178 let release = StdArc::clone(&release);
3179 let started = StdArc::clone(&started);
3180 Source::from_factory(move || {
3181 let release = StdArc::clone(&release);
3182 let started = StdArc::clone(&started);
3183 let mut emitted = false;
3184 Box::new(std::iter::from_fn(move || {
3185 if emitted {
3186 return None;
3187 }
3188 emitted = true;
3189 started.store(true, StdOrdering::SeqCst);
3190 let (released, available) = &*release;
3191 let mut released = released.lock().unwrap();
3192 while !*released {
3193 released = available.wait(released).unwrap();
3194 }
3195 Some(Ok(1))
3196 }))
3197 })
3198 };
3199
3200 let (materialized_tx, materialized_rx) = mpsc::channel();
3201 let join = thread::spawn(move || {
3202 let queue = source
3203 .fold_async(0, |acc, item| async move { Ok(acc + item) })
3204 .run_with(Sink::queue())
3205 .unwrap();
3206 materialized_tx.send(queue).unwrap();
3207 });
3208
3209 let queue = match materialized_rx.recv_timeout(StdDuration::from_secs(1)) {
3210 Ok(queue) => queue,
3211 Err(error) => {
3212 let (released, available) = &*release;
3213 *released.lock().unwrap() = true;
3214 available.notify_all();
3215 let _ = join.join();
3216 panic!("fold_async materialization did not return before first pull: {error}");
3217 }
3218 };
3219 let (released, _) = &*release;
3220 assert!(
3221 !*released.lock().unwrap(),
3222 "test source was released before materialization returned"
3223 );
3224
3225 let (released, available) = &*release;
3226 *released.lock().unwrap() = true;
3227 available.notify_all();
3228 assert_eq!(queue.pull().unwrap(), Some(1));
3229 assert_eq!(queue.pull().unwrap(), None);
3230 join.join().unwrap();
3231 assert!(started.load(StdOrdering::SeqCst));
3232 }
3233
3234 #[test]
3235 fn wp6b_lazy_sink_and_flow_wait_for_first_element() {
3236 let lazy_sink_created = StdArc::new(StdAtomicUsize::new(0));
3237 let lazy_sink_created_for_factory = StdArc::clone(&lazy_sink_created);
3238 let empty_sink = Source::<i32>::empty()
3239 .run_with(Sink::lazy_sink(move || {
3240 lazy_sink_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
3241 Sink::ignore()
3242 }))
3243 .unwrap();
3244 assert!(matches!(empty_sink.wait(), Err(StreamError::Failed(_))));
3245 assert_eq!(lazy_sink_created.load(StdOrdering::SeqCst), 0);
3246
3247 let foreach_sum = StdArc::new(StdAtomicUsize::new(0));
3248 let foreach_sum_for_sink = StdArc::clone(&foreach_sum);
3249 Source::from_iter([1_usize, 2, 3])
3250 .run_with(Sink::foreach_async(2, move |item| {
3251 let foreach_sum = StdArc::clone(&foreach_sum_for_sink);
3252 async move {
3253 foreach_sum.fetch_add(item, StdOrdering::SeqCst);
3254 Ok(())
3255 }
3256 }))
3257 .unwrap()
3258 .wait()
3259 .unwrap();
3260 assert_eq!(foreach_sum.load(StdOrdering::SeqCst), 6);
3261
3262 let lazy_flow_created = StdArc::new(StdAtomicUsize::new(0));
3263 let lazy_flow_created_for_factory = StdArc::clone(&lazy_flow_created);
3264 let lazy_flow = Flow::<i32, i32>::lazy_flow(move || {
3265 lazy_flow_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
3266 Flow::identity()
3267 .map(|item: i32| item + 10)
3268 .map_materialized_value(|_| 123)
3269 });
3270 let mat = (lazy_flow.materialize)().unwrap();
3271 let mut stream = match lazy_flow.transform {
3272 flow::FlowTransform::Runtime(transform) => {
3273 transform(Box::new([Ok(1), Ok(2)].into_iter()), &Materializer::new()).unwrap()
3274 }
3275 flow::FlowTransform::Pure(_) => panic!("lazy flow must be runtime-backed"),
3276 };
3277 assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 0);
3278 assert_eq!(stream.next().unwrap().unwrap(), 11);
3279 assert_eq!(mat.wait().unwrap(), 123);
3280 assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 1);
3281 assert_eq!(stream.next().unwrap().unwrap(), 12);
3282
3283 let future_flow = Source::from_iter([1, 2])
3284 .via_mat(
3285 Flow::future_flow(|| async {
3286 Ok(Flow::identity()
3287 .map(|item: i32| item * 2)
3288 .map_materialized_value(|_| 77))
3289 }),
3290 Keep::right,
3291 )
3292 .to_mat(Sink::collect(), Keep::both)
3293 .run()
3294 .unwrap();
3295 assert_eq!(future_flow.0.wait().unwrap(), 77);
3296 assert_eq!(future_flow.1.wait().unwrap(), vec![2, 4]);
3297 }
3298
3299 #[test]
3300 fn wp6b_lazy_flow_double_use_in_one_chain_pairs_instances_in_order() {
3301 for round in 0..50 {
3306 let counter = StdArc::new(StdAtomicUsize::new(1));
3307 let factory_counter = StdArc::clone(&counter);
3308 let lazy: Flow<usize, usize, _> = Flow::lazy_flow(move || {
3309 let id = factory_counter.fetch_add(1, StdOrdering::SeqCst);
3310 Flow::identity()
3311 .map(move |x: usize| x * 100 + id)
3312 .map_materialized_value(move |_| id)
3313 });
3314 let lazy_again = lazy.clone();
3315
3316 let ((first_mat, second_mat), out) = Source::from_iter([0usize])
3317 .via_mat(lazy, Keep::right)
3318 .via_mat(lazy_again, Keep::both)
3319 .to_mat(Sink::collect(), Keep::both)
3320 .run()
3321 .unwrap();
3322
3323 let first_id = first_mat.wait().unwrap();
3324 let second_id = second_mat.wait().unwrap();
3325 let element = out.wait().unwrap()[0];
3326 assert_eq!(
3327 element,
3328 first_id * 100 + second_id,
3329 "round {round}: mats ({first_id},{second_id}) cross-wired with transform order"
3330 );
3331 assert_ne!(
3332 first_id, second_id,
3333 "round {round}: same factory instance paired twice"
3334 );
3335 }
3336 }
3337
3338 #[test]
3339 fn wp6b_lazy_flow_clones_materialize_concurrently_without_cross_wiring() {
3340 for _ in 0..20 {
3341 let next_id = StdArc::new(StdAtomicUsize::new(0));
3342 let next_id_for_factory = StdArc::clone(&next_id);
3343 let flow = Flow::<i32, i32>::lazy_flow(move || {
3344 let id = next_id_for_factory.fetch_add(1, StdOrdering::SeqCst) + 1;
3345 Flow::identity()
3346 .map(move |item: i32| item + (id as i32 * 100))
3347 .map_materialized_value(move |_| id)
3348 });
3349 let barrier = StdArc::new(std::sync::Barrier::new(3));
3350
3351 let spawn_materialization = |input: i32| {
3352 let flow = flow.clone();
3353 let barrier = StdArc::clone(&barrier);
3354 thread::spawn(move || {
3355 barrier.wait();
3356 let (mat, values) = Source::single(input)
3357 .via_mat(flow, Keep::right)
3358 .to_mat(Sink::collect(), Keep::both)
3359 .run()
3360 .unwrap();
3361 (input, mat.wait().unwrap(), values.wait().unwrap())
3362 })
3363 };
3364
3365 let first = spawn_materialization(1);
3366 let second = spawn_materialization(2);
3367 barrier.wait();
3368
3369 for result in [first.join().unwrap(), second.join().unwrap()] {
3370 let (input, mat_id, values) = result;
3371 assert_eq!(values, vec![input + (mat_id as i32 * 100)]);
3372 }
3373 assert_eq!(next_id.load(StdOrdering::SeqCst), 2);
3374 }
3375 }
3376
3377 #[test]
3378 fn wp6b_map_with_resource_emits_close_item_before_terminal_error() {
3379 let queue = Source::from_factory(|| {
3380 Box::new(vec![Ok(1), Err(StreamError::Failed("upstream".into()))].into_iter())
3381 })
3382 .map_with_resource(
3383 || Ok(()),
3384 |_resource, item| Ok(item + 10),
3385 |_resource| Ok(Some(99)),
3386 )
3387 .run_with(Sink::queue())
3388 .unwrap();
3389
3390 assert_eq!(queue.pull().unwrap(), Some(11));
3391 assert_eq!(queue.pull().unwrap(), Some(99));
3392 assert_eq!(queue.pull(), Err(StreamError::Failed("upstream".into())));
3393
3394 let failed: StreamResult<Vec<i32>> = Source::single(1)
3395 .map_with_resource(
3396 || Ok(()),
3397 |_resource, _item| -> StreamResult<i32> { Err(StreamError::Failed("map".into())) },
3398 |_resource| -> StreamResult<Option<i32>> {
3399 Err(StreamError::Failed("close".into()))
3400 },
3401 )
3402 .run_collect();
3403 assert_eq!(failed, Err(StreamError::Failed("map".into())));
3404 }
3405
3406 #[test]
3407 fn stateful_and_terminal_source_operators_work() {
3408 let stateful = Source::from_iter([1, 2, 3])
3409 .stateful_map(0, |sum, item| {
3410 *sum += item;
3411 *sum
3412 })
3413 .run_collect()
3414 .unwrap();
3415 assert_eq!(stateful, vec![1, 3, 6]);
3416
3417 let concat = Source::from_iter([1, 2, 3])
3418 .stateful_map_concat(0, |sum, item| {
3419 *sum += item;
3420 [item, *sum]
3421 })
3422 .run_collect()
3423 .unwrap();
3424 assert_eq!(concat, vec![1, 1, 2, 3, 3, 6]);
3425
3426 assert_eq!(
3427 Source::from_iter([1, 2, 3])
3428 .fold(10, |acc, item| acc + item)
3429 .run_collect()
3430 .unwrap(),
3431 vec![16]
3432 );
3433 assert_eq!(
3434 Source::from_iter([1, 2, 3])
3435 .reduce(|acc, item| acc + item)
3436 .run_collect()
3437 .unwrap(),
3438 vec![6]
3439 );
3440 }
3441
3442 #[test]
3443 fn concat_and_sliding_emit_before_unbounded_upstream_finishes() {
3444 let concat = Source::single(())
3445 .map_concat(|_| 0_u64..)
3446 .take(1)
3447 .run_collect()
3448 .unwrap();
3449 assert_eq!(concat, vec![0]);
3450
3451 let sliding = Source::repeat(1_u64)
3452 .sliding(2, 1)
3453 .take(1)
3454 .run_collect()
3455 .unwrap();
3456 assert_eq!(sliding, vec![vec![1, 1]]);
3457 }
3458
3459 #[test]
3460 fn fan_in_source_operators_follow_ordering_rules() {
3461 assert_eq!(
3462 Source::from_iter([1, 2])
3463 .concat(Source::from_iter([3, 4]))
3464 .run_collect()
3465 .unwrap(),
3466 vec![1, 2, 3, 4]
3467 );
3468 assert_eq!(
3469 Source::from_iter([3, 4])
3470 .prepend(Source::from_iter([1, 2]))
3471 .run_collect()
3472 .unwrap(),
3473 vec![1, 2, 3, 4]
3474 );
3475 assert_eq!(
3476 Source::empty()
3477 .or_else(Source::from_iter([10, 20]))
3478 .run_collect()
3479 .unwrap(),
3480 vec![10, 20]
3481 );
3482 assert_eq!(
3483 Source::from_iter([1, 2])
3484 .or_else(Source::from_iter([10, 20]))
3485 .run_collect()
3486 .unwrap(),
3487 vec![1, 2]
3488 );
3489 assert_eq!(
3490 Source::from_iter([1, 2, 3])
3491 .interleave(Source::from_iter([10, 11, 12]), 2)
3492 .run_collect()
3493 .unwrap(),
3494 vec![1, 2, 10, 11, 3, 12]
3495 );
3496 }
3497
3498 #[test]
3499 fn fan_in_flow_operators_compose_with_primary_stream() {
3500 let concat = Source::from_iter([1, 2])
3501 .via(Flow::identity().concat(Source::from_iter([3, 4])))
3502 .run_collect()
3503 .unwrap();
3504 assert_eq!(concat, vec![1, 2, 3, 4]);
3505
3506 let prepend = Source::from_iter([3, 4])
3507 .via(Flow::identity().prepend(Source::from_iter([1, 2])))
3508 .run_collect()
3509 .unwrap();
3510 assert_eq!(prepend, vec![1, 2, 3, 4]);
3511
3512 let interleave = Source::from_iter([1, 2, 3])
3513 .via(Flow::identity().interleave(Source::from_iter([10, 11, 12]), 1))
3514 .run_collect()
3515 .unwrap();
3516 assert_eq!(interleave, vec![1, 10, 2, 11, 3, 12]);
3517
3518 let merge_sorted = Source::from_iter([1, 4])
3519 .via(Flow::identity().merge_sorted(Source::from_iter([2, 3, 5])))
3520 .run_collect()
3521 .unwrap();
3522 assert_eq!(merge_sorted, vec![1, 2, 3, 4, 5]);
3523
3524 let zip_latest = Source::from_iter([1, 2])
3525 .via(Flow::identity().zip_latest(Source::single(10)))
3526 .run_collect()
3527 .unwrap();
3528 assert_eq!(zip_latest, vec![(1, 10), (2, 10)]);
3529
3530 let zip_latest_with = Source::from_iter([1, 2])
3531 .via(
3532 Flow::identity()
3533 .zip_latest_with(Source::single(10), false, |left, right| left + right),
3534 )
3535 .run_collect()
3536 .unwrap();
3537 assert_eq!(zip_latest_with, vec![11, 12]);
3538 }
3539
3540 #[test]
3541 fn fan_in_operators_propagate_errors_and_eager_close() {
3542 assert!(matches!(
3543 Source::failed(StreamError::Failed("boom".into()))
3544 .or_else(Source::from_iter([1, 2]))
3545 .run_collect(),
3546 Err(StreamError::Failed(_))
3547 ));
3548 assert!(matches!(
3549 Source::from_iter([1, 2])
3550 .prepend(Source::failed(StreamError::Failed("boom".into())))
3551 .run_collect(),
3552 Err(StreamError::Failed(_))
3553 ));
3554 assert_eq!(
3555 Source::from_iter([1, 2])
3556 .interleave_all([Source::empty()], 1, true)
3557 .run_collect()
3558 .unwrap(),
3559 vec![1]
3560 );
3561 }
3562
3563 #[test]
3564 fn interleave_lazy_pulls_only_inputs_needed_for_first_segment() {
3565 use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
3566
3567 let pulls: Arc<[AtomicUsize; 3]> = Arc::new([
3568 AtomicUsize::new(0),
3569 AtomicUsize::new(0),
3570 AtomicUsize::new(0),
3571 ]);
3572
3573 let make_source = |idx: usize| {
3574 let pulls = Arc::clone(&pulls);
3575 Source::from_materialized_factory(move |_| {
3576 let pulls = Arc::clone(&pulls);
3577 let mut emitted = false;
3578 Ok((
3579 Box::new(std::iter::from_fn(move || {
3580 pulls[idx].fetch_add(1, Ordering::SeqCst);
3581 if !emitted && idx == 0 {
3582 emitted = true;
3583 Some(Ok(42))
3584 } else {
3585 None
3586 }
3587 })) as BoxStream<i32>,
3588 NotUsed,
3589 ))
3590 })
3591 };
3592
3593 let result = make_source(0)
3594 .interleave_all([make_source(1), make_source(2)], 1, false)
3595 .run_with(Sink::head());
3596
3597 assert_eq!(wait(result.unwrap()), 42);
3598 assert_eq!(pulls[0].load(Ordering::SeqCst), 1);
3599 assert_eq!(
3600 pulls[1].load(Ordering::SeqCst),
3601 0,
3602 "second input should not be pulled when downstream cancels after first element"
3603 );
3604 assert_eq!(
3605 pulls[2].load(Ordering::SeqCst),
3606 0,
3607 "third input should not be pulled before its turn"
3608 );
3609 }
3610
3611 #[test]
3612 fn interleave_non_eager_drains_remaining_when_one_input_completes() {
3613 assert_eq!(
3614 Source::from_iter([1, 2, 3, 4])
3615 .interleave_all(
3616 [Source::from_iter([10]), Source::from_iter([20, 21, 22])],
3617 1,
3618 false
3619 )
3620 .run_collect()
3621 .unwrap(),
3622 vec![1, 10, 20, 2, 21, 3, 22, 4]
3623 );
3624 }
3625
3626 #[test]
3627 fn remaining_merge_and_zip_family_matches_expected_ordering() {
3628 assert_eq!(
3629 Source::from_iter([1, 4])
3630 .merge_sorted(Source::from_iter([2, 3, 5]))
3631 .run_collect()
3632 .unwrap(),
3633 vec![1, 2, 3, 4, 5]
3634 );
3635
3636 assert_eq!(
3637 Source::from_iter([1, 2])
3638 .merge_latest(Source::single(10), false)
3639 .run_collect()
3640 .unwrap(),
3641 vec![vec![1, 10], vec![2, 10]]
3642 );
3643
3644 assert_eq!(
3645 Source::from_iter([1, 2, 3])
3646 .merge_all([Source::from_iter([10, 11])], false)
3647 .run_collect()
3648 .unwrap(),
3649 vec![1, 10, 2, 11, 3]
3650 );
3651
3652 assert_eq!(
3653 Source::from_iter([1, 2, 3])
3654 .zip_with(Source::from_iter([10, 11, 12]), |left, right| left + right)
3655 .run_collect()
3656 .unwrap(),
3657 vec![11, 13, 15]
3658 );
3659
3660 assert_eq!(
3661 Source::from_iter([1, 2])
3662 .zip_latest(Source::single(10))
3663 .run_collect()
3664 .unwrap(),
3665 vec![(1, 10), (2, 10)]
3666 );
3667
3668 assert_eq!(
3669 Source::from_iter([1, 2, 3])
3670 .zip_latest_with(Source::from_iter([10]), false, |left, right| left + right)
3671 .run_collect()
3672 .unwrap(),
3673 vec![11, 12, 13]
3674 );
3675
3676 assert_eq!(
3677 Source::from_iter([1, 2])
3678 .zip_all(Source::from_iter([10, 11, 12]), -1, -2)
3679 .run_collect()
3680 .unwrap(),
3681 vec![(1, 10), (2, 11), (-1, 12)]
3682 );
3683
3684 assert_eq!(
3685 Source::from_iter([5, 6, 7])
3686 .zip_with_index()
3687 .run_collect()
3688 .unwrap(),
3689 vec![(5, 0), (6, 1), (7, 2)]
3690 );
3691
3692 assert_eq!(
3693 Source::zip_n([Source::from_iter([1, 2]), Source::from_iter([10, 20])])
3694 .run_collect()
3695 .unwrap(),
3696 vec![vec![1, 10], vec![2, 20]]
3697 );
3698
3699 assert_eq!(
3700 Source::zip_with_n(
3701 [
3702 Source::from_iter([1, 2]),
3703 Source::from_iter([10, 20]),
3704 Source::from_iter([100, 200]),
3705 ],
3706 |values| values.into_iter().sum::<i32>(),
3707 )
3708 .run_collect()
3709 .unwrap(),
3710 vec![111, 222]
3711 );
3712
3713 assert_eq!(
3714 Source::merge_prioritized_n(
3715 [
3716 (Source::from_iter([1, 2, 3, 4]), 2),
3717 (Source::from_iter([10, 11]), 1),
3718 ],
3719 false,
3720 )
3721 .run_collect()
3722 .unwrap(),
3723 vec![1, 2, 10, 3, 4, 11]
3724 );
3725
3726 assert_eq!(
3727 Source::combine(
3728 Source::from_iter([1, 2, 3]),
3729 Source::from_iter([10, 11]),
3730 std::iter::empty::<Source<i32, NotUsed>>(),
3731 SourceCombineStrategy::Merge {
3732 eager_complete: false,
3733 },
3734 )
3735 .run_collect()
3736 .unwrap(),
3737 vec![1, 10, 2, 11, 3]
3738 );
3739
3740 let combined_sink = Sink::combine(
3741 Sink::ignore(),
3742 Sink::ignore(),
3743 std::iter::empty::<Sink<i32, NotUsed>>(),
3744 SinkCombineStrategy::Broadcast,
3745 );
3746 assert_eq!(
3747 Source::from_iter([1, 2, 3])
3748 .run_with(combined_sink)
3749 .unwrap(),
3750 NotUsed
3751 );
3752 }
3753
3754 #[test]
3755 fn sink_combine_broadcast_delivers_every_element_to_every_child() {
3756 let first_count = StdArc::new(StdAtomicUsize::new(0));
3760 let second_count = StdArc::new(StdAtomicUsize::new(0));
3761 let first_counter = StdArc::clone(&first_count);
3762 let second_counter = StdArc::clone(&second_count);
3763 let combined = Sink::combine(
3764 Sink::foreach(move |_: i32| {
3765 first_counter.fetch_add(1, StdOrdering::SeqCst);
3766 }),
3767 Sink::foreach(move |_: i32| {
3768 second_counter.fetch_add(1, StdOrdering::SeqCst);
3769 }),
3770 std::iter::empty::<Sink<i32, NotUsed>>(),
3771 SinkCombineStrategy::Broadcast,
3772 );
3773 assert_eq!(
3774 Source::from_iter(0..100).run_with(combined).unwrap(),
3775 NotUsed
3776 );
3777 assert!(wait_until(StdDuration::from_secs(1), || {
3782 first_count.load(StdOrdering::SeqCst) == 100
3783 && second_count.load(StdOrdering::SeqCst) == 100
3784 }));
3785 }
3786
3787 #[test]
3788 fn zip_latest_completes_when_one_side_finishes_without_emitting() {
3789 assert_eq!(
3793 Source::from_iter(std::iter::empty::<i32>())
3794 .zip_latest_with(Source::repeat(10), false, |left, right| left + right)
3795 .run_collect()
3796 .unwrap(),
3797 Vec::<i32>::new()
3798 );
3799 assert_eq!(
3800 Source::repeat(10)
3801 .zip_latest_with(
3802 Source::from_iter(std::iter::empty::<i32>()),
3803 false,
3804 |left, right| left + right,
3805 )
3806 .run_collect()
3807 .unwrap(),
3808 Vec::<i32>::new()
3809 );
3810 }
3811
3812 #[test]
3813 fn zip_family_completion_boundaries_match_expected_results() {
3814 assert_eq!(
3815 Source::from_iter([1, 2, 3])
3816 .zip_with(Source::from_iter([10]), |left, right| left + right)
3817 .run_collect()
3818 .unwrap(),
3819 vec![11]
3820 );
3821
3822 assert_eq!(
3823 Source::from_iter([1, 2, 3])
3824 .zip_latest_with(Source::from_iter([10]), true, |left, right| left + right)
3825 .run_collect()
3826 .unwrap(),
3827 vec![11, 12]
3828 );
3829
3830 assert_eq!(
3831 Source::zip_n([
3832 Source::from_iter([1, 2, 3]),
3833 Source::from_iter([10]),
3834 Source::from_iter([100, 200, 300]),
3835 ])
3836 .run_collect()
3837 .unwrap(),
3838 vec![vec![1, 10, 100]]
3839 );
3840 }
3841
3842 #[test]
3843 fn combine_strategies_follow_merge_concat_and_priority_rules() {
3844 assert_eq!(
3845 Source::combine(
3846 Source::from_iter([1, 2]),
3847 Source::from_iter([10, 11]),
3848 [Source::from_iter([100])],
3849 SourceCombineStrategy::Concat,
3850 )
3851 .run_collect()
3852 .unwrap(),
3853 vec![1, 2, 10, 11, 100]
3854 );
3855
3856 assert_eq!(
3857 Source::combine(
3858 Source::from_iter([1, 2, 3, 4]),
3859 Source::from_iter([10, 11]),
3860 std::iter::empty::<Source<i32, NotUsed>>(),
3861 SourceCombineStrategy::Prioritized {
3862 priorities: vec![2, 1],
3863 eager_complete: false,
3864 },
3865 )
3866 .run_collect()
3867 .unwrap(),
3868 vec![1, 2, 10, 3, 4, 11]
3869 );
3870 }
3871
3872 #[test]
3873 fn concat_lazy_defers_follow_on_source_until_needed() {
3874 let source_counter = StdArc::new(StdAtomicUsize::new(0));
3875 let source_counter_clone = StdArc::clone(&source_counter);
3876 let lazy_source = Source::from_materialized_factory(move |_| {
3877 source_counter_clone.fetch_add(1, StdOrdering::SeqCst);
3878 Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
3879 });
3880 let source_head = Source::single(1)
3881 .concat_lazy(lazy_source)
3882 .run_with(Sink::head());
3883 assert_eq!(wait(source_head.unwrap()), 1);
3884 assert_eq!(source_counter.load(StdOrdering::SeqCst), 0);
3885
3886 let flow_counter = StdArc::new(StdAtomicUsize::new(0));
3887 let flow_counter_clone = StdArc::clone(&flow_counter);
3888 let lazy_flow_source = Source::from_materialized_factory(move |_| {
3889 flow_counter_clone.fetch_add(1, StdOrdering::SeqCst);
3890 Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
3891 });
3892 let flow_head = Source::single(1)
3893 .via(Flow::identity().concat_lazy(lazy_flow_source))
3894 .run_with(Sink::head());
3895 assert_eq!(wait(flow_head.unwrap()), 1);
3896 assert_eq!(flow_counter.load(StdOrdering::SeqCst), 0);
3897 }
3898
3899 #[test]
3900 fn also_to_completes_when_side_sink_cancels() {
3901 assert_eq!(
3902 Source::from_iter([1, 2, 3])
3903 .also_to(Sink::cancelled())
3904 .run_collect()
3905 .unwrap(),
3906 Vec::<i32>::new()
3907 );
3908 assert_eq!(
3909 Source::from_iter([1, 2, 3])
3910 .also_to_all([Sink::cancelled(), Sink::cancelled()])
3911 .run_collect()
3912 .unwrap(),
3913 Vec::<i32>::new()
3914 );
3915 }
3916
3917 #[test]
3918 fn also_to_completes_gracefully_when_side_sink_disconnects() {
3919 let result = Source::from_iter(0..100)
3920 .also_to(Sink::head())
3921 .run_collect()
3922 .unwrap();
3923 assert!(!result.is_empty(), "main should emit at least one element");
3924 assert!(
3925 result.len() < 100,
3926 "main should complete early when side disconnects"
3927 );
3928 }
3929
3930 #[test]
3931 fn also_to_propagates_original_error_when_side_is_disconnected() {
3932 let err = StreamError::Failed("distinctive-boom".into());
3933 assert!(matches!(
3934 Source::<i32>::failed(err.clone())
3935 .also_to(Sink::cancelled())
3936 .run_collect(),
3937 Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3938 ));
3939 assert!(matches!(
3940 Source::<i32>::failed(err.clone())
3941 .also_to_all([Sink::cancelled()])
3942 .run_collect(),
3943 Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3944 ));
3945 assert!(matches!(
3946 Source::<i32>::failed(err)
3947 .divert_to(Sink::cancelled(), |_: &i32| true)
3948 .run_collect(),
3949 Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3950 ));
3951 }
3952
3953 #[test]
3954 fn divert_to_routes_matching_elements_to_side_sink() {
3955 let diverted = Source::from_iter([1, 2, 3, 4])
3956 .divert_to(Sink::ignore(), |item| item % 2 == 0)
3957 .run_collect()
3958 .unwrap();
3959 assert_eq!(diverted, vec![1, 3]);
3960 }
3961
3962 #[test]
3963 fn wire_tap_drops_when_side_sink_backpressures() {
3964 let tapped = Source::from_iter([1, 2, 3])
3965 .wire_tap(Sink::head())
3966 .run_collect()
3967 .unwrap();
3968 assert_eq!(tapped, vec![1, 2, 3]);
3969
3970 let tapped_via_flow = Source::from_iter([1, 2, 3])
3971 .via(Flow::identity().wire_tap(Sink::head()))
3972 .run_collect()
3973 .unwrap();
3974 assert_eq!(tapped_via_flow, vec![1, 2, 3]);
3975 }
3976
3977 #[test]
3978 fn async_mapping_variants_complete() {
3979 let ordered = Source::from_iter(0..4)
3980 .map_async(2, |item| async move { Ok(item * 2) })
3981 .run_collect()
3982 .unwrap();
3983 assert_eq!(ordered, vec![0, 2, 4, 6]);
3984
3985 let unordered = Source::from_iter(0..4)
3986 .map_async_unordered(2, |item| async move { Ok(item * 2) })
3987 .run_collect()
3988 .unwrap();
3989 assert_eq!(unordered, vec![0, 2, 4, 6]);
3990
3991 let partitioned = Source::from_iter(0..4)
3992 .map_async_partitioned(4, 1, |item| item % 2, |item| async move { Ok(item + 1) })
3993 .run_collect()
3994 .unwrap();
3995 assert_eq!(partitioned, vec![1, 2, 3, 4]);
3996 }
3997
3998 #[test]
3999 fn map_async_ordered_bounds_pulls_behind_stuck_head() {
4000 let pulls = StdArc::new(StdAtomicUsize::new(0));
4001 let pulls_for_source = StdArc::clone(&pulls);
4002 let probe = Source::from_fn_iter(move || {
4003 let pulls = StdArc::clone(&pulls_for_source);
4004 std::iter::from_fn(move || {
4005 let next = pulls.fetch_add(1, StdOrdering::SeqCst);
4006 Some(next)
4007 })
4008 })
4009 .map_async(2, |item| async move {
4010 if item == 0 {
4011 tokio::time::sleep(StdDuration::from_millis(300)).await;
4012 }
4013 Ok(item)
4014 })
4015 .run_with(TestSink::probe())
4016 .unwrap();
4017
4018 probe.request(16);
4019 thread::sleep(StdDuration::from_millis(100));
4020 assert!(
4021 pulls.load(StdOrdering::SeqCst) <= 3,
4022 "pulled {} elements with parallelism=2 behind a stuck ordered head",
4023 pulls.load(StdOrdering::SeqCst)
4024 );
4025 }
4026
4027 #[test]
4028 fn async_mapping_parks_until_woken_future_completes() {
4029 struct WakeOnceFuture {
4030 value: Option<u64>,
4031 ready: StdArc<StdAtomicBool>,
4032 started: bool,
4033 polls: StdArc<StdAtomicUsize>,
4034 latest_waker: StdArc<Mutex<Option<std::task::Waker>>>,
4035 }
4036
4037 impl std::future::Future for WakeOnceFuture {
4038 type Output = StreamResult<u64>;
4039
4040 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
4041 let this = self.as_mut().get_mut();
4042 this.polls.fetch_add(1, StdOrdering::SeqCst);
4043 *this.latest_waker.lock().expect("latest wake slot mutex") =
4044 Some(cx.waker().clone());
4045 if this.ready.load(StdOrdering::SeqCst) {
4046 return Poll::Ready(Ok(this.value.take().unwrap()));
4047 }
4048
4049 if !this.started {
4050 this.started = true;
4051 let ready = StdArc::clone(&this.ready);
4052 let latest_waker = StdArc::clone(&this.latest_waker);
4053 thread::spawn(move || {
4054 thread::sleep(Duration::from_millis(20));
4055 ready.store(true, StdOrdering::SeqCst);
4056 if let Some(waker) =
4057 latest_waker.lock().expect("latest wake slot mutex").take()
4058 {
4059 waker.wake();
4060 }
4061 });
4062 }
4063 Poll::Pending
4064 }
4065 }
4066
4067 let polls = StdArc::new(StdAtomicUsize::new(0));
4068 let polls_for_stage = StdArc::clone(&polls);
4069 let start = Instant::now();
4070
4071 let values = Source::single(41)
4072 .map_async(1, move |item| WakeOnceFuture {
4073 value: Some(item + 1),
4074 ready: StdArc::new(StdAtomicBool::new(false)),
4075 started: false,
4076 polls: StdArc::clone(&polls_for_stage),
4077 latest_waker: StdArc::new(Mutex::new(None)),
4078 })
4079 .run_collect()
4080 .unwrap();
4081
4082 assert_eq!(values, vec![42]);
4083 let elapsed = start.elapsed();
4084 assert!(
4085 elapsed >= StdDuration::from_millis(15) && elapsed < StdDuration::from_millis(250),
4086 "pending future should park until woken once, elapsed={elapsed:?}"
4087 );
4088 assert!(
4089 polls.load(StdOrdering::SeqCst) < 4096,
4090 "pending future was repolled too aggressively"
4091 );
4092 }
4093
4094 #[test]
4095 fn async_mapping_emits_before_unbounded_upstream_finishes() {
4096 let ordered = Source::repeat(1)
4097 .map_async(2, |item| async move { Ok(item + 1) })
4098 .take(1)
4099 .run_collect()
4100 .unwrap();
4101 assert_eq!(ordered, vec![2]);
4102
4103 let unordered = Source::repeat(1)
4104 .map_async_unordered(2, |item| async move { Ok(item + 1) })
4105 .take(1)
4106 .run_collect()
4107 .unwrap();
4108 assert_eq!(unordered, vec![2]);
4109
4110 let partitioned = Source::repeat(1)
4111 .map_async_partitioned(2, 1, |_| 0_u8, |item| async move { Ok(item + 1) })
4112 .take(1)
4113 .run_collect()
4114 .unwrap();
4115 assert_eq!(partitioned, vec![2]);
4116 }
4117
4118 #[test]
4119 fn partitioned_async_mapping_limits_same_key_concurrency() {
4120 let active = StdArc::new(StdAtomicUsize::new(0));
4121 let max_active = StdArc::new(StdAtomicUsize::new(0));
4122 let active_for_stage = StdArc::clone(&active);
4123 let max_for_stage = StdArc::clone(&max_active);
4124
4125 let values = Source::from_iter(0..6)
4126 .map_async_partitioned(
4127 4,
4128 1,
4129 |_| 0_u8,
4130 move |item| {
4131 let active = StdArc::clone(&active_for_stage);
4132 let max_active = StdArc::clone(&max_for_stage);
4133 let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
4134 max_active.fetch_max(current, StdOrdering::SeqCst);
4135 async move {
4136 thread::sleep(Duration::from_millis(1));
4137 active.fetch_sub(1, StdOrdering::SeqCst);
4138 Ok(item)
4139 }
4140 },
4141 )
4142 .run_collect()
4143 .unwrap();
4144
4145 assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
4146 assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
4147 }
4148
4149 #[test]
4150 fn partitioned_async_mapping_scans_past_blocked_pending_key() {
4151 let active = StdArc::new(StdAtomicUsize::new(0));
4152 let max_active = StdArc::new(StdAtomicUsize::new(0));
4153 let active_for_stage = StdArc::clone(&active);
4154 let max_for_stage = StdArc::clone(&max_active);
4155 let (release_tx, release_rx) = oneshot::channel::<()>();
4156 let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
4157 let release_rx_for_stage = StdArc::clone(&release_rx);
4158 let max_for_release = StdArc::clone(&max_active);
4159
4160 let releaser = thread::spawn(move || {
4161 let deadline = Instant::now() + StdDuration::from_secs(1);
4162 while max_for_release.load(StdOrdering::SeqCst) < 2 && Instant::now() < deadline {
4163 thread::yield_now();
4164 }
4165 let _ = release_tx.send(());
4166 });
4167
4168 let values = Source::from_iter([0, 2, 1])
4169 .map_async_partitioned(
4170 2,
4171 1,
4172 |item| item % 2,
4173 move |item| {
4174 let active = StdArc::clone(&active_for_stage);
4175 let max_active = StdArc::clone(&max_for_stage);
4176 let release_rx = StdArc::clone(&release_rx_for_stage);
4177 let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
4178 max_active.fetch_max(current, StdOrdering::SeqCst);
4179 async move {
4180 if item == 0 {
4181 let receiver = release_rx
4182 .lock()
4183 .expect("release receiver mutex")
4184 .take()
4185 .expect("release receiver present");
4186 let _ = receiver.await;
4187 }
4188 active.fetch_sub(1, StdOrdering::SeqCst);
4189 Ok(item)
4190 }
4191 },
4192 )
4193 .run_collect()
4194 .unwrap();
4195 releaser.join().unwrap();
4196
4197 assert_eq!(values, vec![0, 2, 1]);
4198 assert_eq!(max_active.load(StdOrdering::SeqCst), 2);
4199 }
4200
4201 #[test]
4202 fn partitioned_async_mapping_p1_still_evaluates_partition() {
4203 let partitions = StdArc::new(StdAtomicUsize::new(0));
4204 let partitions_for_stage = StdArc::clone(&partitions);
4205
4206 let values = Source::from_iter(0..8)
4207 .map_async_partitioned(
4208 1,
4209 1,
4210 move |item| {
4211 partitions_for_stage.fetch_add(1, StdOrdering::SeqCst);
4212 item % 2
4213 },
4214 |item| async move { Ok(item + 1) },
4215 )
4216 .run_collect()
4217 .unwrap();
4218
4219 assert_eq!(values, (1..9).collect::<Vec<_>>());
4220 assert_eq!(partitions.load(StdOrdering::SeqCst), 8);
4221 }
4222
4223 #[test]
4224 fn partitioned_async_mapping_handles_many_keys_high_parallelism() {
4225 let active_by_key =
4226 StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
4227 let max_by_key = StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
4228 let active_for_stage = StdArc::clone(&active_by_key);
4229 let max_for_stage = StdArc::clone(&max_by_key);
4230
4231 let values = Source::from_iter(0..512_usize)
4232 .map_async_partitioned(
4233 32,
4234 1,
4235 |item| item % 16,
4236 move |item| {
4237 let active = StdArc::clone(&active_for_stage);
4238 let max_active = StdArc::clone(&max_for_stage);
4239 let key = item % 16;
4240 let current = active[key].fetch_add(1, StdOrdering::SeqCst) + 1;
4241 max_active[key].fetch_max(current, StdOrdering::SeqCst);
4242 async move {
4243 active[key].fetch_sub(1, StdOrdering::SeqCst);
4244 Ok(item)
4245 }
4246 },
4247 )
4248 .run_collect()
4249 .unwrap();
4250
4251 assert_eq!(values, (0..512).collect::<Vec<_>>());
4252 for max_active in max_by_key.iter() {
4253 assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
4254 }
4255 }
4256
4257 #[test]
4258 fn error_operators_map_recover_and_complete() {
4259 let mapped = Source::<i32>::failed(StreamError::Failed("boom".into()))
4260 .map_error(|_| StreamError::Failed("mapped".into()))
4261 .run_collect();
4262 assert_eq!(mapped, Err(StreamError::Failed("mapped".into())));
4263
4264 let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
4265 .recover(|error| match error {
4266 StreamError::Failed(_) => Some(42),
4267 _ => None,
4268 })
4269 .run_collect()
4270 .unwrap();
4271 assert_eq!(recovered, vec![42]);
4272
4273 let unrecovered = Source::<i32>::failed(StreamError::Failed("original".into()))
4274 .recover(|_| None)
4275 .run_collect();
4276 assert_eq!(unrecovered, Err(StreamError::Failed("original".into())));
4277
4278 let recovered_with = Source::<i32>::failed(StreamError::Failed("boom".into()))
4279 .recover_with_retries(1, |_| Some(Source::from_iter([1, 2])))
4280 .run_collect()
4281 .unwrap();
4282 assert_eq!(recovered_with, vec![1, 2]);
4283
4284 let declined_recover_with = Source::<i32>::failed(StreamError::Failed("declined".into()))
4285 .recover_with_retries(1, |_| None)
4286 .run_collect();
4287 assert_eq!(
4288 declined_recover_with,
4289 Err(StreamError::Failed("declined".into()))
4290 );
4291
4292 let completed = Source::from_factory(|| {
4293 Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
4294 })
4295 .on_error_complete()
4296 .run_collect()
4297 .unwrap();
4298 assert_eq!(completed, vec![1]);
4299 }
4300
4301 #[test]
4302 fn sliding_matches_akka_window_semantics() {
4303 assert_eq!(
4305 Source::from_iter(1..=4)
4306 .sliding(3, 1)
4307 .run_collect()
4308 .unwrap(),
4309 vec![vec![1, 2, 3], vec![2, 3, 4]]
4310 );
4311 assert_eq!(
4312 Source::from_iter(1..=4)
4313 .sliding(2, 1)
4314 .run_collect()
4315 .unwrap(),
4316 vec![vec![1, 2], vec![2, 3], vec![3, 4]]
4317 );
4318 assert_eq!(
4320 Source::from_iter(1..=3)
4321 .sliding(3, 1)
4322 .run_collect()
4323 .unwrap(),
4324 vec![vec![1, 2, 3]]
4325 );
4326 assert_eq!(
4328 Source::from_iter(1..=2)
4329 .sliding(3, 1)
4330 .run_collect()
4331 .unwrap(),
4332 vec![vec![1, 2]]
4333 );
4334 assert_eq!(
4336 Source::from_iter(1..=3)
4337 .sliding(1, 1)
4338 .run_collect()
4339 .unwrap(),
4340 vec![vec![1], vec![2], vec![3]]
4341 );
4342 assert_eq!(
4344 Source::from_iter(1..=6)
4345 .sliding(2, 3)
4346 .run_collect()
4347 .unwrap(),
4348 vec![vec![1, 2], vec![4, 5]]
4349 );
4350 assert_eq!(
4352 Source::from_iter(1..=3)
4353 .sliding(2, 4)
4354 .run_collect()
4355 .unwrap(),
4356 vec![vec![1, 2]]
4357 );
4358 }
4359
4360 #[test]
4361 fn recover_with_retries_indefinitely_like_akka() {
4362 let attempts = StdArc::new(StdAtomicUsize::new(0));
4363 let attempts_in_stage = StdArc::clone(&attempts);
4364 let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
4367 .recover_with(move |_error| {
4368 if attempts_in_stage.fetch_add(1, StdOrdering::SeqCst) < 5 {
4369 Some(Source::<i32>::failed(StreamError::Failed("again".into())))
4370 } else {
4371 Some(Source::from_iter([42]))
4372 }
4373 })
4374 .run_collect()
4375 .unwrap();
4376 assert_eq!(recovered, vec![42]);
4377 assert_eq!(attempts.load(StdOrdering::SeqCst), 6);
4378 }
4379
4380 #[test]
4381 fn many_concurrent_streams_do_not_starve_the_pool() {
4382 let materializer = Materializer::new();
4391 let busy = 6_usize;
4392
4393 let mut held = Vec::with_capacity(busy);
4394 for _ in 0..busy {
4395 held.push(
4396 Source::single(1_u64)
4397 .run_with_materializer(Sink::never(), &materializer)
4398 .unwrap(),
4399 );
4400 }
4401
4402 for _ in 0..400 {
4403 if materializer.active_streams() >= busy {
4404 break;
4405 }
4406 thread::sleep(Duration::from_millis(5));
4407 }
4408 assert_eq!(materializer.active_streams(), busy);
4409
4410 let sum = Source::from_iter(0_u64..5)
4413 .run_with_materializer(Sink::fold(0_u64, |acc, item| acc + item), &materializer)
4414 .unwrap();
4415 assert_eq!(sum.wait().unwrap(), 10);
4416
4417 materializer.shutdown();
4418 for completion in held {
4419 assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
4420 }
4421 }
4422}