1use std::{
4 collections::{BTreeMap, HashMap, VecDeque},
5 fmt,
6 future::Future,
7 hash::Hash,
8 marker::PhantomData,
9 panic::{AssertUnwindSafe, catch_unwind},
10 pin::Pin,
11 sync::{
12 Arc, Condvar, Mutex, OnceLock,
13 atomic::{AtomicBool, AtomicUsize, Ordering},
14 },
15 task::{Context, Poll},
16 thread,
17 time::Duration,
18};
19
20use futures::{channel::oneshot, executor::block_on};
21use thiserror::Error;
22use tokio::{
23 runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime},
24 task::{JoinError, JoinHandle},
25};
26
27pub(crate) type BoxStream<T> = Box<dyn Iterator<Item = StreamResult<T>> + Send>;
28pub(crate) type PureTransform<In, Out> = Arc<dyn Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync>;
29pub(crate) type RuntimeTransform<In, Out> =
30 Arc<dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync>;
31type SinkRunner<In, Mat> = dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync;
32type RunnableGraphRunner<Mat> = dyn Fn(&Materializer) -> StreamResult<Mat> + Send + Sync;
33const STREAM_READY_SPINS: usize = 256;
34const STREAM_SPIN_BACKOFF: usize = 8;
39const STREAM_MAX_PARK: Duration = Duration::from_millis(1);
40
41#[derive(Clone, Copy, Debug, PartialEq, Eq)]
45struct InlineMicroSourceHint {
46 max_success_items: usize,
47}
48
49#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
50struct SourceHints {
51 inline_head_terminal: bool,
52 inline_micro: Option<InlineMicroSourceHint>,
55}
56
57impl SourceHints {
58 const fn with_inline_micro(max_success_items: usize) -> Self {
59 Self {
60 inline_head_terminal: true,
61 inline_micro: Some(InlineMicroSourceHint { max_success_items }),
62 }
63 }
64
65 fn after_flow(self, flow: FlowHints) -> Self {
66 if flow.preserves_inline_head_terminal {
67 Self {
70 inline_head_terminal: true,
71 inline_micro: None,
72 }
73 } else {
74 Self::default()
75 }
76 }
77}
78
79#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
80struct FlowHints {
81 preserves_inline_head_terminal: bool,
82}
83
84impl FlowHints {
85 const PRESERVES_INLINE_HEAD_TERMINAL: Self = Self {
86 preserves_inline_head_terminal: true,
87 };
88
89 fn then(self, next: Self) -> Self {
90 Self {
91 preserves_inline_head_terminal: self.preserves_inline_head_terminal
92 && next.preserves_inline_head_terminal,
93 }
94 }
95}
96
97struct PartitionSlot<Key, Out> {
98 key: Option<Key>,
99 active: usize,
100 queued: VecDeque<(usize, Out)>,
101 in_ready_queue: bool,
102}
103
104struct AbortOnDropHandle<T> {
105 handle: JoinHandle<T>,
106}
107
108impl<T> AbortOnDropHandle<T> {
109 fn new(handle: JoinHandle<T>) -> Self {
110 Self { handle }
111 }
112}
113
114impl<T> Drop for AbortOnDropHandle<T> {
115 fn drop(&mut self) {
116 self.handle.abort();
117 }
118}
119
120impl<T> Future for AbortOnDropHandle<T> {
121 type Output = Result<T, JoinError>;
122
123 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
124 Pin::new(&mut self.handle).poll(cx)
125 }
126}
127
128impl<T> Unpin for AbortOnDropHandle<T> {}
129
130pub(crate) fn stream_tokio_runtime() -> &'static TokioRuntime {
131 static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
132 RUNTIME.get_or_init(|| {
133 TokioRuntimeBuilder::new_multi_thread()
134 .enable_all()
135 .thread_name("datum-stream-tokio")
136 .build()
137 .expect("stream tokio runtime")
138 })
139}
140
141fn spawn_tokio_task<Fut, T>(future: Fut) -> AbortOnDropHandle<T>
142where
143 Fut: Future<Output = T> + Send + 'static,
144 T: Send + 'static,
145{
146 AbortOnDropHandle::new(stream_tokio_runtime().spawn(future))
147}
148
149pub(crate) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
150 runtime::current_stream_cancelled()
151}
152
153pub(super) fn catch_unwind_failed<T, F>(context: &'static str, f: F) -> StreamResult<T>
154where
155 F: FnOnce() -> T,
156{
157 catch_unwind(AssertUnwindSafe(f))
158 .map_err(|_| StreamError::Failed(format!("{context} panicked")))
159}
160
161impl<Key, Out> PartitionSlot<Key, Out> {
162 fn new(key: Key) -> Self {
163 Self {
164 key: Some(key),
165 active: 0,
166 queued: VecDeque::new(),
167 in_ready_queue: false,
168 }
169 }
170}
171
172#[inline(always)]
173fn partition_slot_for<Key, Out>(
174 key: Key,
175 slots_by_key: &mut HashMap<Key, usize>,
176 slots: &mut Vec<PartitionSlot<Key, Out>>,
177 free_slots: &mut Vec<usize>,
178) -> usize
179where
180 Key: Clone + Eq + Hash,
181{
182 if let Some(slot) = slots_by_key.get(&key) {
183 return *slot;
184 }
185
186 let slot = if let Some(slot) = free_slots.pop() {
187 let state = &mut slots[slot];
188 state.key = Some(key.clone());
189 state.active = 0;
190 state.queued.clear();
191 state.in_ready_queue = false;
192 slot
193 } else {
194 slots.push(PartitionSlot::new(key.clone()));
195 slots.len() - 1
196 };
197 slots_by_key.insert(key, slot);
198 slot
199}
200
201#[inline(always)]
202fn retire_partition_slot<Key, Out>(
203 slot: usize,
204 slots_by_key: &mut HashMap<Key, usize>,
205 slots: &mut [PartitionSlot<Key, Out>],
206 free_slots: &mut Vec<usize>,
207) where
208 Key: Eq + Hash,
209{
210 let state = &mut slots[slot];
211 if let Some(key) = state.key.take() {
212 slots_by_key.remove(&key);
213 }
214 state.active = 0;
215 state.queued.clear();
216 state.in_ready_queue = false;
217 free_slots.push(slot);
218}
219
220#[inline(always)]
221fn ready_partition_slot<Key, Out>(
222 slots: &mut [PartitionSlot<Key, Out>],
223 ready_slots: &mut VecDeque<usize>,
224 slot: usize,
225 per_partition: usize,
226) {
227 if let Some(state) = slots.get_mut(slot)
228 && state.key.is_some()
229 && !state.in_ready_queue
230 && state.active < per_partition
231 && !state.queued.is_empty()
232 {
233 state.in_ready_queue = true;
234 ready_slots.push_back(slot);
235 }
236}
237
238#[inline(always)]
239fn pop_ready_partition_slot<Key, Out>(
240 slots: &mut [PartitionSlot<Key, Out>],
241 ready_slots: &mut VecDeque<usize>,
242 per_partition: usize,
243) -> Option<(usize, usize, Out)> {
244 while let Some(slot) = ready_slots.pop_front() {
245 let mut requeue = false;
246 let item = if let Some(state) = slots.get_mut(slot) {
247 state.in_ready_queue = false;
248 if state.key.is_some() && state.active < per_partition {
249 let item = state.queued.pop_front().map(|(index, item)| {
250 state.active += 1;
251 (index, slot, item)
252 });
253 if !state.queued.is_empty() && state.active < per_partition {
254 state.in_ready_queue = true;
255 requeue = true;
256 }
257 item
258 } else {
259 None
260 }
261 } else {
262 None
263 };
264
265 if requeue {
266 ready_slots.push_back(slot);
267 }
268 if item.is_some() {
269 return item;
270 }
271 }
272 None
273}
274
275pub(crate) trait SourceFactory<Out, Mat>: Send + Sync {
276 fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)>;
277}
278
279struct FnSourceFactory<F>(F);
280
281impl<Out, Mat, F> SourceFactory<Out, Mat> for FnSourceFactory<F>
282where
283 F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync,
284{
285 fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
286 (self.0)(materializer)
287 }
288}
289
290struct MapSourceFactory<In, Out, Mat, F> {
291 source: Arc<dyn SourceFactory<In, Mat>>,
292 stage: F,
293 _marker: PhantomData<fn(In) -> Out>,
294}
295
296impl<In, Out, Mat, F> SourceFactory<Out, Mat> for MapSourceFactory<In, Out, Mat, F>
297where
298 In: Send + 'static,
299 Out: Send + 'static,
300 Mat: Send + 'static,
301 F: Fn(In) -> Out + Send + Sync + 'static,
302{
303 fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
304 let (stream, mat) = Arc::clone(&self.source).create(materializer)?;
305 Ok((
306 Box::new(MapSourceStream {
307 input: stream,
308 factory: self,
309 }),
310 mat,
311 ))
312 }
313}
314
315struct MapSourceStream<In, Out, Mat, F> {
316 input: BoxStream<In>,
317 factory: Arc<MapSourceFactory<In, Out, Mat, F>>,
318}
319
320impl<In, Out, Mat, F> Iterator for MapSourceStream<In, Out, Mat, F>
321where
322 F: Fn(In) -> Out,
323{
324 type Item = StreamResult<Out>;
325
326 fn next(&mut self) -> Option<Self::Item> {
327 self.input
328 .next()
329 .map(|item| item.map(|item| (self.factory.stage)(item)))
330 }
331}
332
333fn merge_streams<Out>(streams: Vec<BoxStream<Out>>, eager_complete: bool) -> BoxStream<Out>
334where
335 Out: Send + 'static,
336{
337 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
338 let mut current = 0usize;
339 Box::new(std::iter::from_fn(move || {
340 loop {
341 let index = next_active_optional_stream(&streams, current, |_| true)?;
342 current = (index + 1) % streams.len().max(1);
343 let Some(stream) = streams[index].as_mut() else {
344 continue;
345 };
346 match stream.next() {
347 Some(item) => return Some(item),
348 None => {
349 streams[index] = None;
350 if eager_complete {
351 return None;
352 }
353 }
354 }
355 }
356 }))
357}
358
359fn merge_prioritized_streams<Out>(
360 streams: Vec<BoxStream<Out>>,
361 priorities: Vec<usize>,
362 eager_complete: bool,
363) -> BoxStream<Out>
364where
365 Out: Send + 'static,
366{
367 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
368 let schedule: Vec<usize> = priorities
369 .into_iter()
370 .enumerate()
371 .flat_map(|(index, weight)| std::iter::repeat_n(index, weight))
372 .collect();
373 let mut schedule_index = 0usize;
374 Box::new(std::iter::from_fn(move || {
375 loop {
376 if streams.iter().all(Option::is_none) {
377 return None;
378 }
379 let index = next_weighted_stream(&streams, &schedule, &mut schedule_index)?;
380 let Some(stream) = streams[index].as_mut() else {
381 continue;
382 };
383 match stream.next() {
384 Some(item) => return Some(item),
385 None => {
386 streams[index] = None;
387 if eager_complete {
388 return None;
389 }
390 }
391 }
392 }
393 }))
394}
395
396fn merge_sorted_stream<Out>(mut left: BoxStream<Out>, mut right: BoxStream<Out>) -> BoxStream<Out>
397where
398 Out: Ord + Send + 'static,
399{
400 let mut left_next: Option<Out> = None;
401 let mut right_next: Option<Out> = None;
402 let mut left_done = false;
403 let mut right_done = false;
404 Box::new(std::iter::from_fn(move || {
405 loop {
406 if left_next.is_none() && !left_done {
407 match left.next() {
408 Some(Ok(item)) => left_next = Some(item),
409 Some(Err(error)) => return Some(Err(error)),
410 None => left_done = true,
411 }
412 }
413 if right_next.is_none() && !right_done {
414 match right.next() {
415 Some(Ok(item)) => right_next = Some(item),
416 Some(Err(error)) => return Some(Err(error)),
417 None => right_done = true,
418 }
419 }
420
421 let next = match (&left_next, &right_next) {
422 (Some(left_item), Some(right_item)) => {
423 if left_item <= right_item {
424 left_next.take()
425 } else {
426 right_next.take()
427 }
428 }
429 (Some(_), None) if right_done => left_next.take(),
430 (None, Some(_)) if left_done => right_next.take(),
431 (None, None) if left_done && right_done => return None,
432 _ => continue,
433 };
434 if let Some(item) = next {
435 return Some(Ok(item));
436 }
437 }
438 }))
439}
440
441fn merge_latest_streams<Out>(
442 streams: Vec<BoxStream<Out>>,
443 eager_complete: bool,
444) -> BoxStream<Vec<Out>>
445where
446 Out: Clone + Send + 'static,
447{
448 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
449 let mut latest = vec![None; streams.len()];
450 let mut seen = 0usize;
451 let mut current = 0usize;
452 let mut pending = VecDeque::<Vec<Out>>::new();
453 Box::new(std::iter::from_fn(move || {
454 loop {
455 if let Some(output) = pending.pop_front() {
456 return Some(Ok(output));
457 }
458 if streams.iter().all(Option::is_none) {
459 return None;
460 }
461 let index = next_active_optional_stream(&streams, current, |_| true)?;
462 current = (index + 1) % streams.len().max(1);
463 let Some(stream) = streams[index].as_mut() else {
464 continue;
465 };
466 match stream.next() {
467 Some(Ok(item)) => {
468 if latest[index].is_none() {
469 seen += 1;
470 }
471 latest[index] = Some(item);
472 if seen == latest.len() {
473 pending.push_back(
474 latest
475 .iter()
476 .map(|item| item.clone().expect("merge-latest initialized"))
477 .collect(),
478 );
479 }
480 }
481 Some(Err(error)) => return Some(Err(error)),
482 None => {
483 streams[index] = None;
484 if eager_complete {
485 return None;
486 }
487 }
488 }
489 }
490 }))
491}
492
493fn zip_streams<Left, Right>(
494 mut left: BoxStream<Left>,
495 mut right: BoxStream<Right>,
496) -> BoxStream<(Left, Right)>
497where
498 Left: Send + 'static,
499 Right: Send + 'static,
500{
501 let mut left_next: Option<Left> = None;
502 let mut right_next: Option<Right> = None;
503 let mut left_done = false;
504 let mut right_done = false;
505 Box::new(std::iter::from_fn(move || {
506 loop {
507 if left_next.is_none() && !left_done {
508 match left.next() {
509 Some(Ok(item)) => left_next = Some(item),
510 Some(Err(error)) => return Some(Err(error)),
511 None => left_done = true,
512 }
513 }
514 if right_next.is_none() && !right_done {
515 match right.next() {
516 Some(Ok(item)) => right_next = Some(item),
517 Some(Err(error)) => return Some(Err(error)),
518 None => right_done = true,
519 }
520 }
521 match (left_next.take(), right_next.take()) {
522 (Some(left_item), Some(right_item)) => return Some(Ok((left_item, right_item))),
523 (left_item, right_item) => {
524 left_next = left_item;
525 right_next = right_item;
526 if (left_done && left_next.is_none()) || (right_done && right_next.is_none()) {
527 return None;
528 }
529 }
530 }
531 }
532 }))
533}
534
535fn zip_latest_with_stream<Left, Right, Out, F>(
536 mut left: BoxStream<Left>,
537 mut right: BoxStream<Right>,
538 eager_complete: bool,
539 combine: Arc<F>,
540) -> BoxStream<Out>
541where
542 Left: Clone + Send + 'static,
543 Right: Clone + Send + 'static,
544 Out: Send + 'static,
545 F: Fn(Left, Right) -> Out + Send + Sync + 'static,
546{
547 let mut left_latest: Option<Left> = None;
548 let mut right_latest: Option<Right> = None;
549 let mut left_done = false;
550 let mut right_done = false;
551 let mut turn_left = true;
552 let mut pending = VecDeque::<Out>::new();
553
554 Box::new(std::iter::from_fn(move || {
555 loop {
556 if let Some(output) = pending.pop_front() {
557 return Some(Ok(output));
558 }
559 if eager_complete && (left_done || right_done) {
560 return None;
561 }
562 if left_done && right_done {
563 return None;
564 }
565 if (left_done && left_latest.is_none()) || (right_done && right_latest.is_none()) {
569 return None;
570 }
571
572 let pull_left = if left_done {
573 false
574 } else if right_done {
575 true
576 } else {
577 let value = turn_left;
578 turn_left = !turn_left;
579 value
580 };
581
582 if pull_left {
583 match left.next() {
584 Some(Ok(item)) => {
585 left_latest = Some(item);
586 if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
587 pending.push_back(combine(left_item.clone(), right_item.clone()));
588 }
589 }
590 Some(Err(error)) => return Some(Err(error)),
591 None => {
592 left_done = true;
593 if eager_complete {
594 return None;
595 }
596 }
597 }
598 } else {
599 match right.next() {
600 Some(Ok(item)) => {
601 right_latest = Some(item);
602 if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
603 pending.push_back(combine(left_item.clone(), right_item.clone()));
604 }
605 }
606 Some(Err(error)) => return Some(Err(error)),
607 None => {
608 right_done = true;
609 if eager_complete {
610 return None;
611 }
612 }
613 }
614 }
615 }
616 }))
617}
618
619fn zip_all_stream<Left, Right>(
620 mut left: BoxStream<Left>,
621 mut right: BoxStream<Right>,
622 left_fill: Left,
623 right_fill: Right,
624) -> BoxStream<(Left, Right)>
625where
626 Left: Clone + Send + 'static,
627 Right: Clone + Send + 'static,
628{
629 let mut left_done = false;
630 let mut right_done = false;
631 Box::new(std::iter::from_fn(move || {
632 if left_done && right_done {
633 return None;
634 }
635
636 let left_item = if left_done {
637 None
638 } else {
639 match left.next() {
640 Some(Ok(item)) => Some(item),
641 Some(Err(error)) => return Some(Err(error)),
642 None => {
643 left_done = true;
644 None
645 }
646 }
647 };
648 let right_item = if right_done {
649 None
650 } else {
651 match right.next() {
652 Some(Ok(item)) => Some(item),
653 Some(Err(error)) => return Some(Err(error)),
654 None => {
655 right_done = true;
656 None
657 }
658 }
659 };
660
661 match (left_item, right_item) {
662 (None, None) if left_done && right_done => None,
663 (Some(left_value), Some(right_value)) => Some(Ok((left_value, right_value))),
664 (Some(left_value), None) => Some(Ok((left_value, right_fill.clone()))),
665 (None, Some(right_value)) => Some(Ok((left_fill.clone(), right_value))),
666 (None, None) => None,
667 }
668 }))
669}
670
671fn zip_n_streams<Out, Next, F>(streams: Vec<BoxStream<Out>>, zipper: Arc<F>) -> BoxStream<Next>
672where
673 Out: Send + 'static,
674 Next: Send + 'static,
675 F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
676{
677 let count = streams.len();
678 if count == 0 {
679 return Box::new(std::iter::empty());
680 }
681 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
682 let mut slots: Vec<Option<Out>> = (0..count).map(|_| None).collect();
683 let mut current = 0usize;
684 Box::new(std::iter::from_fn(move || {
685 loop {
686 if slots.iter().all(Option::is_some) {
687 let values = slots
688 .iter_mut()
689 .map(|slot| slot.take().expect("zip-n slot filled"))
690 .collect();
691 return Some(Ok(zipper(values)));
692 }
693
694 let index = next_active_optional_stream(&streams, current, |idx| slots[idx].is_none())?;
695 current = (index + 1) % count.max(1);
696 let Some(stream) = streams[index].as_mut() else {
697 continue;
698 };
699 match stream.next() {
700 Some(Ok(item)) => slots[index] = Some(item),
701 Some(Err(error)) => return Some(Err(error)),
702 None => {
703 streams[index] = None;
704 slots[index].as_ref()?;
705 }
706 }
707 }
708 }))
709}
710
711fn next_active_optional_stream<T, F>(
712 streams: &[Option<BoxStream<T>>],
713 current: usize,
714 predicate: F,
715) -> Option<usize>
716where
717 T: Send + 'static,
718 F: Fn(usize) -> bool,
719{
720 if streams.is_empty() {
721 return None;
722 }
723 for offset in 0..streams.len() {
724 let index = (current + offset) % streams.len();
725 if streams[index].is_some() && predicate(index) {
726 return Some(index);
727 }
728 }
729 None
730}
731
732fn next_weighted_stream<T>(
733 streams: &[Option<BoxStream<T>>],
734 schedule: &[usize],
735 schedule_index: &mut usize,
736) -> Option<usize>
737where
738 T: Send + 'static,
739{
740 if streams.is_empty() || schedule.is_empty() {
741 return None;
742 }
743 for _ in 0..schedule.len() {
744 let index = schedule[*schedule_index % schedule.len()];
745 *schedule_index = (*schedule_index + 1) % schedule.len();
746 if streams.get(index).is_some_and(Option::is_some) {
747 return Some(index);
748 }
749 }
750 None
751}
752
753mod completion;
754mod error;
755mod flow;
756mod rate;
757mod restart;
758mod runtime;
759mod sink;
760mod source;
761mod time;
762mod timer;
763
764pub(crate) trait SplitSegmentHookDyn: Send + Sync + 'static {
768 fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync>;
769}
770
771pub(crate) trait FoldFastPathDyn<In: Send + 'static>: Send + Sync + 'static {
774 fn try_register(
778 &self,
779 hook: Arc<dyn SplitSegmentHookDyn>,
780 ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>>;
781}
782
783use self::runtime::runtime_checked_stream;
784
785pub use self::{
786 completion::{Cancellable, StreamCompletion},
787 error::{StreamError, StreamResult, Supervision, SupervisionDecider, SupervisionDirective},
788 flow::{BidiFlow, Flow},
789 rate::{AggregateTimer, OverflowStrategy},
790 restart::{RestartFlow, RestartSettings, RestartSink, RestartSource, RetryFlow},
791 runtime::{Materializer, Runtime},
792 sink::{RunnableGraph, Sink, SinkCombineStrategy},
793 source::{Demand, Keep, MaybeHandle, NotUsed, PushOutlet, Source, SourceCombineStrategy},
794 time::{DelayOverflowStrategy, ThrottleMode},
795};
796
797#[cfg(test)]
798mod tests {
799 use super::*;
800 use crate::Attributes;
801 use crate::testkit::TestSink;
802 use std::fs;
803 use std::sync::{
804 Arc as StdArc,
805 atomic::{
806 AtomicBool as StdAtomicBool, AtomicUsize as StdAtomicUsize, Ordering as StdOrdering,
807 },
808 mpsc,
809 };
810 use std::time::Duration as StdDuration;
811 use std::time::Instant;
812
813 fn wait<T>(completion: StreamCompletion<T>) -> T {
814 completion.wait().unwrap()
815 }
816
817 fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
818 let deadline = Instant::now() + timeout;
819 while Instant::now() < deadline {
820 if condition() {
821 return true;
822 }
823 thread::sleep(Duration::from_millis(2));
824 }
825 condition()
826 }
827
828 fn linux_thread_count(thread_name: &str) -> usize {
829 fs::read_dir("/proc/self/task")
830 .expect("task directory readable")
831 .filter_map(Result::ok)
832 .filter_map(|entry| fs::read_to_string(entry.path().join("comm")).ok())
833 .filter(|name| name.trim() == thread_name)
834 .count()
835 }
836
837 #[test]
838 fn source_blueprints_are_reusable() {
839 let source = Source::from_iter(0..5).map(|item| item + 1);
840
841 assert_eq!(source.clone().run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
842 assert_eq!(source.run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
843 }
844
845 #[test]
846 fn source_map_preserves_materialized_value() {
847 let graph = Source::single(1)
848 .map_materialized_value(|_| "source")
849 .map(|item| item + 1)
850 .to_mat(Sink::head(), Keep::both);
851
852 let materialized = graph.run().unwrap();
853 assert_eq!(materialized.0, "source");
854 assert_eq!(wait(materialized.1), 2);
855 }
856
857 #[test]
858 fn source_and_flow_compose() {
859 let flow = Flow::identity()
860 .map(|item: i32| item * 2)
861 .filter(|item| item % 3 == 0);
862
863 let result = Source::from_iter(0..8).via(flow).run_collect().unwrap();
864
865 assert_eq!(result, vec![0, 6, 12]);
866 }
867
868 #[test]
869 fn sink_setup_sees_materializer_defaults_and_local_attributes() {
870 let observed = StdArc::new(Mutex::new(None));
871 let observed_in_setup = StdArc::clone(&observed);
872 let sink = Sink::<i32, StreamCompletion<NotUsed>>::setup(move |_materializer, attrs| {
873 *observed_in_setup.lock().unwrap() = Some((
874 attrs.name().map(str::to_owned),
875 attrs.input_buffer_hint(),
876 attrs.dispatcher_hint().map(str::to_owned),
877 ));
878 Sink::ignore()
879 })
880 .add_attributes(Attributes::named("sink-inner"))
881 .add_attributes(Attributes::input_buffer(4, 4))
882 .add_attributes(Attributes::dispatcher("bench-dispatcher"));
883
884 let materializer = Materializer::new().with_attributes(Attributes::named("mat-outer"));
885 wait(
886 Source::from_iter([1, 2, 3])
887 .run_with_materializer(sink, &materializer)
888 .unwrap(),
889 );
890
891 assert_eq!(
892 *observed.lock().unwrap(),
893 Some((
894 Some("sink-inner".to_owned()),
895 Some((4, 4)),
896 Some("bench-dispatcher".to_owned())
897 ))
898 );
899 }
900
901 #[test]
902 fn sink_pre_materialize_feeds_existing_materialization() {
903 let materializer = Materializer::new();
904 let (completion, pre) = Sink::<i32, StreamCompletion<Vec<i32>>>::collect()
905 .pre_materialize(&materializer)
906 .unwrap();
907
908 Source::from_iter([1, 2, 3])
909 .run_with_materializer(pre, &materializer)
910 .unwrap();
911
912 assert_eq!(wait(completion), vec![1, 2, 3]);
913 }
914
915 #[test]
916 fn flow_from_sink_and_source_connects_both_sides() {
917 assert_eq!(
918 Source::from_iter([1, 2, 3])
919 .via(Flow::from_sink_and_source(
920 Sink::foreach(|_item: i32| {}),
921 Source::from_iter([10, 20, 30]),
922 ))
923 .run_collect()
924 .unwrap(),
925 vec![10, 20, 30]
926 );
927 }
928
929 #[test]
930 fn from_sink_and_source_keeps_sink_running_after_source_side_completes() {
931 let completed = StdArc::new(StdAtomicBool::new(false));
932 let on_complete = StdArc::clone(&completed);
933 let flow = Flow::from_sink_and_source(
934 Sink::on_complete(move || {
935 on_complete.store(true, StdOrdering::SeqCst);
936 }),
937 Source::single(10),
938 );
939
940 let result = Source::from_iter([1, 2, 3])
941 .via(flow)
942 .run_collect()
943 .unwrap();
944
945 assert_eq!(result, vec![10]);
946 assert!(wait_until(StdDuration::from_secs(1), || {
947 completed.load(StdOrdering::SeqCst)
948 }));
949 }
950
951 #[test]
952 fn from_sink_and_source_coupled_cancels_source_when_sink_finishes_first() {
953 let cancellable = StdArc::new(Mutex::new(None));
954 let observed = StdArc::clone(&cancellable);
955 let flow = Flow::from_sink_and_source_coupled(
956 Sink::ignore(),
957 Source::tick(
958 StdDuration::from_millis(50),
959 StdDuration::from_millis(50),
960 10,
961 )
962 .map_materialized_value(move |handle| {
963 *observed.lock().unwrap() = Some(handle.clone());
964 handle
965 }),
966 );
967
968 let completion = Source::from_iter(std::iter::empty::<i32>())
969 .via(flow)
970 .run_with(Sink::ignore())
971 .unwrap();
972 assert!(wait_until(StdDuration::from_secs(1), || {
973 cancellable
974 .lock()
975 .unwrap()
976 .as_ref()
977 .is_some_and(Cancellable::is_cancelled)
978 }));
979 assert_eq!(wait(completion), NotUsed);
980 }
981
982 #[test]
983 fn bidi_flow_join_and_atop_compose() {
984 let codec = BidiFlow::from_flows(
985 Flow::identity().map(|item: i32| item + 1),
986 Flow::identity().map(|item: i32| item * 2),
987 )
988 .named("codec");
989 let framing = BidiFlow::from_flows(
990 Flow::identity().map(|item: i32| item * 3),
991 Flow::identity().map(|item: i32| item - 4),
992 );
993
994 let joined = codec
995 .clone()
996 .join(Flow::identity().map(|item: i32| item - 5));
997 let stacked = codec.atop(framing).join(Flow::identity());
998
999 assert_eq!(
1000 Source::single(10).via(joined).run_collect().unwrap(),
1001 vec![12]
1002 );
1003 assert_eq!(
1004 Source::single(10).via(stacked).run_collect().unwrap(),
1005 vec![58]
1006 );
1007 }
1008
1009 #[test]
1010 fn flow_buffer_then_map_runs_end_to_end() {
1011 let flow = Flow::identity()
1012 .buffer(8, OverflowStrategy::Backpressure)
1013 .map(|item: i32| item + 1);
1014
1015 let result = Source::from_iter(0..4).via(flow).run_collect().unwrap();
1016
1017 assert_eq!(result, vec![1, 2, 3, 4]);
1018 }
1019
1020 #[test]
1021 fn public_flow_combinators_preserve_runtime_transform_after_buffer() {
1022 fn buffered_flow() -> Flow<i32, i32> {
1023 Flow::identity().buffer(8, OverflowStrategy::Backpressure)
1024 }
1025
1026 assert_eq!(
1027 Source::from_iter(0..4)
1028 .via(buffered_flow().filter(|item| *item % 2 == 0))
1029 .run_collect()
1030 .unwrap(),
1031 vec![0, 2]
1032 );
1033 assert_eq!(
1034 Source::from_iter(0..4)
1035 .via(buffered_flow().filter_not(|item| *item % 2 == 0))
1036 .run_collect()
1037 .unwrap(),
1038 vec![1, 3]
1039 );
1040 assert_eq!(
1041 Source::from_iter(0..4)
1042 .via(buffered_flow().filter_map(|item| (item % 2 == 0).then_some(item + 10)))
1043 .run_collect()
1044 .unwrap(),
1045 vec![10, 12]
1046 );
1047 assert_eq!(
1048 Source::from_iter(0..3)
1049 .via(buffered_flow().map_concat(|item| [item, item + 10]))
1050 .run_collect()
1051 .unwrap(),
1052 vec![0, 10, 1, 11, 2, 12]
1053 );
1054 assert_eq!(
1055 Source::from_iter(0..3)
1056 .via(buffered_flow().stateful_map(5, |state, item| {
1057 *state += item;
1058 *state
1059 }))
1060 .run_collect()
1061 .unwrap(),
1062 vec![5, 6, 8]
1063 );
1064 assert_eq!(
1065 Source::from_iter(0..3)
1066 .via(buffered_flow().stateful_map_concat(0, |state, item| {
1067 *state += item;
1068 [*state, item]
1069 }))
1070 .run_collect()
1071 .unwrap(),
1072 vec![0, 0, 1, 1, 3, 2]
1073 );
1074 assert_eq!(
1075 Source::from_iter(0..4)
1076 .via(buffered_flow().map_async(2, |item| async move { Ok(item + 1) }))
1077 .run_collect()
1078 .unwrap(),
1079 vec![1, 2, 3, 4]
1080 );
1081 assert_eq!(
1082 Source::from_iter(0..4)
1083 .via(buffered_flow().map_async_unordered(2, |item| async move { Ok(item + 1) }))
1084 .run_collect()
1085 .unwrap(),
1086 vec![1, 2, 3, 4]
1087 );
1088 assert_eq!(
1089 Source::from_iter(0..4)
1090 .via(buffered_flow().map_async_partitioned(
1091 2,
1092 1,
1093 |item| item % 2,
1094 |item| async move { Ok(item + 1) },
1095 ))
1096 .run_collect()
1097 .unwrap(),
1098 vec![1, 2, 3, 4]
1099 );
1100 assert_eq!(
1101 Source::from_iter(0..5)
1102 .via(buffered_flow().take(3))
1103 .run_collect()
1104 .unwrap(),
1105 vec![0, 1, 2]
1106 );
1107 assert_eq!(
1108 Source::from_iter(0..5)
1109 .via(buffered_flow().drop(2))
1110 .run_collect()
1111 .unwrap(),
1112 vec![2, 3, 4]
1113 );
1114 assert_eq!(
1115 Source::from_iter(0..5)
1116 .via(buffered_flow().take_while(|item| *item < 3))
1117 .run_collect()
1118 .unwrap(),
1119 vec![0, 1, 2]
1120 );
1121 assert_eq!(
1122 Source::from_iter(0..5)
1123 .via(buffered_flow().drop_while(|item| *item < 3))
1124 .run_collect()
1125 .unwrap(),
1126 vec![3, 4]
1127 );
1128 assert_eq!(
1129 Source::from_iter(0..3)
1130 .via(buffered_flow().limit(5))
1131 .run_collect()
1132 .unwrap(),
1133 vec![0, 1, 2]
1134 );
1135 assert_eq!(
1136 Source::from_iter(0..5)
1137 .via(buffered_flow().grouped(2))
1138 .run_collect()
1139 .unwrap(),
1140 vec![vec![0, 1], vec![2, 3], vec![4]]
1141 );
1142 assert_eq!(
1143 Source::from_iter(1..=3)
1144 .via(buffered_flow().scan(0, |acc, item| acc + item))
1145 .run_collect()
1146 .unwrap(),
1147 vec![0, 1, 3, 6]
1148 );
1149 assert_eq!(
1150 Source::from_iter(1..=4)
1151 .via(buffered_flow().sliding(2, 1))
1152 .run_collect()
1153 .unwrap(),
1154 vec![vec![1, 2], vec![2, 3], vec![3, 4]]
1155 );
1156 assert_eq!(
1157 Source::from_iter(1..=4)
1158 .via(buffered_flow().fold(0, |acc, item| acc + item))
1159 .run_collect()
1160 .unwrap(),
1161 vec![10]
1162 );
1163 assert_eq!(
1164 Source::from_iter(1..=4)
1165 .via(buffered_flow().reduce(|acc, item| acc + item))
1166 .run_collect()
1167 .unwrap(),
1168 vec![10]
1169 );
1170 assert_eq!(
1171 Source::from_factory(|| {
1172 Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
1173 })
1174 .via(buffered_flow().map_error(|_| StreamError::Failed("mapped".into())))
1175 .run_collect(),
1176 Err(StreamError::Failed("mapped".into()))
1177 );
1178 assert_eq!(
1179 Source::<i32>::failed(StreamError::Failed("boom".into()))
1180 .via(buffered_flow().recover(|_| Some(42)))
1181 .run_collect()
1182 .unwrap(),
1183 vec![42]
1184 );
1185 assert_eq!(
1186 Source::<i32>::failed(StreamError::Failed("boom".into()))
1187 .via(buffered_flow().recover_with(|_| Some(Source::from_iter([7, 8]))))
1188 .run_collect()
1189 .unwrap(),
1190 vec![7, 8]
1191 );
1192 assert_eq!(
1193 Source::<i32>::failed(StreamError::Failed("boom".into()))
1194 .via(buffered_flow().recover_with_retries(1, |_| Some(Source::from_iter([9]))))
1195 .run_collect()
1196 .unwrap(),
1197 vec![9]
1198 );
1199 assert_eq!(
1200 Source::from_factory(|| {
1201 Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
1202 })
1203 .via(buffered_flow().on_error_complete())
1204 .run_collect()
1205 .unwrap(),
1206 vec![1]
1207 );
1208
1209 let materialized = Source::from_iter([1, 2, 3])
1210 .run_with(
1211 buffered_flow()
1212 .via(Flow::identity().map(|item| item + 1))
1213 .map_materialized_value(|_| "buffered-flow")
1214 .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
1215 )
1216 .unwrap();
1217 assert_eq!(materialized.0, "buffered-flow");
1218 assert_eq!(wait(materialized.1), 9);
1219
1220 let kept = Source::from_iter([1, 2, 3])
1221 .run_with(
1222 buffered_flow()
1223 .via_mat_with(Flow::identity().map(|item| item + 1), |_, _| "combined")
1224 .to(Sink::fold(0, |acc, item| acc + item)),
1225 )
1226 .unwrap();
1227 assert_eq!(kept, "combined");
1228 }
1229
1230 #[test]
1231 fn runtime_rate_flows_compose_in_flow_form() {
1232 let conflate = Flow::identity()
1233 .conflate(|left: i32, right| left + right)
1234 .map(|item| item + 1);
1235 assert_eq!(
1236 Source::single(4).via(conflate).run_collect().unwrap(),
1237 vec![5]
1238 );
1239
1240 let batch = Flow::identity()
1241 .batch(4, |item: i32| item, |left, right| left + right)
1242 .map(|item| item + 1);
1243 assert_eq!(Source::single(4).via(batch).run_collect().unwrap(), vec![5]);
1244
1245 let expand = Flow::identity()
1246 .expand(std::iter::once::<i32>)
1247 .map(|item| item + 1);
1248 assert_eq!(
1249 Source::from_iter(0..4).via(expand).run_collect().unwrap(),
1250 vec![1, 2, 3, 4]
1251 );
1252
1253 let aggregate = Flow::identity()
1254 .aggregate_with_boundary(
1255 Vec::<i32>::new,
1256 |mut items, item| {
1257 items.push(item);
1258 let ready = !items.is_empty();
1259 (items, ready)
1260 },
1261 |items| items.into_iter().sum::<i32>(),
1262 None,
1263 )
1264 .map(|item| item + 1);
1265 assert_eq!(
1266 Source::from_iter(0..4)
1267 .via(aggregate)
1268 .run_collect()
1269 .unwrap(),
1270 vec![1, 2, 3, 4]
1271 );
1272
1273 let detached = Flow::identity().detach().map(|item: i32| item + 1);
1274 assert_eq!(
1275 Source::from_iter(0..4).via(detached).run_collect().unwrap(),
1276 vec![1, 2, 3, 4]
1277 );
1278 }
1279
1280 #[test]
1281 fn high_use_source_flow_operators_work() {
1282 let result = Source::from_iter(0..8)
1283 .drop(1)
1284 .take(5)
1285 .filter_not(|item| item % 2 == 0)
1286 .map_concat(|item| [item, item + 10])
1287 .grouped(3)
1288 .run_collect()
1289 .unwrap();
1290
1291 assert_eq!(result, vec![vec![1, 11, 3], vec![13, 5, 15]]);
1292 }
1293
1294 #[test]
1295 fn prefix_and_tail_emits_prefix_and_live_tail() {
1296 let mut outer = Source::from_iter(0..5)
1297 .prefix_and_tail(2)
1298 .run_collect()
1299 .unwrap();
1300 assert_eq!(outer.len(), 1);
1301 let (prefix, tail) = outer.pop().unwrap();
1302 assert_eq!(prefix, vec![0, 1]);
1303 assert_eq!(tail.clone().run_collect().unwrap(), vec![2, 3, 4]);
1304 assert_eq!(
1305 tail.run_collect(),
1306 Err(StreamError::Failed(
1307 "substream source cannot be materialized more than once".into()
1308 ))
1309 );
1310 }
1311
1312 #[test]
1313 fn prefix_and_tail_fails_before_prefix_is_ready() {
1314 let result = Source::from_factory(|| {
1315 Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
1316 })
1317 .prefix_and_tail(2)
1318 .run_collect();
1319 assert!(matches!(result, Err(StreamError::Failed(message)) if message == "boom"));
1320 }
1321
1322 #[test]
1323 fn prefix_and_tail_tail_propagates_late_upstream_failure() {
1324 let mut outer = Source::from_factory(|| {
1325 Box::new(vec![Ok(1), Ok(2), Err(StreamError::Failed("boom".into())), Ok(3)].into_iter())
1326 })
1327 .prefix_and_tail(2)
1328 .run_collect()
1329 .unwrap();
1330 let (prefix, tail) = outer.pop().unwrap();
1331 assert_eq!(prefix, vec![1, 2]);
1332 assert_eq!(tail.run_collect(), Err(StreamError::Failed("boom".into())));
1333 }
1334
1335 #[test]
1336 fn prefix_and_tail_accepts_non_clone_elements() {
1337 #[derive(Debug, PartialEq, Eq)]
1338 struct NonClone(u8);
1339
1340 let mut outer = Source::from_factory(|| {
1341 Box::new(vec![Ok(NonClone(1)), Ok(NonClone(2)), Ok(NonClone(3))].into_iter())
1342 })
1343 .prefix_and_tail(2)
1344 .run_collect()
1345 .unwrap();
1346 let (prefix, tail) = outer.pop().unwrap();
1347 assert_eq!(prefix, vec![NonClone(1), NonClone(2)]);
1348 assert_eq!(tail.run_collect().unwrap(), vec![NonClone(3)]);
1349 }
1350
1351 #[test]
1352 fn flat_map_prefix_materializes_on_short_upstream_completion() {
1353 let values = Source::from_iter([1, 2])
1354 .flat_map_prefix(3, |prefix| {
1355 let sum = prefix.into_iter().sum::<i32>();
1356 Flow::identity().prepend(Source::single(sum))
1357 })
1358 .run_collect()
1359 .unwrap();
1360 assert_eq!(values, vec![3]);
1361 }
1362
1363 #[test]
1364 fn flat_map_prefix_does_not_materialize_on_early_upstream_failure() {
1365 let invoked = StdArc::new(StdAtomicBool::new(false));
1366 let invoked_for_stage = StdArc::clone(&invoked);
1367 let result = Source::from_factory(|| {
1368 Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into()))].into_iter())
1369 })
1370 .flat_map_prefix(3, move |_prefix| {
1371 invoked_for_stage.store(true, StdOrdering::SeqCst);
1372 Flow::identity()
1373 })
1374 .run_collect();
1375 assert_eq!(result, Err(StreamError::Failed("boom".into())));
1376 assert!(!invoked.load(StdOrdering::SeqCst));
1377 }
1378
1379 #[test]
1380 fn flat_map_concat_flattens_nested_sources_sequentially() {
1381 let values = Source::from_iter([1, 2, 3])
1382 .flat_map_concat(|item| Source::from_iter(0..item))
1383 .run_collect()
1384 .unwrap();
1385 assert_eq!(values, vec![0, 0, 1, 0, 1, 2]);
1386 }
1387
1388 #[test]
1389 fn flat_map_merge_respects_breadth_bound() {
1390 let active = StdArc::new(StdAtomicUsize::new(0));
1391 let max_active = StdArc::new(StdAtomicUsize::new(0));
1392 let active_for_stage = StdArc::clone(&active);
1393 let max_for_stage = StdArc::clone(&max_active);
1394
1395 let mut values = Source::from_iter(0..6)
1396 .flat_map_merge(2, move |item| {
1397 let active = StdArc::clone(&active_for_stage);
1398 let max_active = StdArc::clone(&max_for_stage);
1399 Source::future(move || {
1400 let active = StdArc::clone(&active);
1401 let max_active = StdArc::clone(&max_active);
1402 async move {
1403 let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
1404 loop {
1405 let seen = max_active.load(StdOrdering::SeqCst);
1406 if now <= seen {
1407 break;
1408 }
1409 if max_active
1410 .compare_exchange(
1411 seen,
1412 now,
1413 StdOrdering::SeqCst,
1414 StdOrdering::SeqCst,
1415 )
1416 .is_ok()
1417 {
1418 break;
1419 }
1420 }
1421 thread::sleep(StdDuration::from_millis(20));
1422 active.fetch_sub(1, StdOrdering::SeqCst);
1423 Ok(item)
1424 }
1425 })
1426 })
1427 .run_collect()
1428 .unwrap();
1429 values.sort_unstable();
1430 assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
1431 assert!(max_active.load(StdOrdering::SeqCst) <= 2);
1432 }
1433
1434 #[test]
1435 fn flat_map_merge_propagates_inner_failures() {
1436 let result = Source::from_iter([0, 1, 2])
1437 .flat_map_merge(2, |item| {
1438 if item == 1 {
1439 Source::failed(StreamError::Failed("boom".into()))
1440 } else {
1441 Source::single(item)
1442 }
1443 })
1444 .run_collect();
1445 assert_eq!(result, Err(StreamError::Failed("boom".into())));
1446 }
1447
1448 #[test]
1449 fn flat_map_merge_emits_ready_inner_output_while_upstream_is_blocked() {
1450 let (release_tx, release_rx) = mpsc::channel();
1451 let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
1452 let queue = Source::from_factory(move || {
1453 let release_rx = StdArc::clone(&release_rx);
1454 let mut step = 0_u8;
1455 Box::new(std::iter::from_fn(move || {
1456 let item = match step {
1457 0 => Some(Ok(0)),
1458 1 => {
1459 release_rx
1460 .lock()
1461 .unwrap()
1462 .as_ref()
1463 .expect("release receiver available")
1464 .recv_timeout(StdDuration::from_secs(1))
1465 .expect("timed out waiting to release second upstream element");
1466 Some(Ok(1))
1467 }
1468 _ => None,
1469 };
1470 step += 1;
1471 item
1472 }))
1473 })
1474 .flat_map_merge(2, |item| Source::single(item + 10))
1475 .run_with(Sink::queue())
1476 .unwrap();
1477
1478 assert_eq!(queue.pull().unwrap(), Some(10));
1479 release_tx.send(()).unwrap();
1480 assert_eq!(queue.pull().unwrap(), Some(11));
1481 assert!(queue.pull().unwrap().is_none());
1482 }
1483
1484 #[test]
1485 fn group_by_routes_keys_and_drops_closed_keys() {
1486 let outer = Source::from_iter([0, 1, 2, 3, 4])
1487 .group_by(4, |item| item % 2, false)
1488 .run_with(Sink::queue())
1489 .unwrap();
1490
1491 let even = outer.pull().unwrap().unwrap();
1492 let even_completion = even.run_with(Sink::ignore()).unwrap();
1493 let odd = outer.pull().unwrap().unwrap();
1494 drop(even_completion);
1495
1496 assert_eq!(odd.run_collect().unwrap(), vec![1, 3]);
1497 assert!(outer.pull().unwrap().is_none());
1498 }
1499
1500 #[test]
1501 fn group_by_fails_when_distinct_key_limit_is_exceeded() {
1502 let outer = Source::from_iter([0, 1, 2])
1503 .group_by(2, |item| *item, false)
1504 .run_with(Sink::queue())
1505 .unwrap();
1506
1507 let _ = outer.pull().unwrap().unwrap();
1508 let _ = outer.pull().unwrap().unwrap();
1509 assert!(matches!(
1510 outer.pull(),
1511 Err(StreamError::Failed(message)) if message == "group_by reached max_substreams (2)"
1512 ));
1513 }
1514
1515 #[test]
1516 fn group_by_can_recreate_closed_substreams_when_enabled() {
1517 let (release_tx, release_rx) = mpsc::channel();
1518 let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
1519 let outer = Source::from_factory(move || {
1520 let release_rx = StdArc::clone(&release_rx);
1521 let mut step = 0_u8;
1522 Box::new(std::iter::from_fn(move || {
1523 let item = match step {
1524 0 => Some(Ok(0)),
1525 1 => Some(Ok(1)),
1526 2 => {
1527 release_rx
1528 .lock()
1529 .unwrap()
1530 .as_ref()
1531 .expect("release receiver available")
1532 .recv_timeout(StdDuration::from_secs(1))
1533 .expect("timed out waiting to release recreated key");
1534 Some(Ok(0))
1535 }
1536 _ => None,
1537 };
1538 step += 1;
1539 item
1540 }))
1541 })
1542 .group_by(4, |item| item % 2, true)
1543 .run_with(Sink::queue())
1544 .unwrap();
1545
1546 let even = outer.pull().unwrap().unwrap();
1547 assert_eq!(wait(even.run_with(Sink::head()).unwrap()), 0);
1548 release_tx.send(()).unwrap();
1549
1550 let odd = outer.pull().unwrap().unwrap();
1551 assert_eq!(odd.run_collect().unwrap(), vec![1]);
1552
1553 let recreated_even = outer.pull().unwrap().unwrap();
1554 assert_eq!(recreated_even.run_collect().unwrap(), vec![0]);
1555 assert!(outer.pull().unwrap().is_none());
1556 }
1557
1558 #[test]
1559 fn group_by_panicking_key_fn_abruptly_terminates_live_substreams() {
1560 let outer = Source::from_iter([0, 1])
1561 .group_by(
1562 4,
1563 |item| {
1564 assert_ne!(*item, 1, "boom");
1565 item % 2
1566 },
1567 false,
1568 )
1569 .run_with(Sink::queue())
1570 .unwrap();
1571
1572 let substream = outer.pull().unwrap().unwrap();
1573 let (result_tx, result_rx) = mpsc::channel();
1574 thread::spawn(move || {
1575 let _ = result_tx.send(substream.run_collect());
1576 });
1577
1578 assert_eq!(
1579 result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1580 Err(StreamError::AbruptTermination)
1581 );
1582 assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
1583 }
1584
1585 #[test]
1586 fn split_when_starts_new_substream_on_boundary_element() {
1587 let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
1588 .split_when(|item| *item == 0)
1589 .run_with(Sink::queue())
1590 .unwrap();
1591
1592 let first = outer.pull().unwrap().unwrap();
1593 assert_eq!(first.run_collect().unwrap(), vec![1, 2]);
1594 let second = outer.pull().unwrap().unwrap();
1595 assert_eq!(second.run_collect().unwrap(), vec![0, 3]);
1596 let third = outer.pull().unwrap().unwrap();
1597 assert_eq!(third.run_collect().unwrap(), vec![0, 4, 5]);
1598 assert!(outer.pull().unwrap().is_none());
1599 }
1600
1601 #[test]
1602 fn split_after_ends_current_substream_on_boundary_element() {
1603 let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
1604 .split_after(|item| *item == 0)
1605 .run_with(Sink::queue())
1606 .unwrap();
1607
1608 let first = outer.pull().unwrap().unwrap();
1609 assert_eq!(first.run_collect().unwrap(), vec![1, 2, 0]);
1610 let second = outer.pull().unwrap().unwrap();
1611 assert_eq!(second.run_collect().unwrap(), vec![3, 0]);
1612 let third = outer.pull().unwrap().unwrap();
1613 assert_eq!(third.run_collect().unwrap(), vec![4, 5]);
1614 assert!(outer.pull().unwrap().is_none());
1615 }
1616
1617 #[test]
1618 fn split_when_panicking_predicate_abruptly_terminates_live_substreams() {
1619 let outer = Source::from_iter([1, 2])
1620 .split_when(|item| {
1621 assert_ne!(*item, 2, "boom");
1622 false
1623 })
1624 .run_with(Sink::queue())
1625 .unwrap();
1626
1627 let substream = outer.pull().unwrap().unwrap();
1628 let (result_tx, result_rx) = mpsc::channel();
1629 thread::spawn(move || {
1630 let _ = result_tx.send(substream.run_collect());
1631 });
1632
1633 assert_eq!(
1634 result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1635 Err(StreamError::AbruptTermination)
1636 );
1637 assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
1638 }
1639
1640 #[test]
1641 fn split_when_pre_buffer_segments_match_expected_count() {
1642 let outer = Source::from_iter(0..100)
1643 .split_when(|item| *item != 0 && *item % 10 == 0)
1644 .run_with(Sink::queue())
1645 .unwrap();
1646 let mut segment_count = 0;
1647 while let Some(substream) = outer.pull().unwrap() {
1648 let items: Vec<i32> = substream.run_collect().unwrap();
1649 assert!(!items.is_empty(), "segment should not be empty");
1650 segment_count += 1;
1651 }
1652 assert_eq!(segment_count, 10, "100 elements in segments of 10");
1653 }
1654
1655 #[test]
1656 fn split_after_pre_buffer_segments_match_expected_count() {
1657 let outer = Source::from_iter(0..100)
1658 .split_after(|item| (*item + 1) % 10 == 0)
1659 .run_with(Sink::queue())
1660 .unwrap();
1661 let mut segment_count = 0;
1662 let mut total = 0_i32;
1663 while let Some(substream) = outer.pull().unwrap() {
1664 let items: Vec<i32> = substream.run_collect().unwrap();
1665 assert!(!items.is_empty(), "segment should not be empty");
1666 total += items.len() as i32;
1667 segment_count += 1;
1668 }
1669 assert_eq!(segment_count, 10);
1670 assert_eq!(total, 100);
1671 }
1672
1673 #[test]
1674 fn group_by_single_key_fused_matches_general_path() {
1675 let outer = Source::from_iter(0..1000i64)
1676 .group_by(1, |_| 0u8, false)
1677 .run_with(Sink::queue())
1678 .unwrap();
1679 let substream = outer.pull().unwrap().unwrap();
1680 let items: Vec<i64> = substream.run_collect().unwrap();
1681 assert_eq!(items.len(), 1000);
1682 assert_eq!(items[0], 0);
1683 assert_eq!(items[999], 999);
1684 assert!(outer.pull().unwrap().is_none());
1685 }
1686
1687 #[test]
1688 fn group_by_single_key_fused_handles_key_change_with_substream_limit() {
1689 let outer = Source::from_iter([0, 1, 0])
1690 .group_by(2, |item| *item, false)
1691 .run_with(Sink::queue())
1692 .unwrap();
1693 let mut sources = vec![];
1694 while let Some(source) = outer.pull().unwrap() {
1695 sources.push(source);
1696 }
1697 assert_eq!(sources.len(), 2);
1698 assert_eq!(sources[0].clone().run_collect().unwrap(), vec![0, 0]);
1699 assert_eq!(sources[1].clone().run_collect().unwrap(), vec![1]);
1700 }
1701
1702 #[test]
1703 fn flat_map_merge_lock_lighter_matches_expected_count() {
1704 let items = Source::from_iter(0..20)
1705 .flat_map_merge(2, |item| Source::single(item + 100))
1706 .run_with(Sink::queue())
1707 .unwrap();
1708 let mut count = 0;
1709 while items.pull().unwrap().is_some() {
1710 count += 1;
1711 }
1712 assert_eq!(count, 20);
1713 }
1714
1715 #[test]
1725 fn group_by_single_key_emits_substream_before_upstream_completes() {
1726 let (tx, rx) = mpsc::sync_channel::<i32>(0);
1729 let rx = StdArc::new(std::sync::Mutex::new(rx));
1730
1731 let outer = Source::from_factory({
1732 let rx = StdArc::clone(&rx);
1733 move || {
1734 let rx = StdArc::clone(&rx);
1735 Box::new(std::iter::from_fn(move || {
1736 rx.lock().unwrap().recv().ok().map(Ok)
1737 })) as BoxStream<i32>
1738 }
1739 })
1740 .group_by(1, |_| 0u8, false)
1741 .run_with(Sink::queue())
1742 .unwrap();
1743
1744 let (sub_tx, sub_rx) = mpsc::channel::<Source<i32>>();
1747 let outer_thread = thread::spawn(move || {
1748 let substream = outer.pull().unwrap().expect("expected a substream");
1749 sub_tx.send(substream).unwrap();
1750 });
1751
1752 tx.send(0).unwrap();
1755
1756 let substream = sub_rx
1758 .recv_timeout(StdDuration::from_secs(5))
1759 .expect("timed out — group_by buffered first element before emitting substream");
1760
1761 for i in 1..100_i32 {
1763 tx.send(i).unwrap();
1764 }
1765 drop(tx);
1766
1767 let items: Vec<i32> = substream.run_collect().unwrap();
1768 assert_eq!(items.len(), 100);
1769 outer_thread.join().unwrap();
1770 }
1771
1772 #[test]
1779 fn split_when_emits_substream_before_segment_ends() {
1780 const SEGMENT_LEN: usize = 300;
1784
1785 let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
1786 for i in 0..SEGMENT_LEN as i32 {
1787 tx.send(i).unwrap();
1788 }
1789 tx.send(-1).unwrap(); tx.send(99).unwrap(); drop(tx);
1792
1793 let outer = Source::from_iter(rx)
1794 .split_when(|item| *item == -1)
1795 .run_with(Sink::queue())
1796 .unwrap();
1797
1798 let (result_tx, result_rx) = mpsc::channel();
1799 thread::spawn(move || {
1800 let first = outer.pull().unwrap().expect("expected first substream");
1801 let items: Vec<i32> = first.run_collect().unwrap();
1802 let second = outer.pull().unwrap().expect("expected second substream");
1803 let items2: Vec<i32> = second.run_collect().unwrap();
1804 let done = outer.pull().unwrap().is_none();
1805 let _ = result_tx.send((items, items2, done));
1806 });
1807
1808 let (items, items2, done) = result_rx
1809 .recv_timeout(StdDuration::from_secs(5))
1810 .expect("timed out — split_when is buffering the whole segment");
1811 assert_eq!(items.len(), SEGMENT_LEN);
1812 assert_eq!(items2, vec![-1, 99]);
1813 assert!(done);
1814 }
1815
1816 #[test]
1817 fn split_after_emits_substream_before_segment_ends() {
1818 const SEGMENT_LEN: usize = 300;
1819
1820 let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
1821 for i in 0..SEGMENT_LEN as i32 {
1822 tx.send(i).unwrap();
1823 }
1824 tx.send(-1).unwrap(); tx.send(99).unwrap();
1826 drop(tx);
1827
1828 let outer = Source::from_iter(rx)
1829 .split_after(|item| *item == -1)
1830 .run_with(Sink::queue())
1831 .unwrap();
1832
1833 let (result_tx, result_rx) = mpsc::channel();
1834 thread::spawn(move || {
1835 let first = outer.pull().unwrap().expect("expected first substream");
1836 let items: Vec<i32> = first.run_collect().unwrap();
1837 let second = outer.pull().unwrap().expect("expected second substream");
1838 let items2: Vec<i32> = second.run_collect().unwrap();
1839 let done = outer.pull().unwrap().is_none();
1840 let _ = result_tx.send((items, items2, done));
1841 });
1842
1843 let (items, items2, done) = result_rx
1844 .recv_timeout(StdDuration::from_secs(5))
1845 .expect("timed out — split_after is buffering the whole segment");
1846 assert_eq!(items.len(), SEGMENT_LEN + 1);
1848 assert_eq!(items2, vec![99]);
1849 assert!(done);
1850 }
1851
1852 #[test]
1856 fn flat_map_merge_coordinator_no_lost_wakeup_stress() {
1857 for _ in 0..20 {
1858 let result = Source::from_iter(0..50_i32)
1859 .flat_map_merge(8, |item| Source::from_iter(item..item + 3))
1860 .run_with(Sink::fold(0i64, |acc, item| acc + item as i64))
1861 .unwrap()
1862 .wait();
1863 assert_eq!(result, Ok(3825), "flat_map_merge produced wrong sum");
1866 }
1867 }
1868
1869 #[test]
1874 fn flat_map_merge_single_mutex_race_stress() {
1875 for _ in 0..20 {
1876 let result = Source::from_iter(0..100_i64)
1877 .flat_map_merge(16, |item| Source::from_iter([item, item + 1000]))
1878 .run_with(Sink::fold(0i64, |acc, v| acc + v))
1879 .unwrap()
1880 .wait();
1881 assert_eq!(result, Ok(109_900), "flat_map_merge single-mutex stress");
1885 }
1886 }
1887
1888 #[test]
1895 fn split_when_bounded_memory_rendezvous() {
1896 const SEGMENT: usize = 100;
1899 let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT * 4);
1900 for i in 0..SEGMENT as i32 {
1901 tx.send(i).unwrap();
1902 }
1903 tx.send(-1).unwrap(); for i in 0..10_i32 {
1906 tx.send(i).unwrap();
1907 }
1908 drop(tx);
1909
1910 let outer = Source::from_iter(rx)
1911 .split_when(|item| *item == -1)
1912 .run_with(Sink::queue())
1913 .unwrap();
1914
1915 let (result_tx, result_rx) = mpsc::channel();
1916 thread::spawn(move || {
1917 let first = outer.pull().unwrap().expect("first segment");
1918 let seg1: Vec<i32> = first.run_collect().unwrap();
1919 let second = outer.pull().unwrap().expect("second segment");
1920 let seg2: Vec<i32> = second.run_collect().unwrap();
1921 let done = outer.pull().unwrap().is_none();
1922 result_tx.send((seg1, seg2, done)).unwrap();
1923 });
1924
1925 let (seg1, seg2, done) = result_rx
1926 .recv_timeout(StdDuration::from_secs(5))
1927 .expect("timed out — split_when writer held items past LIVE_SUBSTREAM_BATCH");
1928 assert_eq!(seg1.len(), SEGMENT, "first segment length");
1929 assert_eq!(seg2[0], -1, "boundary element starts second segment");
1930 assert_eq!(seg2.len(), 11, "second segment: boundary + 10 items");
1931 assert!(done);
1932 }
1933
1934 #[test]
1938 fn group_by_single_key_bounded_memory_rendezvous() {
1939 const N: usize = 200;
1942 let outer = Source::from_iter(0..N as i64)
1943 .group_by(1, |_| 0u8, false)
1944 .run_with(Sink::queue())
1945 .unwrap();
1946
1947 let (result_tx, result_rx) = mpsc::channel();
1948 thread::spawn(move || {
1949 let substream = outer.pull().unwrap().expect("substream");
1950 let items: Vec<i64> = substream.run_collect().unwrap();
1951 let done = outer.pull().unwrap().is_none();
1952 result_tx.send((items, done)).unwrap();
1953 });
1954
1955 let (items, done) = result_rx
1956 .recv_timeout(StdDuration::from_secs(5))
1957 .expect("timed out — group_by write batch held items beyond LIVE_SUBSTREAM_BATCH");
1958 assert_eq!(items.len(), N, "all items delivered");
1959 assert_eq!(items[0], 0);
1960 assert_eq!(items[N - 1], (N - 1) as i64);
1961 assert!(done);
1962 }
1963
1964 #[test]
1965 fn scan_emits_seed_and_accumulated_values() {
1966 let result = Source::from_iter(1..=3)
1967 .scan(0, |acc, item| acc + item)
1968 .run_collect()
1969 .unwrap();
1970
1971 assert_eq!(result, vec![0, 1, 3, 6]);
1972 }
1973
1974 #[test]
1975 fn limit_fails_after_max_elements() {
1976 let result = Source::from_iter(0..3).limit(2).run_collect();
1977
1978 assert_eq!(result, Err(StreamError::LimitExceeded { max: 2 }));
1979 }
1980
1981 #[test]
1982 fn limit_weighted_fails_with_limit_error_like_akka() {
1983 let result = Source::from_iter(["this", "is", "some", "string"])
1984 .via(Flow::identity().limit_weighted(15, |item: &&str| item.len()))
1985 .run_collect();
1986
1987 assert_eq!(result, Err(StreamError::LimitExceeded { max: 15 }));
1988 }
1989
1990 #[test]
1991 fn grouped_weighted_allows_oversized_first_element_like_akka() {
1992 let result = Source::from_iter([10_usize, 1, 2])
1993 .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
1994 .run_collect()
1995 .unwrap();
1996
1997 assert_eq!(result, vec![vec![10], vec![1, 2]]);
1998 }
1999
2000 #[test]
2001 fn grouped_weighted_keeps_oversized_later_element_in_current_group_like_akka() {
2002 let result = Source::from_iter([1_usize, 10, 2])
2003 .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
2004 .run_collect()
2005 .unwrap();
2006
2007 assert_eq!(result, vec![vec![1, 10], vec![2]]);
2008 }
2009
2010 #[test]
2011 fn sink_terminals_materialize_results() {
2012 let sum = Source::from_iter(1..=4)
2013 .run_with(Sink::fold(0, |acc, item| acc + item))
2014 .unwrap();
2015
2016 assert_eq!(wait(sum), 10);
2017 assert_eq!(
2018 wait(Source::from_iter(1..=4).run_with(Sink::head()).unwrap()),
2019 1
2020 );
2021 assert_eq!(
2022 wait(Source::from_iter(1..=4).run_with(Sink::last()).unwrap()),
2023 4
2024 );
2025 }
2026
2027 #[test]
2028 fn all_terminal_sink_variants_complete() {
2029 assert_eq!(
2030 wait(
2031 Source::from_iter([1, 2, 3])
2032 .run_with(Sink::collect())
2033 .unwrap()
2034 ),
2035 vec![1, 2, 3]
2036 );
2037 assert_eq!(
2038 wait(
2039 Source::<i32>::empty()
2040 .run_with(Sink::head_option())
2041 .unwrap()
2042 ),
2043 None
2044 );
2045 assert_eq!(
2046 wait(
2047 Source::from_iter([1, 2, 3])
2048 .run_with(Sink::last_option())
2049 .unwrap()
2050 ),
2051 Some(3)
2052 );
2053 assert_eq!(
2054 wait(
2055 Source::from_iter([1, 2, 3])
2056 .run_with(Sink::reduce(|acc, item| acc + item))
2057 .unwrap()
2058 ),
2059 6
2060 );
2061
2062 let seen = StdArc::new(StdAtomicUsize::new(0));
2063 let seen_by_sink = StdArc::clone(&seen);
2064 assert_eq!(
2065 wait(
2066 Source::from_iter([1_usize, 2, 3])
2067 .run_with(Sink::foreach(move |item| {
2068 seen_by_sink.fetch_add(item, StdOrdering::SeqCst);
2069 }))
2070 .unwrap()
2071 ),
2072 NotUsed
2073 );
2074 assert_eq!(seen.load(StdOrdering::SeqCst), 6);
2075 }
2076
2077 #[test]
2078 fn take_last_zero_returns_empty_vector() {
2079 let result = Source::from_iter([1, 2, 3])
2080 .run_with(Sink::take_last(0))
2081 .unwrap();
2082
2083 assert_eq!(wait(result), Vec::<i32>::new());
2084 }
2085
2086 #[test]
2087 fn bounded_head_terminals_complete_inline() {
2088 let materializer = Materializer::new();
2089
2090 let mut head = Source::from_iter(0_u64..1_000)
2091 .run_with_materializer(Sink::head(), &materializer)
2092 .unwrap();
2093 assert_eq!(materializer.active_streams(), 0);
2094 assert_eq!(head.try_wait(), Some(Ok(0)));
2095
2096 let mut filtered_head = Source::from_iter(0_u64..1_000)
2097 .filter(|item| *item >= 10)
2098 .run_with_materializer(Sink::head(), &materializer)
2099 .unwrap();
2100 assert_eq!(materializer.active_streams(), 0);
2101 assert_eq!(filtered_head.try_wait(), Some(Ok(10)));
2102
2103 let mut head_option = Source::<u64>::empty()
2104 .run_with_materializer(Sink::head_option(), &materializer)
2105 .unwrap();
2106 assert_eq!(materializer.active_streams(), 0);
2107 assert_eq!(head_option.try_wait(), Some(Ok(None)));
2108 }
2109
2110 #[test]
2111 fn bounded_head_fast_path_preserves_terminal_errors() {
2112 let materializer = Materializer::new();
2113
2114 let mut empty = Source::<u64>::empty()
2115 .run_with_materializer(Sink::head(), &materializer)
2116 .unwrap();
2117 assert_eq!(empty.try_wait(), Some(Err(StreamError::EmptyStream)));
2118
2119 let mut failed = Source::<u64>::failed(StreamError::Failed("boom".into()))
2120 .run_with_materializer(Sink::head(), &materializer)
2121 .unwrap();
2122 assert_eq!(
2123 failed.try_wait(),
2124 Some(Err(StreamError::Failed("boom".into())))
2125 );
2126 assert_eq!(materializer.active_streams(), 0);
2127 }
2128
2129 #[test]
2130 fn runnable_graph_composes_source_and_sink() {
2131 let graph = Source::from_iter(1..=4)
2132 .map(|item| item * 2)
2133 .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::right);
2134
2135 assert_eq!(wait(graph.run().unwrap()), 20);
2136
2137 let graph = Source::single(1)
2138 .map_materialized_value(|_| 20)
2139 .to(Sink::ignore())
2140 .map_materialized_value(|value| value + 1);
2141 assert_eq!(graph.run().unwrap(), 21);
2142
2143 let ignored = Source::single(1).to(Sink::ignore()).run().unwrap();
2144 assert_eq!(ignored, NotUsed);
2145 }
2146
2147 #[test]
2148 fn materialized_values_follow_keep_defaults() {
2149 let source = Source::single(1).map_materialized_value(|_| "source");
2150 let flow = Flow::identity().map_materialized_value(|_| "flow");
2151
2152 let source_mat = source.clone().via(flow.clone()).to(Sink::ignore()).run();
2153 assert_eq!(source_mat.unwrap(), "source");
2154
2155 let combined = source
2156 .via_mat(flow, Keep::both)
2157 .to_mat(Sink::ignore(), Keep::both)
2158 .run()
2159 .unwrap();
2160 assert_eq!(combined.0, ("source", "flow"));
2161 assert_eq!(wait(combined.1), NotUsed);
2162
2163 let sink_mat = Source::single(41)
2164 .map_materialized_value(|_| "ignored source")
2165 .run_with(Sink::fold(1, |acc, item| acc + item))
2166 .unwrap();
2167 assert_eq!(wait(sink_mat), 42);
2168 }
2169
2170 #[test]
2171 fn flow_to_sink_preserves_flow_materialized_value_by_default() {
2172 let sink = Flow::identity()
2173 .map(|item: i32| item + 1)
2174 .map_materialized_value(|_| "flow")
2175 .to(Sink::fold(0, |acc, item| acc + item));
2176
2177 let materialized = Source::from_iter([1, 2, 3]).run_with(sink).unwrap();
2178
2179 assert_eq!(materialized, "flow");
2180 let explicit = Flow::identity()
2181 .map(|item: i32| item + 1)
2182 .map_materialized_value(|_| "flow")
2183 .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both)
2184 .run_with(Source::from_iter([1, 2, 3]))
2185 .unwrap();
2186 assert_eq!(explicit, NotUsed);
2187
2188 let explicit = Source::from_iter([1, 2, 3])
2189 .run_with(
2190 Flow::identity()
2191 .map(|item: i32| item + 1)
2192 .map_materialized_value(|_| "flow")
2193 .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
2194 )
2195 .unwrap();
2196 assert_eq!(explicit.0, "flow");
2197 assert_eq!(wait(explicit.1), 9);
2198 }
2199
2200 #[test]
2201 fn materializer_shutdown_fails_materialization() {
2202 let materializer = Materializer::new();
2203 let named = materializer.with_name_prefix("test-stream");
2204 materializer.shutdown();
2205
2206 let graph = Source::single(1).to(Sink::ignore());
2207
2208 assert_eq!(named.name_prefix(), "test-stream");
2209 assert_eq!(
2210 graph.run_with_materializer(&named),
2211 Err(StreamError::AbruptTermination)
2212 );
2213 }
2214
2215 #[test]
2216 fn materializer_shutdown_fails_running_stream_completion() {
2217 let materializer = Materializer::new();
2218 let completion = Source::repeat(1)
2219 .run_with_materializer(Sink::ignore(), &materializer)
2220 .unwrap();
2221
2222 assert_eq!(materializer.active_streams(), 1);
2223 materializer.shutdown();
2224 assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
2225 assert_eq!(materializer.active_streams(), 0);
2226 }
2227
2228 #[test]
2229 fn dropped_stream_completion_cancels_running_stream() {
2230 let materializer = Materializer::new();
2231 let completion = Source::repeat(1)
2232 .run_with_materializer(Sink::ignore(), &materializer)
2233 .unwrap();
2234
2235 assert_eq!(materializer.active_streams(), 1);
2236 drop(completion);
2237 for _ in 0..50 {
2238 if materializer.active_streams() == 0 {
2239 break;
2240 }
2241 thread::sleep(Duration::from_millis(5));
2242 }
2243 assert_eq!(materializer.active_streams(), 0);
2244 }
2245
2246 #[test]
2247 fn runtime_timers_fire_cancel_and_stop_on_shutdown() {
2248 let materializer = Materializer::new();
2249 let (once_tx, once_rx) = mpsc::channel();
2250 let once = materializer.schedule_once(Duration::from_millis(5), move || {
2251 once_tx.send(()).unwrap();
2252 });
2253 once_rx.recv_timeout(Duration::from_millis(250)).unwrap();
2254 assert!(!once.is_cancelled());
2255
2256 let (cancelled_tx, cancelled_rx) = mpsc::channel();
2257 let cancelled = materializer.schedule_once(Duration::from_millis(25), move || {
2258 cancelled_tx.send(()).unwrap();
2259 });
2260 assert!(cancelled.cancel());
2261 assert!(!cancelled.cancel());
2262 assert!(cancelled.is_cancelled());
2263 assert!(
2264 cancelled_rx
2265 .recv_timeout(Duration::from_millis(75))
2266 .is_err()
2267 );
2268
2269 let fixed_delay_count = StdArc::new(StdAtomicUsize::new(0));
2270 let fixed_delay_task_count = StdArc::clone(&fixed_delay_count);
2271 let fixed_delay = materializer.schedule_with_fixed_delay(
2272 Duration::from_millis(1),
2273 Duration::from_millis(5),
2274 move || {
2275 fixed_delay_task_count.fetch_add(1, StdOrdering::SeqCst);
2276 },
2277 );
2278 thread::sleep(Duration::from_millis(25));
2279 assert!(fixed_delay_count.load(StdOrdering::SeqCst) > 0);
2280 fixed_delay.cancel();
2281
2282 let fixed_rate_count = StdArc::new(StdAtomicUsize::new(0));
2283 let fixed_rate_task_count = StdArc::clone(&fixed_rate_count);
2284 let fixed_rate = materializer.schedule_at_fixed_rate(
2285 Duration::from_millis(1),
2286 Duration::from_millis(5),
2287 move || {
2288 fixed_rate_task_count.fetch_add(1, StdOrdering::SeqCst);
2289 },
2290 );
2291 thread::sleep(Duration::from_millis(25));
2292 assert!(fixed_rate_count.load(StdOrdering::SeqCst) > 0);
2293 fixed_rate.cancel();
2294
2295 let shutdown_materializer = Materializer::new();
2296 let (shutdown_tx, shutdown_rx) = mpsc::channel();
2297 shutdown_materializer.schedule_once(Duration::from_millis(25), move || {
2298 shutdown_tx.send(()).unwrap();
2299 });
2300 shutdown_materializer.shutdown();
2301 assert!(shutdown_rx.recv_timeout(Duration::from_millis(75)).is_err());
2302 }
2303
2304 #[test]
2305 fn runtime_timer_driver_preserves_fixed_rate_cadence_under_slow_tasks() {
2306 let interval = Duration::from_millis(20);
2307 let task_time = Duration::from_millis(50);
2308
2309 let rate_materializer = Materializer::new();
2310 let (rate_tx, rate_rx) = mpsc::channel();
2311 let fixed_rate =
2312 rate_materializer.schedule_at_fixed_rate(Duration::ZERO, interval, move || {
2313 thread::sleep(task_time);
2314 rate_tx.send(Instant::now()).unwrap();
2315 });
2316 let rate_first = rate_rx.recv_timeout(Duration::from_millis(250)).unwrap();
2317 let rate_second = rate_rx.recv_timeout(Duration::from_millis(250)).unwrap();
2318 fixed_rate.cancel();
2319 rate_materializer.shutdown();
2320
2321 let delay_materializer = Materializer::new();
2322 let (delay_tx, delay_rx) = mpsc::channel();
2323 let fixed_delay =
2324 delay_materializer.schedule_with_fixed_delay(Duration::ZERO, interval, move || {
2325 thread::sleep(task_time);
2326 delay_tx.send(Instant::now()).unwrap();
2327 });
2328 let delay_first = delay_rx.recv_timeout(Duration::from_millis(250)).unwrap();
2329 let delay_second = delay_rx.recv_timeout(Duration::from_millis(250)).unwrap();
2330 fixed_delay.cancel();
2331 delay_materializer.shutdown();
2332
2333 let rate_gap = rate_second.duration_since(rate_first);
2334 let delay_gap = delay_second.duration_since(delay_first);
2335 assert!(
2336 delay_gap > rate_gap + Duration::from_millis(12),
2337 "fixed-rate gap {rate_gap:?} should stay meaningfully below fixed-delay gap {delay_gap:?}",
2338 );
2339 }
2340
2341 #[test]
2342 fn runtime_repeating_timer_cancellation_stops_future_fires() {
2343 let materializer = Materializer::new();
2344 let (tx, rx) = mpsc::channel();
2345 let timer = materializer.schedule_at_fixed_rate(
2346 Duration::from_millis(1),
2347 Duration::from_millis(30),
2348 move || {
2349 tx.send(()).unwrap();
2350 },
2351 );
2352
2353 rx.recv_timeout(Duration::from_millis(250)).unwrap();
2354 assert!(timer.cancel());
2355 assert!(rx.recv_timeout(Duration::from_millis(90)).is_err());
2356 materializer.shutdown();
2357 }
2358
2359 #[test]
2360 fn runtime_panicking_once_timer_does_not_kill_driver_or_later_timers() {
2361 let materializer = Materializer::new();
2362 materializer.schedule_once(Duration::from_millis(1), || {
2363 panic!("timer boom");
2364 });
2365
2366 let (tx, rx) = mpsc::channel();
2367 materializer.schedule_once(Duration::from_millis(20), move || {
2368 tx.send(()).unwrap();
2369 });
2370
2371 rx.recv_timeout(Duration::from_millis(250)).unwrap();
2372 materializer.shutdown();
2373 }
2374
2375 #[test]
2376 fn runtime_panicking_fixed_rate_timer_stops_itself_and_leaves_driver_alive() {
2377 let materializer = Materializer::new();
2378 let panic_count = StdArc::new(StdAtomicUsize::new(0));
2379 let panic_count_task = StdArc::clone(&panic_count);
2380 materializer.schedule_at_fixed_rate(Duration::ZERO, Duration::from_millis(20), move || {
2381 panic_count_task.fetch_add(1, StdOrdering::SeqCst);
2382 panic!("fixed-rate boom");
2383 });
2384
2385 assert!(wait_until(Duration::from_millis(150), || {
2386 panic_count.load(StdOrdering::SeqCst) == 1
2387 }));
2388
2389 let (tx, rx) = mpsc::channel();
2390 materializer.schedule_once(Duration::from_millis(30), move || {
2391 tx.send(()).unwrap();
2392 });
2393 rx.recv_timeout(Duration::from_millis(250)).unwrap();
2394
2395 thread::sleep(Duration::from_millis(90));
2396 assert_eq!(panic_count.load(StdOrdering::SeqCst), 1);
2397 materializer.shutdown();
2398 }
2399
2400 #[test]
2401 fn runtime_slow_timer_task_does_not_delay_unrelated_timers() {
2402 let materializer = Materializer::new();
2403 let started = StdArc::new(StdAtomicBool::new(false));
2404 let started_task = StdArc::clone(&started);
2405 let slow_timer = materializer.schedule_at_fixed_rate(
2406 Duration::ZERO,
2407 Duration::from_millis(250),
2408 move || {
2409 started_task.store(true, StdOrdering::SeqCst);
2410 thread::sleep(Duration::from_millis(200));
2411 },
2412 );
2413
2414 assert!(wait_until(Duration::from_millis(100), || {
2415 started.load(StdOrdering::SeqCst)
2416 }));
2417
2418 let start = Instant::now();
2419 let (tx, rx) = mpsc::channel();
2420 materializer.schedule_once(Duration::from_millis(10), move || {
2421 tx.send(Instant::now()).unwrap();
2422 });
2423 let fired_at = rx.recv_timeout(Duration::from_millis(350)).unwrap();
2424 let elapsed = fired_at.duration_since(start);
2425
2426 slow_timer.cancel();
2427 materializer.shutdown();
2428 assert!(
2429 elapsed < Duration::from_millis(150),
2430 "unrelated timer was delayed by a blocking timer task: {elapsed:?}",
2431 );
2432 }
2433
2434 #[test]
2435 fn runtime_shutdown_stops_timer_driver_thread() {
2436 let materializer = Materializer::new();
2437 assert!(wait_until(Duration::from_secs(1), || materializer
2438 .timer_driver_is_live()));
2439
2440 materializer.shutdown();
2441 assert!(wait_until(Duration::from_secs(2), || !materializer
2442 .timer_driver_is_live()));
2443 }
2444
2445 #[test]
2446 fn runtime_timer_driver_orders_many_timers_by_deadline() {
2447 let materializer = Materializer::new();
2448 let (tx, rx) = mpsc::channel();
2449 let schedule = [(45_u64, 4_u8), (5, 1), (35, 3), (15, 2), (55, 5)];
2450
2451 for (delay_ms, value) in schedule {
2452 let tx = tx.clone();
2453 materializer.schedule_once(Duration::from_millis(delay_ms), move || {
2454 tx.send(value).unwrap();
2455 });
2456 }
2457 drop(tx);
2458
2459 let mut received = Vec::new();
2460 for _ in 0..schedule.len() {
2461 received.push(rx.recv_timeout(Duration::from_millis(250)).unwrap());
2462 }
2463 materializer.shutdown();
2464
2465 assert_eq!(received, vec![1, 2, 3, 4, 5]);
2466 }
2467
2468 #[test]
2469 fn runtime_timer_driver_uses_one_thread_per_runtime_regardless_of_timer_count() {
2470 let materializer = Materializer::new();
2471 let thread_name = materializer.timer_thread_name().to_owned();
2472 assert!(wait_until(Duration::from_millis(200), || {
2473 linux_thread_count(&thread_name) == 1
2474 }));
2475
2476 for _ in 0..128 {
2477 materializer.schedule_once(Duration::from_secs(60), || {});
2478 }
2479
2480 assert!(
2481 linux_thread_count(&thread_name) == 1,
2482 "scheduling timers should not create extra timer threads for a runtime",
2483 );
2484 materializer.shutdown();
2485 assert!(wait_until(Duration::from_millis(200), || {
2486 linux_thread_count(&thread_name) == 0
2487 }));
2488 }
2489
2490 #[test]
2491 fn cancelled_and_never_sinks_have_distinct_materialization_results() {
2492 assert_eq!(
2493 Source::repeat(1)
2494 .run_with(Sink::cancelled())
2495 .expect("cancelled sink materializes"),
2496 NotUsed
2497 );
2498 assert_eq!(
2499 Source::single(1)
2500 .run_with(Sink::never())
2501 .expect("never sink materializes")
2502 .try_wait(),
2503 None
2504 );
2505 }
2506
2507 #[test]
2508 fn never_sink_finishes_on_materializer_shutdown() {
2509 let materializer = Materializer::new();
2510 let completion = Source::single(1)
2511 .run_with_materializer(Sink::never(), &materializer)
2512 .unwrap();
2513
2514 materializer.shutdown();
2515 assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
2516 }
2517
2518 #[test]
2519 fn dropping_source_never_completion_releases_parked_worker() {
2520 let materializer = Materializer::new();
2521 let completion = Source::<i32>::never()
2522 .run_with_materializer(Sink::ignore(), &materializer)
2523 .unwrap();
2524
2525 assert!(wait_until(StdDuration::from_secs(1), || {
2526 materializer.active_streams() == 1
2527 }));
2528 assert_eq!(materializer.active_streams(), 1);
2529
2530 drop(completion);
2531
2532 assert!(wait_until(StdDuration::from_secs(15), || {
2533 materializer.active_streams() == 0
2534 }));
2535 assert_eq!(materializer.active_streams(), 0);
2536 }
2537
2538 #[test]
2539 fn future_and_maybe_sources_emit_values() {
2540 let future_value = Source::future(|| async { Ok(7) }).run_collect().unwrap();
2541 assert_eq!(future_value, vec![7]);
2542
2543 let future_source = Source::future_source(|| async { Ok(Source::from_iter([1, 2, 3])) })
2544 .run_collect()
2545 .unwrap();
2546 assert_eq!(future_source, vec![1, 2, 3]);
2547
2548 let (handle, source) = Source::maybe();
2549 assert_eq!(
2550 source.clone().run_collect(),
2551 Err(StreamError::MaybeIncomplete)
2552 );
2553 handle.complete(9).unwrap();
2554 assert_eq!(source.run_collect().unwrap(), vec![9]);
2555 }
2556
2557 #[test]
2558 fn wp6b_source_generators_emit_and_fail_like_stream_errors() {
2559 assert_eq!(
2560 Source::cycle(|| [1, 2, 3].into_iter())
2561 .take(8)
2562 .run_collect()
2563 .unwrap(),
2564 vec![1, 2, 3, 1, 2, 3, 1, 2]
2565 );
2566 assert_eq!(
2567 Source::<i32>::cycle(std::iter::empty::<i32>).run_collect(),
2568 Err(StreamError::Failed("empty iterator".into()))
2569 );
2570 assert_eq!(
2571 Source::unfold(0, |state| (state < 4).then_some((state + 1, state)))
2572 .run_collect()
2573 .unwrap(),
2574 vec![0, 1, 2, 3]
2575 );
2576 assert_eq!(
2577 Source::unfold_async(0, |state| async move {
2578 Ok((state < 4).then_some((state + 1, state * 2)))
2579 })
2580 .run_collect()
2581 .unwrap(),
2582 vec![0, 2, 4, 6]
2583 );
2584 assert!(matches!(
2585 Source::<i32>::lazy_single(|| panic!("boom")).run_collect(),
2586 Err(StreamError::Failed(message)) if message == "lazy_single factory panicked"
2587 ));
2588 }
2589
2590 #[test]
2591 fn wp6b_lazy_sources_defer_until_first_pull_and_complete_deferred_mat() {
2592 let created = StdArc::new(StdAtomicUsize::new(0));
2593 let created_for_source = StdArc::clone(&created);
2594 let source = Source::<i32>::lazy_source(move || {
2595 created_for_source.fetch_add(1, StdOrdering::SeqCst);
2596 Source::from_iter([7, 8]).map_materialized_value(|_| 99)
2597 });
2598 let materializer = Materializer::new();
2599 let (mut stream, mut mat) = StdArc::clone(&source.factory)
2600 .create(&materializer)
2601 .unwrap();
2602
2603 assert_eq!(created.load(StdOrdering::SeqCst), 0);
2604 assert!(mat.try_wait().is_none());
2605 assert_eq!(stream.next().unwrap().unwrap(), 7);
2606 assert_eq!(mat.wait().unwrap(), 99);
2607 assert_eq!(created.load(StdOrdering::SeqCst), 1);
2608 assert_eq!(stream.next().unwrap().unwrap(), 8);
2609
2610 let never_created = StdArc::new(StdAtomicUsize::new(0));
2611 let never_created_for_source = StdArc::clone(&never_created);
2612 let mat = Source::<i32>::lazy_future_source(move || {
2613 never_created_for_source.fetch_add(1, StdOrdering::SeqCst);
2614 async { Ok(Source::single(1)) }
2615 })
2616 .to(Sink::cancelled())
2617 .run()
2618 .unwrap();
2619 assert!(matches!(mat.wait(), Err(StreamError::Failed(_))));
2620 assert_eq!(never_created.load(StdOrdering::SeqCst), 0);
2621
2622 let lazy_future = StdArc::new(StdAtomicUsize::new(0));
2623 let lazy_future_for_source = StdArc::clone(&lazy_future);
2624 let source = Source::lazy_future(move || {
2625 lazy_future_for_source.fetch_add(1, StdOrdering::SeqCst);
2626 async { Ok(42) }
2627 });
2628 let (mut stream, _) = StdArc::clone(&source.factory)
2629 .create(&Materializer::new())
2630 .unwrap();
2631 assert_eq!(lazy_future.load(StdOrdering::SeqCst), 0);
2632 assert_eq!(stream.next().unwrap().unwrap(), 42);
2633 assert_eq!(lazy_future.load(StdOrdering::SeqCst), 1);
2634 }
2635
2636 #[test]
2637 fn wp6b_unfold_resource_closes_on_completion_failure_and_cancellation() {
2638 let closed = StdArc::new(StdAtomicUsize::new(0));
2639 let closed_on_complete = StdArc::clone(&closed);
2640 let values = Source::unfold_resource(
2641 || Ok(std::collections::VecDeque::from([1, 2, 3])),
2642 |items| Ok(items.pop_front()),
2643 move |_items| {
2644 closed_on_complete.fetch_add(1, StdOrdering::SeqCst);
2645 Ok(())
2646 },
2647 )
2648 .run_collect()
2649 .unwrap();
2650 assert_eq!(values, vec![1, 2, 3]);
2651 assert_eq!(closed.load(StdOrdering::SeqCst), 1);
2652
2653 let closed_on_failure = StdArc::new(StdAtomicUsize::new(0));
2654 let closed_on_failure_for_close = StdArc::clone(&closed_on_failure);
2655 let failed = Source::<i32>::unfold_resource(
2656 || Ok(()),
2657 |_| Err(StreamError::Failed("read".into())),
2658 move |_| {
2659 closed_on_failure_for_close.fetch_add(1, StdOrdering::SeqCst);
2660 Err(StreamError::Failed("close".into()))
2661 },
2662 )
2663 .run_collect();
2664 assert_eq!(failed, Err(StreamError::Failed("read".into())));
2665 assert_eq!(closed_on_failure.load(StdOrdering::SeqCst), 1);
2666
2667 let closed_on_cancel = StdArc::new(StdAtomicUsize::new(0));
2668 let closed_on_cancel_for_close = StdArc::clone(&closed_on_cancel);
2669 let first = Source::unfold_resource(
2670 || Ok(0_usize),
2671 |next| {
2672 let item = *next;
2673 *next += 1;
2674 Ok(Some(item))
2675 },
2676 move |_| {
2677 closed_on_cancel_for_close.fetch_add(1, StdOrdering::SeqCst);
2678 Ok(())
2679 },
2680 )
2681 .run_with(Sink::head())
2682 .unwrap();
2683 assert_eq!(first.wait().unwrap(), 0);
2684 assert!(wait_until(Duration::from_millis(250), || {
2685 closed_on_cancel.load(StdOrdering::SeqCst) == 1
2686 }));
2687 }
2688
2689 #[test]
2690 fn wp6b_async_resource_and_async_accumulators_are_sequential() {
2691 let closed = StdArc::new(StdAtomicUsize::new(0));
2692 let closed_for_close = StdArc::clone(&closed);
2693 let values = Source::unfold_resource_async(
2694 || async { Ok(std::collections::VecDeque::from([1, 2, 3])) },
2695 |items| {
2696 let item = items.pop_front();
2697 async move { Ok(item) }
2698 },
2699 move |_items| {
2700 let closed = StdArc::clone(&closed_for_close);
2701 async move {
2702 closed.fetch_add(1, StdOrdering::SeqCst);
2703 Ok(())
2704 }
2705 },
2706 )
2707 .run_collect()
2708 .unwrap();
2709 assert_eq!(values, vec![1, 2, 3]);
2710 assert_eq!(closed.load(StdOrdering::SeqCst), 1);
2711
2712 let closed_on_failure = StdArc::new(StdAtomicUsize::new(0));
2713 let closed_on_failure_for_close = StdArc::clone(&closed_on_failure);
2714 let failed = Source::<i32>::unfold_resource_async(
2715 || async { Ok(()) },
2716 |_resource| async { Err(StreamError::Failed("read".into())) },
2717 move |_resource| {
2718 let closed_on_failure = StdArc::clone(&closed_on_failure_for_close);
2719 async move {
2720 closed_on_failure.fetch_add(1, StdOrdering::SeqCst);
2721 Err(StreamError::Failed("close".into()))
2722 }
2723 },
2724 )
2725 .run_collect();
2726 assert_eq!(failed, Err(StreamError::Failed("read".into())));
2727 assert_eq!(closed_on_failure.load(StdOrdering::SeqCst), 1);
2728
2729 let active = StdArc::new(StdAtomicUsize::new(0));
2730 let max_active = StdArc::new(StdAtomicUsize::new(0));
2731 let active_for_stage = StdArc::clone(&active);
2732 let max_for_stage = StdArc::clone(&max_active);
2733 let scanned = Source::from_iter(1..=4)
2734 .scan_async(0, move |acc, item| {
2735 let active = StdArc::clone(&active_for_stage);
2736 let max_active = StdArc::clone(&max_for_stage);
2737 async move {
2738 let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
2739 max_active.fetch_max(now, StdOrdering::SeqCst);
2740 tokio::time::sleep(Duration::from_millis(1)).await;
2741 active.fetch_sub(1, StdOrdering::SeqCst);
2742 Ok(acc + item)
2743 }
2744 })
2745 .run_collect()
2746 .unwrap();
2747 assert_eq!(scanned, vec![0, 1, 3, 6, 10]);
2748 assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
2749
2750 let folded = Source::from_iter(1..=4)
2751 .fold_async(0, |acc, item| async move { Ok(acc + item) })
2752 .run_collect()
2753 .unwrap();
2754 assert_eq!(folded, vec![10]);
2755 }
2756
2757 #[test]
2758 fn wp6b_fold_async_materialization_does_not_drain_upstream() {
2759 let release = StdArc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
2760 let started = StdArc::new(StdAtomicBool::new(false));
2761 let source = {
2762 let release = StdArc::clone(&release);
2763 let started = StdArc::clone(&started);
2764 Source::from_factory(move || {
2765 let release = StdArc::clone(&release);
2766 let started = StdArc::clone(&started);
2767 let mut emitted = false;
2768 Box::new(std::iter::from_fn(move || {
2769 if emitted {
2770 return None;
2771 }
2772 emitted = true;
2773 started.store(true, StdOrdering::SeqCst);
2774 let (released, available) = &*release;
2775 let mut released = released.lock().unwrap();
2776 while !*released {
2777 released = available.wait(released).unwrap();
2778 }
2779 Some(Ok(1))
2780 }))
2781 })
2782 };
2783
2784 let (materialized_tx, materialized_rx) = mpsc::channel();
2785 let join = thread::spawn(move || {
2786 let queue = source
2787 .fold_async(0, |acc, item| async move { Ok(acc + item) })
2788 .run_with(Sink::queue())
2789 .unwrap();
2790 materialized_tx.send(queue).unwrap();
2791 });
2792
2793 let queue = match materialized_rx.recv_timeout(StdDuration::from_secs(1)) {
2794 Ok(queue) => queue,
2795 Err(error) => {
2796 let (released, available) = &*release;
2797 *released.lock().unwrap() = true;
2798 available.notify_all();
2799 let _ = join.join();
2800 panic!("fold_async materialization did not return before first pull: {error}");
2801 }
2802 };
2803 let (released, _) = &*release;
2804 assert!(
2805 !*released.lock().unwrap(),
2806 "test source was released before materialization returned"
2807 );
2808
2809 let (released, available) = &*release;
2810 *released.lock().unwrap() = true;
2811 available.notify_all();
2812 assert_eq!(queue.pull().unwrap(), Some(1));
2813 assert_eq!(queue.pull().unwrap(), None);
2814 join.join().unwrap();
2815 assert!(started.load(StdOrdering::SeqCst));
2816 }
2817
2818 #[test]
2819 fn wp6b_lazy_sink_and_flow_wait_for_first_element() {
2820 let lazy_sink_created = StdArc::new(StdAtomicUsize::new(0));
2821 let lazy_sink_created_for_factory = StdArc::clone(&lazy_sink_created);
2822 let empty_sink = Source::<i32>::empty()
2823 .run_with(Sink::lazy_sink(move || {
2824 lazy_sink_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
2825 Sink::ignore()
2826 }))
2827 .unwrap();
2828 assert!(matches!(empty_sink.wait(), Err(StreamError::Failed(_))));
2829 assert_eq!(lazy_sink_created.load(StdOrdering::SeqCst), 0);
2830
2831 let foreach_sum = StdArc::new(StdAtomicUsize::new(0));
2832 let foreach_sum_for_sink = StdArc::clone(&foreach_sum);
2833 Source::from_iter([1_usize, 2, 3])
2834 .run_with(Sink::foreach_async(2, move |item| {
2835 let foreach_sum = StdArc::clone(&foreach_sum_for_sink);
2836 async move {
2837 foreach_sum.fetch_add(item, StdOrdering::SeqCst);
2838 Ok(())
2839 }
2840 }))
2841 .unwrap()
2842 .wait()
2843 .unwrap();
2844 assert_eq!(foreach_sum.load(StdOrdering::SeqCst), 6);
2845
2846 let lazy_flow_created = StdArc::new(StdAtomicUsize::new(0));
2847 let lazy_flow_created_for_factory = StdArc::clone(&lazy_flow_created);
2848 let lazy_flow = Flow::<i32, i32>::lazy_flow(move || {
2849 lazy_flow_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
2850 Flow::identity()
2851 .map(|item: i32| item + 10)
2852 .map_materialized_value(|_| 123)
2853 });
2854 let mat = (lazy_flow.materialize)().unwrap();
2855 let mut stream = match lazy_flow.transform {
2856 flow::FlowTransform::Runtime(transform) => {
2857 transform(Box::new([Ok(1), Ok(2)].into_iter()), &Materializer::new()).unwrap()
2858 }
2859 flow::FlowTransform::Pure(_) => panic!("lazy flow must be runtime-backed"),
2860 };
2861 assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 0);
2862 assert_eq!(stream.next().unwrap().unwrap(), 11);
2863 assert_eq!(mat.wait().unwrap(), 123);
2864 assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 1);
2865 assert_eq!(stream.next().unwrap().unwrap(), 12);
2866
2867 let future_flow = Source::from_iter([1, 2])
2868 .via_mat(
2869 Flow::future_flow(|| async {
2870 Ok(Flow::identity()
2871 .map(|item: i32| item * 2)
2872 .map_materialized_value(|_| 77))
2873 }),
2874 Keep::right,
2875 )
2876 .to_mat(Sink::collect(), Keep::both)
2877 .run()
2878 .unwrap();
2879 assert_eq!(future_flow.0.wait().unwrap(), 77);
2880 assert_eq!(future_flow.1.wait().unwrap(), vec![2, 4]);
2881 }
2882
2883 #[test]
2884 fn wp6b_lazy_flow_double_use_in_one_chain_pairs_instances_in_order() {
2885 for round in 0..50 {
2890 let counter = StdArc::new(StdAtomicUsize::new(1));
2891 let factory_counter = StdArc::clone(&counter);
2892 let lazy: Flow<usize, usize, _> = Flow::lazy_flow(move || {
2893 let id = factory_counter.fetch_add(1, StdOrdering::SeqCst);
2894 Flow::identity()
2895 .map(move |x: usize| x * 100 + id)
2896 .map_materialized_value(move |_| id)
2897 });
2898 let lazy_again = lazy.clone();
2899
2900 let ((first_mat, second_mat), out) = Source::from_iter([0usize])
2901 .via_mat(lazy, Keep::right)
2902 .via_mat(lazy_again, Keep::both)
2903 .to_mat(Sink::collect(), Keep::both)
2904 .run()
2905 .unwrap();
2906
2907 let first_id = first_mat.wait().unwrap();
2908 let second_id = second_mat.wait().unwrap();
2909 let element = out.wait().unwrap()[0];
2910 assert_eq!(
2911 element,
2912 first_id * 100 + second_id,
2913 "round {round}: mats ({first_id},{second_id}) cross-wired with transform order"
2914 );
2915 assert_ne!(
2916 first_id, second_id,
2917 "round {round}: same factory instance paired twice"
2918 );
2919 }
2920 }
2921
2922 #[test]
2923 fn wp6b_lazy_flow_clones_materialize_concurrently_without_cross_wiring() {
2924 for _ in 0..20 {
2925 let next_id = StdArc::new(StdAtomicUsize::new(0));
2926 let next_id_for_factory = StdArc::clone(&next_id);
2927 let flow = Flow::<i32, i32>::lazy_flow(move || {
2928 let id = next_id_for_factory.fetch_add(1, StdOrdering::SeqCst) + 1;
2929 Flow::identity()
2930 .map(move |item: i32| item + (id as i32 * 100))
2931 .map_materialized_value(move |_| id)
2932 });
2933 let barrier = StdArc::new(std::sync::Barrier::new(3));
2934
2935 let spawn_materialization = |input: i32| {
2936 let flow = flow.clone();
2937 let barrier = StdArc::clone(&barrier);
2938 thread::spawn(move || {
2939 barrier.wait();
2940 let (mat, values) = Source::single(input)
2941 .via_mat(flow, Keep::right)
2942 .to_mat(Sink::collect(), Keep::both)
2943 .run()
2944 .unwrap();
2945 (input, mat.wait().unwrap(), values.wait().unwrap())
2946 })
2947 };
2948
2949 let first = spawn_materialization(1);
2950 let second = spawn_materialization(2);
2951 barrier.wait();
2952
2953 for result in [first.join().unwrap(), second.join().unwrap()] {
2954 let (input, mat_id, values) = result;
2955 assert_eq!(values, vec![input + (mat_id as i32 * 100)]);
2956 }
2957 assert_eq!(next_id.load(StdOrdering::SeqCst), 2);
2958 }
2959 }
2960
2961 #[test]
2962 fn wp6b_map_with_resource_emits_close_item_before_terminal_error() {
2963 let queue = Source::from_factory(|| {
2964 Box::new(vec![Ok(1), Err(StreamError::Failed("upstream".into()))].into_iter())
2965 })
2966 .map_with_resource(
2967 || Ok(()),
2968 |_resource, item| Ok(item + 10),
2969 |_resource| Ok(Some(99)),
2970 )
2971 .run_with(Sink::queue())
2972 .unwrap();
2973
2974 assert_eq!(queue.pull().unwrap(), Some(11));
2975 assert_eq!(queue.pull().unwrap(), Some(99));
2976 assert_eq!(queue.pull(), Err(StreamError::Failed("upstream".into())));
2977
2978 let failed: StreamResult<Vec<i32>> = Source::single(1)
2979 .map_with_resource(
2980 || Ok(()),
2981 |_resource, _item| -> StreamResult<i32> { Err(StreamError::Failed("map".into())) },
2982 |_resource| -> StreamResult<Option<i32>> {
2983 Err(StreamError::Failed("close".into()))
2984 },
2985 )
2986 .run_collect();
2987 assert_eq!(failed, Err(StreamError::Failed("map".into())));
2988 }
2989
2990 #[test]
2991 fn stateful_and_terminal_source_operators_work() {
2992 let stateful = Source::from_iter([1, 2, 3])
2993 .stateful_map(0, |sum, item| {
2994 *sum += item;
2995 *sum
2996 })
2997 .run_collect()
2998 .unwrap();
2999 assert_eq!(stateful, vec![1, 3, 6]);
3000
3001 let concat = Source::from_iter([1, 2, 3])
3002 .stateful_map_concat(0, |sum, item| {
3003 *sum += item;
3004 [item, *sum]
3005 })
3006 .run_collect()
3007 .unwrap();
3008 assert_eq!(concat, vec![1, 1, 2, 3, 3, 6]);
3009
3010 assert_eq!(
3011 Source::from_iter([1, 2, 3])
3012 .fold(10, |acc, item| acc + item)
3013 .run_collect()
3014 .unwrap(),
3015 vec![16]
3016 );
3017 assert_eq!(
3018 Source::from_iter([1, 2, 3])
3019 .reduce(|acc, item| acc + item)
3020 .run_collect()
3021 .unwrap(),
3022 vec![6]
3023 );
3024 }
3025
3026 #[test]
3027 fn concat_and_sliding_emit_before_unbounded_upstream_finishes() {
3028 let concat = Source::single(())
3029 .map_concat(|_| 0_u64..)
3030 .take(1)
3031 .run_collect()
3032 .unwrap();
3033 assert_eq!(concat, vec![0]);
3034
3035 let sliding = Source::repeat(1_u64)
3036 .sliding(2, 1)
3037 .take(1)
3038 .run_collect()
3039 .unwrap();
3040 assert_eq!(sliding, vec![vec![1, 1]]);
3041 }
3042
3043 #[test]
3044 fn fan_in_source_operators_follow_ordering_rules() {
3045 assert_eq!(
3046 Source::from_iter([1, 2])
3047 .concat(Source::from_iter([3, 4]))
3048 .run_collect()
3049 .unwrap(),
3050 vec![1, 2, 3, 4]
3051 );
3052 assert_eq!(
3053 Source::from_iter([3, 4])
3054 .prepend(Source::from_iter([1, 2]))
3055 .run_collect()
3056 .unwrap(),
3057 vec![1, 2, 3, 4]
3058 );
3059 assert_eq!(
3060 Source::empty()
3061 .or_else(Source::from_iter([10, 20]))
3062 .run_collect()
3063 .unwrap(),
3064 vec![10, 20]
3065 );
3066 assert_eq!(
3067 Source::from_iter([1, 2])
3068 .or_else(Source::from_iter([10, 20]))
3069 .run_collect()
3070 .unwrap(),
3071 vec![1, 2]
3072 );
3073 assert_eq!(
3074 Source::from_iter([1, 2, 3])
3075 .interleave(Source::from_iter([10, 11, 12]), 2)
3076 .run_collect()
3077 .unwrap(),
3078 vec![1, 2, 10, 11, 3, 12]
3079 );
3080 }
3081
3082 #[test]
3083 fn fan_in_flow_operators_compose_with_primary_stream() {
3084 let concat = Source::from_iter([1, 2])
3085 .via(Flow::identity().concat(Source::from_iter([3, 4])))
3086 .run_collect()
3087 .unwrap();
3088 assert_eq!(concat, vec![1, 2, 3, 4]);
3089
3090 let prepend = Source::from_iter([3, 4])
3091 .via(Flow::identity().prepend(Source::from_iter([1, 2])))
3092 .run_collect()
3093 .unwrap();
3094 assert_eq!(prepend, vec![1, 2, 3, 4]);
3095
3096 let interleave = Source::from_iter([1, 2, 3])
3097 .via(Flow::identity().interleave(Source::from_iter([10, 11, 12]), 1))
3098 .run_collect()
3099 .unwrap();
3100 assert_eq!(interleave, vec![1, 10, 2, 11, 3, 12]);
3101
3102 let merge_sorted = Source::from_iter([1, 4])
3103 .via(Flow::identity().merge_sorted(Source::from_iter([2, 3, 5])))
3104 .run_collect()
3105 .unwrap();
3106 assert_eq!(merge_sorted, vec![1, 2, 3, 4, 5]);
3107
3108 let zip_latest = Source::from_iter([1, 2])
3109 .via(Flow::identity().zip_latest(Source::single(10)))
3110 .run_collect()
3111 .unwrap();
3112 assert_eq!(zip_latest, vec![(1, 10), (2, 10)]);
3113
3114 let zip_latest_with = Source::from_iter([1, 2])
3115 .via(
3116 Flow::identity()
3117 .zip_latest_with(Source::single(10), false, |left, right| left + right),
3118 )
3119 .run_collect()
3120 .unwrap();
3121 assert_eq!(zip_latest_with, vec![11, 12]);
3122 }
3123
3124 #[test]
3125 fn fan_in_operators_propagate_errors_and_eager_close() {
3126 assert!(matches!(
3127 Source::failed(StreamError::Failed("boom".into()))
3128 .or_else(Source::from_iter([1, 2]))
3129 .run_collect(),
3130 Err(StreamError::Failed(_))
3131 ));
3132 assert!(matches!(
3133 Source::from_iter([1, 2])
3134 .prepend(Source::failed(StreamError::Failed("boom".into())))
3135 .run_collect(),
3136 Err(StreamError::Failed(_))
3137 ));
3138 assert_eq!(
3139 Source::from_iter([1, 2])
3140 .interleave_all([Source::empty()], 1, true)
3141 .run_collect()
3142 .unwrap(),
3143 vec![1]
3144 );
3145 }
3146
3147 #[test]
3148 fn interleave_lazy_pulls_only_inputs_needed_for_first_segment() {
3149 use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
3150
3151 let pulls: Arc<[AtomicUsize; 3]> = Arc::new([
3152 AtomicUsize::new(0),
3153 AtomicUsize::new(0),
3154 AtomicUsize::new(0),
3155 ]);
3156
3157 let make_source = |idx: usize| {
3158 let pulls = Arc::clone(&pulls);
3159 Source::from_materialized_factory(move |_| {
3160 let pulls = Arc::clone(&pulls);
3161 let mut emitted = false;
3162 Ok((
3163 Box::new(std::iter::from_fn(move || {
3164 pulls[idx].fetch_add(1, Ordering::SeqCst);
3165 if !emitted && idx == 0 {
3166 emitted = true;
3167 Some(Ok(42))
3168 } else {
3169 None
3170 }
3171 })) as BoxStream<i32>,
3172 NotUsed,
3173 ))
3174 })
3175 };
3176
3177 let result = make_source(0)
3178 .interleave_all([make_source(1), make_source(2)], 1, false)
3179 .run_with(Sink::head());
3180
3181 assert_eq!(wait(result.unwrap()), 42);
3182 assert_eq!(pulls[0].load(Ordering::SeqCst), 1);
3183 assert_eq!(
3184 pulls[1].load(Ordering::SeqCst),
3185 0,
3186 "second input should not be pulled when downstream cancels after first element"
3187 );
3188 assert_eq!(
3189 pulls[2].load(Ordering::SeqCst),
3190 0,
3191 "third input should not be pulled before its turn"
3192 );
3193 }
3194
3195 #[test]
3196 fn interleave_non_eager_drains_remaining_when_one_input_completes() {
3197 assert_eq!(
3198 Source::from_iter([1, 2, 3, 4])
3199 .interleave_all(
3200 [Source::from_iter([10]), Source::from_iter([20, 21, 22])],
3201 1,
3202 false
3203 )
3204 .run_collect()
3205 .unwrap(),
3206 vec![1, 10, 20, 2, 21, 3, 22, 4]
3207 );
3208 }
3209
3210 #[test]
3211 fn remaining_merge_and_zip_family_matches_expected_ordering() {
3212 assert_eq!(
3213 Source::from_iter([1, 4])
3214 .merge_sorted(Source::from_iter([2, 3, 5]))
3215 .run_collect()
3216 .unwrap(),
3217 vec![1, 2, 3, 4, 5]
3218 );
3219
3220 assert_eq!(
3221 Source::from_iter([1, 2])
3222 .merge_latest(Source::single(10), false)
3223 .run_collect()
3224 .unwrap(),
3225 vec![vec![1, 10], vec![2, 10]]
3226 );
3227
3228 assert_eq!(
3229 Source::from_iter([1, 2, 3])
3230 .merge_all([Source::from_iter([10, 11])], false)
3231 .run_collect()
3232 .unwrap(),
3233 vec![1, 10, 2, 11, 3]
3234 );
3235
3236 assert_eq!(
3237 Source::from_iter([1, 2, 3])
3238 .zip_with(Source::from_iter([10, 11, 12]), |left, right| left + right)
3239 .run_collect()
3240 .unwrap(),
3241 vec![11, 13, 15]
3242 );
3243
3244 assert_eq!(
3245 Source::from_iter([1, 2])
3246 .zip_latest(Source::single(10))
3247 .run_collect()
3248 .unwrap(),
3249 vec![(1, 10), (2, 10)]
3250 );
3251
3252 assert_eq!(
3253 Source::from_iter([1, 2, 3])
3254 .zip_latest_with(Source::from_iter([10]), false, |left, right| left + right)
3255 .run_collect()
3256 .unwrap(),
3257 vec![11, 12, 13]
3258 );
3259
3260 assert_eq!(
3261 Source::from_iter([1, 2])
3262 .zip_all(Source::from_iter([10, 11, 12]), -1, -2)
3263 .run_collect()
3264 .unwrap(),
3265 vec![(1, 10), (2, 11), (-1, 12)]
3266 );
3267
3268 assert_eq!(
3269 Source::from_iter([5, 6, 7])
3270 .zip_with_index()
3271 .run_collect()
3272 .unwrap(),
3273 vec![(5, 0), (6, 1), (7, 2)]
3274 );
3275
3276 assert_eq!(
3277 Source::zip_n([Source::from_iter([1, 2]), Source::from_iter([10, 20])])
3278 .run_collect()
3279 .unwrap(),
3280 vec![vec![1, 10], vec![2, 20]]
3281 );
3282
3283 assert_eq!(
3284 Source::zip_with_n(
3285 [
3286 Source::from_iter([1, 2]),
3287 Source::from_iter([10, 20]),
3288 Source::from_iter([100, 200]),
3289 ],
3290 |values| values.into_iter().sum::<i32>(),
3291 )
3292 .run_collect()
3293 .unwrap(),
3294 vec![111, 222]
3295 );
3296
3297 assert_eq!(
3298 Source::merge_prioritized_n(
3299 [
3300 (Source::from_iter([1, 2, 3, 4]), 2),
3301 (Source::from_iter([10, 11]), 1),
3302 ],
3303 false,
3304 )
3305 .run_collect()
3306 .unwrap(),
3307 vec![1, 2, 10, 3, 4, 11]
3308 );
3309
3310 assert_eq!(
3311 Source::combine(
3312 Source::from_iter([1, 2, 3]),
3313 Source::from_iter([10, 11]),
3314 std::iter::empty::<Source<i32, NotUsed>>(),
3315 SourceCombineStrategy::Merge {
3316 eager_complete: false,
3317 },
3318 )
3319 .run_collect()
3320 .unwrap(),
3321 vec![1, 10, 2, 11, 3]
3322 );
3323
3324 let combined_sink = Sink::combine(
3325 Sink::ignore(),
3326 Sink::ignore(),
3327 std::iter::empty::<Sink<i32, NotUsed>>(),
3328 SinkCombineStrategy::Broadcast,
3329 );
3330 assert_eq!(
3331 Source::from_iter([1, 2, 3])
3332 .run_with(combined_sink)
3333 .unwrap(),
3334 NotUsed
3335 );
3336 }
3337
3338 #[test]
3339 fn sink_combine_broadcast_delivers_every_element_to_every_child() {
3340 let first_count = StdArc::new(StdAtomicUsize::new(0));
3344 let second_count = StdArc::new(StdAtomicUsize::new(0));
3345 let first_counter = StdArc::clone(&first_count);
3346 let second_counter = StdArc::clone(&second_count);
3347 let combined = Sink::combine(
3348 Sink::foreach(move |_: i32| {
3349 first_counter.fetch_add(1, StdOrdering::SeqCst);
3350 }),
3351 Sink::foreach(move |_: i32| {
3352 second_counter.fetch_add(1, StdOrdering::SeqCst);
3353 }),
3354 std::iter::empty::<Sink<i32, NotUsed>>(),
3355 SinkCombineStrategy::Broadcast,
3356 );
3357 assert_eq!(
3358 Source::from_iter(0..100).run_with(combined).unwrap(),
3359 NotUsed
3360 );
3361 assert!(wait_until(StdDuration::from_secs(1), || {
3366 first_count.load(StdOrdering::SeqCst) == 100
3367 && second_count.load(StdOrdering::SeqCst) == 100
3368 }));
3369 }
3370
3371 #[test]
3372 fn zip_latest_completes_when_one_side_finishes_without_emitting() {
3373 assert_eq!(
3377 Source::from_iter(std::iter::empty::<i32>())
3378 .zip_latest_with(Source::repeat(10), false, |left, right| left + right)
3379 .run_collect()
3380 .unwrap(),
3381 Vec::<i32>::new()
3382 );
3383 assert_eq!(
3384 Source::repeat(10)
3385 .zip_latest_with(
3386 Source::from_iter(std::iter::empty::<i32>()),
3387 false,
3388 |left, right| left + right,
3389 )
3390 .run_collect()
3391 .unwrap(),
3392 Vec::<i32>::new()
3393 );
3394 }
3395
3396 #[test]
3397 fn zip_family_completion_boundaries_match_expected_results() {
3398 assert_eq!(
3399 Source::from_iter([1, 2, 3])
3400 .zip_with(Source::from_iter([10]), |left, right| left + right)
3401 .run_collect()
3402 .unwrap(),
3403 vec![11]
3404 );
3405
3406 assert_eq!(
3407 Source::from_iter([1, 2, 3])
3408 .zip_latest_with(Source::from_iter([10]), true, |left, right| left + right)
3409 .run_collect()
3410 .unwrap(),
3411 vec![11, 12]
3412 );
3413
3414 assert_eq!(
3415 Source::zip_n([
3416 Source::from_iter([1, 2, 3]),
3417 Source::from_iter([10]),
3418 Source::from_iter([100, 200, 300]),
3419 ])
3420 .run_collect()
3421 .unwrap(),
3422 vec![vec![1, 10, 100]]
3423 );
3424 }
3425
3426 #[test]
3427 fn combine_strategies_follow_merge_concat_and_priority_rules() {
3428 assert_eq!(
3429 Source::combine(
3430 Source::from_iter([1, 2]),
3431 Source::from_iter([10, 11]),
3432 [Source::from_iter([100])],
3433 SourceCombineStrategy::Concat,
3434 )
3435 .run_collect()
3436 .unwrap(),
3437 vec![1, 2, 10, 11, 100]
3438 );
3439
3440 assert_eq!(
3441 Source::combine(
3442 Source::from_iter([1, 2, 3, 4]),
3443 Source::from_iter([10, 11]),
3444 std::iter::empty::<Source<i32, NotUsed>>(),
3445 SourceCombineStrategy::Prioritized {
3446 priorities: vec![2, 1],
3447 eager_complete: false,
3448 },
3449 )
3450 .run_collect()
3451 .unwrap(),
3452 vec![1, 2, 10, 3, 4, 11]
3453 );
3454 }
3455
3456 #[test]
3457 fn concat_lazy_defers_follow_on_source_until_needed() {
3458 let source_counter = StdArc::new(StdAtomicUsize::new(0));
3459 let source_counter_clone = StdArc::clone(&source_counter);
3460 let lazy_source = Source::from_materialized_factory(move |_| {
3461 source_counter_clone.fetch_add(1, StdOrdering::SeqCst);
3462 Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
3463 });
3464 let source_head = Source::single(1)
3465 .concat_lazy(lazy_source)
3466 .run_with(Sink::head());
3467 assert_eq!(wait(source_head.unwrap()), 1);
3468 assert_eq!(source_counter.load(StdOrdering::SeqCst), 0);
3469
3470 let flow_counter = StdArc::new(StdAtomicUsize::new(0));
3471 let flow_counter_clone = StdArc::clone(&flow_counter);
3472 let lazy_flow_source = Source::from_materialized_factory(move |_| {
3473 flow_counter_clone.fetch_add(1, StdOrdering::SeqCst);
3474 Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
3475 });
3476 let flow_head = Source::single(1)
3477 .via(Flow::identity().concat_lazy(lazy_flow_source))
3478 .run_with(Sink::head());
3479 assert_eq!(wait(flow_head.unwrap()), 1);
3480 assert_eq!(flow_counter.load(StdOrdering::SeqCst), 0);
3481 }
3482
3483 #[test]
3484 fn also_to_completes_when_side_sink_cancels() {
3485 assert_eq!(
3486 Source::from_iter([1, 2, 3])
3487 .also_to(Sink::cancelled())
3488 .run_collect()
3489 .unwrap(),
3490 Vec::<i32>::new()
3491 );
3492 assert_eq!(
3493 Source::from_iter([1, 2, 3])
3494 .also_to_all([Sink::cancelled(), Sink::cancelled()])
3495 .run_collect()
3496 .unwrap(),
3497 Vec::<i32>::new()
3498 );
3499 }
3500
3501 #[test]
3502 fn also_to_completes_gracefully_when_side_sink_disconnects() {
3503 let result = Source::from_iter(0..100)
3504 .also_to(Sink::head())
3505 .run_collect()
3506 .unwrap();
3507 assert!(!result.is_empty(), "main should emit at least one element");
3508 assert!(
3509 result.len() < 100,
3510 "main should complete early when side disconnects"
3511 );
3512 }
3513
3514 #[test]
3515 fn also_to_propagates_original_error_when_side_is_disconnected() {
3516 let err = StreamError::Failed("distinctive-boom".into());
3517 assert!(matches!(
3518 Source::<i32>::failed(err.clone())
3519 .also_to(Sink::cancelled())
3520 .run_collect(),
3521 Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3522 ));
3523 assert!(matches!(
3524 Source::<i32>::failed(err.clone())
3525 .also_to_all([Sink::cancelled()])
3526 .run_collect(),
3527 Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3528 ));
3529 assert!(matches!(
3530 Source::<i32>::failed(err)
3531 .divert_to(Sink::cancelled(), |_: &i32| true)
3532 .run_collect(),
3533 Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3534 ));
3535 }
3536
3537 #[test]
3538 fn divert_to_routes_matching_elements_to_side_sink() {
3539 let diverted = Source::from_iter([1, 2, 3, 4])
3540 .divert_to(Sink::ignore(), |item| item % 2 == 0)
3541 .run_collect()
3542 .unwrap();
3543 assert_eq!(diverted, vec![1, 3]);
3544 }
3545
3546 #[test]
3547 fn wire_tap_drops_when_side_sink_backpressures() {
3548 let tapped = Source::from_iter([1, 2, 3])
3549 .wire_tap(Sink::head())
3550 .run_collect()
3551 .unwrap();
3552 assert_eq!(tapped, vec![1, 2, 3]);
3553
3554 let tapped_via_flow = Source::from_iter([1, 2, 3])
3555 .via(Flow::identity().wire_tap(Sink::head()))
3556 .run_collect()
3557 .unwrap();
3558 assert_eq!(tapped_via_flow, vec![1, 2, 3]);
3559 }
3560
3561 #[test]
3562 fn async_mapping_variants_complete() {
3563 let ordered = Source::from_iter(0..4)
3564 .map_async(2, |item| async move { Ok(item * 2) })
3565 .run_collect()
3566 .unwrap();
3567 assert_eq!(ordered, vec![0, 2, 4, 6]);
3568
3569 let unordered = Source::from_iter(0..4)
3570 .map_async_unordered(2, |item| async move { Ok(item * 2) })
3571 .run_collect()
3572 .unwrap();
3573 assert_eq!(unordered, vec![0, 2, 4, 6]);
3574
3575 let partitioned = Source::from_iter(0..4)
3576 .map_async_partitioned(4, 1, |item| item % 2, |item| async move { Ok(item + 1) })
3577 .run_collect()
3578 .unwrap();
3579 assert_eq!(partitioned, vec![1, 2, 3, 4]);
3580 }
3581
3582 #[test]
3583 fn map_async_ordered_bounds_pulls_behind_stuck_head() {
3584 let pulls = StdArc::new(StdAtomicUsize::new(0));
3585 let pulls_for_source = StdArc::clone(&pulls);
3586 let probe = Source::from_fn_iter(move || {
3587 let pulls = StdArc::clone(&pulls_for_source);
3588 std::iter::from_fn(move || {
3589 let next = pulls.fetch_add(1, StdOrdering::SeqCst);
3590 Some(next)
3591 })
3592 })
3593 .map_async(2, |item| async move {
3594 if item == 0 {
3595 tokio::time::sleep(StdDuration::from_millis(300)).await;
3596 }
3597 Ok(item)
3598 })
3599 .run_with(TestSink::probe())
3600 .unwrap();
3601
3602 probe.request(16);
3603 thread::sleep(StdDuration::from_millis(100));
3604 assert!(
3605 pulls.load(StdOrdering::SeqCst) <= 3,
3606 "pulled {} elements with parallelism=2 behind a stuck ordered head",
3607 pulls.load(StdOrdering::SeqCst)
3608 );
3609 }
3610
3611 #[test]
3612 fn async_mapping_parks_until_woken_future_completes() {
3613 struct WakeOnceFuture {
3614 value: Option<u64>,
3615 ready: StdArc<StdAtomicBool>,
3616 started: bool,
3617 polls: StdArc<StdAtomicUsize>,
3618 latest_waker: StdArc<Mutex<Option<std::task::Waker>>>,
3619 }
3620
3621 impl std::future::Future for WakeOnceFuture {
3622 type Output = StreamResult<u64>;
3623
3624 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3625 let this = self.as_mut().get_mut();
3626 this.polls.fetch_add(1, StdOrdering::SeqCst);
3627 *this.latest_waker.lock().expect("latest wake slot mutex") =
3628 Some(cx.waker().clone());
3629 if this.ready.load(StdOrdering::SeqCst) {
3630 return Poll::Ready(Ok(this.value.take().unwrap()));
3631 }
3632
3633 if !this.started {
3634 this.started = true;
3635 let ready = StdArc::clone(&this.ready);
3636 let latest_waker = StdArc::clone(&this.latest_waker);
3637 thread::spawn(move || {
3638 thread::sleep(Duration::from_millis(20));
3639 ready.store(true, StdOrdering::SeqCst);
3640 if let Some(waker) =
3641 latest_waker.lock().expect("latest wake slot mutex").take()
3642 {
3643 waker.wake();
3644 }
3645 });
3646 }
3647 Poll::Pending
3648 }
3649 }
3650
3651 let polls = StdArc::new(StdAtomicUsize::new(0));
3652 let polls_for_stage = StdArc::clone(&polls);
3653 let start = Instant::now();
3654
3655 let values = Source::single(41)
3656 .map_async(1, move |item| WakeOnceFuture {
3657 value: Some(item + 1),
3658 ready: StdArc::new(StdAtomicBool::new(false)),
3659 started: false,
3660 polls: StdArc::clone(&polls_for_stage),
3661 latest_waker: StdArc::new(Mutex::new(None)),
3662 })
3663 .run_collect()
3664 .unwrap();
3665
3666 assert_eq!(values, vec![42]);
3667 let elapsed = start.elapsed();
3668 assert!(
3669 elapsed >= StdDuration::from_millis(15) && elapsed < StdDuration::from_millis(250),
3670 "pending future should park until woken once, elapsed={elapsed:?}"
3671 );
3672 assert!(
3673 polls.load(StdOrdering::SeqCst) < 4096,
3674 "pending future was repolled too aggressively"
3675 );
3676 }
3677
3678 #[test]
3679 fn async_mapping_emits_before_unbounded_upstream_finishes() {
3680 let ordered = Source::repeat(1)
3681 .map_async(2, |item| async move { Ok(item + 1) })
3682 .take(1)
3683 .run_collect()
3684 .unwrap();
3685 assert_eq!(ordered, vec![2]);
3686
3687 let unordered = Source::repeat(1)
3688 .map_async_unordered(2, |item| async move { Ok(item + 1) })
3689 .take(1)
3690 .run_collect()
3691 .unwrap();
3692 assert_eq!(unordered, vec![2]);
3693
3694 let partitioned = Source::repeat(1)
3695 .map_async_partitioned(2, 1, |_| 0_u8, |item| async move { Ok(item + 1) })
3696 .take(1)
3697 .run_collect()
3698 .unwrap();
3699 assert_eq!(partitioned, vec![2]);
3700 }
3701
3702 #[test]
3703 fn partitioned_async_mapping_limits_same_key_concurrency() {
3704 let active = StdArc::new(StdAtomicUsize::new(0));
3705 let max_active = StdArc::new(StdAtomicUsize::new(0));
3706 let active_for_stage = StdArc::clone(&active);
3707 let max_for_stage = StdArc::clone(&max_active);
3708
3709 let values = Source::from_iter(0..6)
3710 .map_async_partitioned(
3711 4,
3712 1,
3713 |_| 0_u8,
3714 move |item| {
3715 let active = StdArc::clone(&active_for_stage);
3716 let max_active = StdArc::clone(&max_for_stage);
3717 let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
3718 max_active.fetch_max(current, StdOrdering::SeqCst);
3719 async move {
3720 thread::sleep(Duration::from_millis(1));
3721 active.fetch_sub(1, StdOrdering::SeqCst);
3722 Ok(item)
3723 }
3724 },
3725 )
3726 .run_collect()
3727 .unwrap();
3728
3729 assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
3730 assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
3731 }
3732
3733 #[test]
3734 fn partitioned_async_mapping_scans_past_blocked_pending_key() {
3735 let active = StdArc::new(StdAtomicUsize::new(0));
3736 let max_active = StdArc::new(StdAtomicUsize::new(0));
3737 let active_for_stage = StdArc::clone(&active);
3738 let max_for_stage = StdArc::clone(&max_active);
3739 let (release_tx, release_rx) = oneshot::channel::<()>();
3740 let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
3741 let release_rx_for_stage = StdArc::clone(&release_rx);
3742 let max_for_release = StdArc::clone(&max_active);
3743
3744 let releaser = thread::spawn(move || {
3745 let deadline = Instant::now() + StdDuration::from_secs(1);
3746 while max_for_release.load(StdOrdering::SeqCst) < 2 && Instant::now() < deadline {
3747 thread::yield_now();
3748 }
3749 let _ = release_tx.send(());
3750 });
3751
3752 let values = Source::from_iter([0, 2, 1])
3753 .map_async_partitioned(
3754 2,
3755 1,
3756 |item| item % 2,
3757 move |item| {
3758 let active = StdArc::clone(&active_for_stage);
3759 let max_active = StdArc::clone(&max_for_stage);
3760 let release_rx = StdArc::clone(&release_rx_for_stage);
3761 let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
3762 max_active.fetch_max(current, StdOrdering::SeqCst);
3763 async move {
3764 if item == 0 {
3765 let receiver = release_rx
3766 .lock()
3767 .expect("release receiver mutex")
3768 .take()
3769 .expect("release receiver present");
3770 let _ = receiver.await;
3771 }
3772 active.fetch_sub(1, StdOrdering::SeqCst);
3773 Ok(item)
3774 }
3775 },
3776 )
3777 .run_collect()
3778 .unwrap();
3779 releaser.join().unwrap();
3780
3781 assert_eq!(values, vec![0, 2, 1]);
3782 assert_eq!(max_active.load(StdOrdering::SeqCst), 2);
3783 }
3784
3785 #[test]
3786 fn partitioned_async_mapping_p1_still_evaluates_partition() {
3787 let partitions = StdArc::new(StdAtomicUsize::new(0));
3788 let partitions_for_stage = StdArc::clone(&partitions);
3789
3790 let values = Source::from_iter(0..8)
3791 .map_async_partitioned(
3792 1,
3793 1,
3794 move |item| {
3795 partitions_for_stage.fetch_add(1, StdOrdering::SeqCst);
3796 item % 2
3797 },
3798 |item| async move { Ok(item + 1) },
3799 )
3800 .run_collect()
3801 .unwrap();
3802
3803 assert_eq!(values, (1..9).collect::<Vec<_>>());
3804 assert_eq!(partitions.load(StdOrdering::SeqCst), 8);
3805 }
3806
3807 #[test]
3808 fn partitioned_async_mapping_handles_many_keys_high_parallelism() {
3809 let active_by_key =
3810 StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
3811 let max_by_key = StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
3812 let active_for_stage = StdArc::clone(&active_by_key);
3813 let max_for_stage = StdArc::clone(&max_by_key);
3814
3815 let values = Source::from_iter(0..512_usize)
3816 .map_async_partitioned(
3817 32,
3818 1,
3819 |item| item % 16,
3820 move |item| {
3821 let active = StdArc::clone(&active_for_stage);
3822 let max_active = StdArc::clone(&max_for_stage);
3823 let key = item % 16;
3824 let current = active[key].fetch_add(1, StdOrdering::SeqCst) + 1;
3825 max_active[key].fetch_max(current, StdOrdering::SeqCst);
3826 async move {
3827 active[key].fetch_sub(1, StdOrdering::SeqCst);
3828 Ok(item)
3829 }
3830 },
3831 )
3832 .run_collect()
3833 .unwrap();
3834
3835 assert_eq!(values, (0..512).collect::<Vec<_>>());
3836 for max_active in max_by_key.iter() {
3837 assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
3838 }
3839 }
3840
3841 #[test]
3842 fn error_operators_map_recover_and_complete() {
3843 let mapped = Source::<i32>::failed(StreamError::Failed("boom".into()))
3844 .map_error(|_| StreamError::Failed("mapped".into()))
3845 .run_collect();
3846 assert_eq!(mapped, Err(StreamError::Failed("mapped".into())));
3847
3848 let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
3849 .recover(|error| match error {
3850 StreamError::Failed(_) => Some(42),
3851 _ => None,
3852 })
3853 .run_collect()
3854 .unwrap();
3855 assert_eq!(recovered, vec![42]);
3856
3857 let unrecovered = Source::<i32>::failed(StreamError::Failed("original".into()))
3858 .recover(|_| None)
3859 .run_collect();
3860 assert_eq!(unrecovered, Err(StreamError::Failed("original".into())));
3861
3862 let recovered_with = Source::<i32>::failed(StreamError::Failed("boom".into()))
3863 .recover_with_retries(1, |_| Some(Source::from_iter([1, 2])))
3864 .run_collect()
3865 .unwrap();
3866 assert_eq!(recovered_with, vec![1, 2]);
3867
3868 let declined_recover_with = Source::<i32>::failed(StreamError::Failed("declined".into()))
3869 .recover_with_retries(1, |_| None)
3870 .run_collect();
3871 assert_eq!(
3872 declined_recover_with,
3873 Err(StreamError::Failed("declined".into()))
3874 );
3875
3876 let completed = Source::from_factory(|| {
3877 Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
3878 })
3879 .on_error_complete()
3880 .run_collect()
3881 .unwrap();
3882 assert_eq!(completed, vec![1]);
3883 }
3884
3885 #[test]
3886 fn sliding_matches_akka_window_semantics() {
3887 assert_eq!(
3889 Source::from_iter(1..=4)
3890 .sliding(3, 1)
3891 .run_collect()
3892 .unwrap(),
3893 vec![vec![1, 2, 3], vec![2, 3, 4]]
3894 );
3895 assert_eq!(
3896 Source::from_iter(1..=4)
3897 .sliding(2, 1)
3898 .run_collect()
3899 .unwrap(),
3900 vec![vec![1, 2], vec![2, 3], vec![3, 4]]
3901 );
3902 assert_eq!(
3904 Source::from_iter(1..=3)
3905 .sliding(3, 1)
3906 .run_collect()
3907 .unwrap(),
3908 vec![vec![1, 2, 3]]
3909 );
3910 assert_eq!(
3912 Source::from_iter(1..=2)
3913 .sliding(3, 1)
3914 .run_collect()
3915 .unwrap(),
3916 vec![vec![1, 2]]
3917 );
3918 assert_eq!(
3920 Source::from_iter(1..=3)
3921 .sliding(1, 1)
3922 .run_collect()
3923 .unwrap(),
3924 vec![vec![1], vec![2], vec![3]]
3925 );
3926 assert_eq!(
3928 Source::from_iter(1..=6)
3929 .sliding(2, 3)
3930 .run_collect()
3931 .unwrap(),
3932 vec![vec![1, 2], vec![4, 5]]
3933 );
3934 assert_eq!(
3936 Source::from_iter(1..=3)
3937 .sliding(2, 4)
3938 .run_collect()
3939 .unwrap(),
3940 vec![vec![1, 2]]
3941 );
3942 }
3943
3944 #[test]
3945 fn recover_with_retries_indefinitely_like_akka() {
3946 let attempts = StdArc::new(StdAtomicUsize::new(0));
3947 let attempts_in_stage = StdArc::clone(&attempts);
3948 let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
3951 .recover_with(move |_error| {
3952 if attempts_in_stage.fetch_add(1, StdOrdering::SeqCst) < 5 {
3953 Some(Source::<i32>::failed(StreamError::Failed("again".into())))
3954 } else {
3955 Some(Source::from_iter([42]))
3956 }
3957 })
3958 .run_collect()
3959 .unwrap();
3960 assert_eq!(recovered, vec![42]);
3961 assert_eq!(attempts.load(StdOrdering::SeqCst), 6);
3962 }
3963
3964 #[test]
3965 fn many_concurrent_streams_do_not_starve_the_pool() {
3966 let materializer = Materializer::new();
3975 let busy = 6_usize;
3976
3977 let mut held = Vec::with_capacity(busy);
3978 for _ in 0..busy {
3979 held.push(
3980 Source::single(1_u64)
3981 .run_with_materializer(Sink::never(), &materializer)
3982 .unwrap(),
3983 );
3984 }
3985
3986 for _ in 0..400 {
3987 if materializer.active_streams() >= busy {
3988 break;
3989 }
3990 thread::sleep(Duration::from_millis(5));
3991 }
3992 assert_eq!(materializer.active_streams(), busy);
3993
3994 let sum = Source::from_iter(0_u64..5)
3997 .run_with_materializer(Sink::fold(0_u64, |acc, item| acc + item), &materializer)
3998 .unwrap();
3999 assert_eq!(sum.wait().unwrap(), 10);
4000
4001 materializer.shutdown();
4002 for completion in held {
4003 assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
4004 }
4005 }
4006}