Skip to main content

datum/stream/
sink.rs

1use super::flow;
2use super::*;
3use crate::Attributes;
4use crate::stream::error::{decide_supervision, panic_stream_error};
5
/// A materialized child of `Sink::combine`: the feed channel plus the child's
/// materialized value.
///
/// The materialized value is held (type-erased) until the parent runner
/// finishes — dropping it early would trip `StreamCompletion`'s
/// cancel-on-drop and silently cancel the child before it consumes anything.
type CombinedSinkChild<In> = (
    // Feed channel: the parent pushes items and control messages to the child.
    std::sync::mpsc::SyncSender<CombinedSinkMessage<In>>,
    // The child's materialized value, kept alive only for its drop timing.
    Box<dyn std::any::Any + Send>,
);
14
/// Materializes one `Sink::combine` child against the given materializer and
/// returns its feed channel together with the held materialized value.
type CombinedSinkRunner<In> =
    dyn Fn(&Materializer) -> StreamResult<CombinedSinkChild<In>> + Send + Sync;
/// Factory stored by deferred sinks. It is called at run time with the
/// materializer and the effective attributes, and returns the concrete sink
/// that actually handles the stream.
type DeferredSinkFactory<In, Mat> =
    dyn Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync;
19
/// Messages sent from the `Sink::combine` parent to each child's feed iterator.
enum CombinedSinkMessage<In> {
    /// An upstream element (or upstream error) to hand to the child sink.
    Item(StreamResult<In>),
    /// Synchronization probe: the child's feed iterator answers on the enclosed
    /// channel the next time the child asks for an element, then keeps waiting.
    Flush(std::sync::mpsc::SyncSender<()>),
    /// End of stream: the child's feed iterator yields `None`.
    Close,
}
25
// NOTE(review): no use of this constant is visible in this part of the file;
// presumably the batch size used by the terminal consumers (e.g. the
// collect/ignore terminals) — confirm against its call sites.
const TERMINAL_CONSUMER_BATCH: usize = 64;
27
/// A stream consumer blueprint: takes a stream of `In` and yields `Mat` when
/// materialized.
///
/// All runners are shared behind `Arc`s, so cloning a sink clones the `Arc`s
/// and the attributes rather than the runner closures themselves.
pub struct Sink<In, Mat> {
    /// Baseline runner; used by `run_with_source_hints` when no hinted runner
    /// is registered.
    runner: Arc<SinkRunner<In, Mat>>,
    /// Optional runner used by `run_inline`; its presence is what
    /// `can_inline` reports.
    inline_runner: Option<Arc<SinkRunner<In, Mat>>>,
    /// Optional runner that additionally receives the source's runtime hints;
    /// preferred over `runner` by `run_with_source_hints`.
    hinted_runner: Option<Arc<HintedSinkRunner<In, Mat>>>,
    /// Optional hinted runner preferred by `run_from_source`; it receives the
    /// input without the `runtime_checked_stream` wrapper that
    /// `run_from_source` otherwise applies.
    raw_hinted_runner: Option<Arc<HintedSinkRunner<In, Mat>>>,
    /// Attributes attached to this sink.
    attributes: Attributes,
    /// When set, every `run*` entry point first calls this factory with the
    /// effective attributes and delegates to the sink it returns.
    deferred_factory: Option<Arc<DeferredSinkFactory<In, Mat>>>,
    /// Set only for recognised fold/collect/ignore sinks. `None` for all other sinks.
    pub(crate) fold_fp: Option<Arc<dyn FoldFastPathDyn<In>>>,
}
38
39/// Non-generic helper for `Sink::map_materialized_value` — accepts an erased function so
40/// that only one monomorphization exists per `(In, Mat, NextMat)` triple. This prevents
41/// the deferred-factory recursion from producing an ever-growing closure type chain.
42fn map_mat_dyn<In, Mat, NextMat>(
43    sink: Sink<In, Mat>,
44    f: Arc<dyn Fn(Mat) -> NextMat + Send + Sync + 'static>,
45) -> Sink<In, NextMat>
46where
47    In: Send + 'static,
48    Mat: Send + 'static,
49    NextMat: Send + 'static,
50{
51    let Sink {
52        runner,
53        inline_runner,
54        hinted_runner,
55        raw_hinted_runner,
56        attributes,
57        deferred_factory,
58        fold_fp: _,
59    } = sink;
60    let mapped_runner = {
61        let f = Arc::clone(&f);
62        Arc::new(move |input, materializer: &Materializer| {
63            let mat = runner(input, materializer)?;
64            Ok(f(mat))
65        }) as Arc<SinkRunner<In, NextMat>>
66    };
67    let mapped_inline_runner = inline_runner.map(|ir| {
68        let f = Arc::clone(&f);
69        Arc::new(move |input, materializer: &Materializer| {
70            let result = ir(input, materializer)?;
71            Ok(f(result))
72        }) as Arc<SinkRunner<In, NextMat>>
73    });
74    let mapped_hinted_runner = hinted_runner.map(|hr| {
75        let f = Arc::clone(&f);
76        Arc::new(
77            move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
78                let result = hr(input, materializer, hints)?;
79                Ok(f(result))
80            },
81        ) as Arc<HintedSinkRunner<In, NextMat>>
82    });
83    let mapped_raw_hinted_runner = raw_hinted_runner.map(|hr| {
84        let f = Arc::clone(&f);
85        Arc::new(
86            move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
87                let result = hr(input, materializer, hints)?;
88                Ok(f(result))
89            },
90        ) as Arc<HintedSinkRunner<In, NextMat>>
91    });
92    let mapped_factory = deferred_factory.map(|factory| {
93        let f = Arc::clone(&f);
94        Arc::new(move |materializer: &Materializer, attrs: &Attributes| {
95            map_mat_dyn(factory(materializer, attrs), Arc::clone(&f))
96        }) as Arc<DeferredSinkFactory<In, NextMat>>
97    });
98    Sink {
99        runner: mapped_runner,
100        inline_runner: mapped_inline_runner,
101        hinted_runner: mapped_hinted_runner,
102        raw_hinted_runner: mapped_raw_hinted_runner,
103        attributes,
104        deferred_factory: mapped_factory,
105        fold_fp: None,
106    }
107}
108
109impl<In, Mat> Clone for Sink<In, Mat> {
110    fn clone(&self) -> Self {
111        Self {
112            runner: Arc::clone(&self.runner),
113            inline_runner: self.inline_runner.as_ref().map(Arc::clone),
114            hinted_runner: self.hinted_runner.as_ref().map(Arc::clone),
115            raw_hinted_runner: self.raw_hinted_runner.as_ref().map(Arc::clone),
116            attributes: self.attributes.clone(),
117            deferred_factory: self.deferred_factory.as_ref().map(Arc::clone),
118            fold_fp: self.fold_fp.as_ref().map(Arc::clone),
119        }
120    }
121}
122
123impl<In: Send + 'static, Mat: Send + 'static> Sink<In, Mat> {
124    pub(crate) fn from_runner<F>(runner: F) -> Self
125    where
126        F: Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync + 'static,
127    {
128        Self::from_runner_parts(Arc::new(runner), None)
129    }
130
131    pub(crate) fn from_runner_parts(
132        runner: Arc<SinkRunner<In, Mat>>,
133        inline_runner: Option<Arc<SinkRunner<In, Mat>>>,
134    ) -> Self {
135        Self {
136            runner,
137            inline_runner,
138            hinted_runner: None,
139            raw_hinted_runner: None,
140            attributes: Attributes::default(),
141            deferred_factory: None,
142            fold_fp: None,
143        }
144    }
145
146    pub(crate) fn from_hinted_runner<F>(runner: F) -> Self
147    where
148        F: Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat>
149            + Send
150            + Sync
151            + 'static,
152    {
153        let hinted_runner: Arc<HintedSinkRunner<In, Mat>> = Arc::new(runner);
154        let fallback = {
155            let hinted_runner = Arc::clone(&hinted_runner);
156            Arc::new(move |input, materializer: &Materializer| {
157                hinted_runner(input, materializer, SourceRuntimeHints::default())
158            }) as Arc<SinkRunner<In, Mat>>
159        };
160        Self {
161            runner: fallback,
162            inline_runner: None,
163            hinted_runner: Some(hinted_runner),
164            raw_hinted_runner: None,
165            attributes: Attributes::default(),
166            deferred_factory: None,
167            fold_fp: None,
168        }
169    }
170
171    pub(crate) fn from_raw_hinted_runner<F>(runner: F) -> Self
172    where
173        F: Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat>
174            + Send
175            + Sync
176            + 'static,
177    {
178        let raw_hinted_runner: Arc<HintedSinkRunner<In, Mat>> = Arc::new(runner);
179        let fallback = {
180            let raw_hinted_runner = Arc::clone(&raw_hinted_runner);
181            Arc::new(move |input, materializer: &Materializer| {
182                let input =
183                    runtime_checked_stream(input, Arc::clone(&materializer.inner.state), None);
184                raw_hinted_runner(input, materializer, SourceRuntimeHints::default())
185            }) as Arc<SinkRunner<In, Mat>>
186        };
187        Self {
188            runner: fallback,
189            inline_runner: None,
190            hinted_runner: None,
191            raw_hinted_runner: Some(raw_hinted_runner),
192            attributes: Attributes::default(),
193            deferred_factory: None,
194            fold_fp: None,
195        }
196    }
197
198    pub(super) fn run(
199        &self,
200        input: BoxStream<In>,
201        materializer: &Materializer,
202    ) -> StreamResult<Mat> {
203        self.run_with_source_hints(input, materializer, SourceRuntimeHints::default())
204    }
205
206    pub(super) fn run_with_source_hints(
207        &self,
208        input: BoxStream<In>,
209        materializer: &Materializer,
210        hints: SourceRuntimeHints,
211    ) -> StreamResult<Mat> {
212        if let Some(factory) = &self.deferred_factory {
213            let attrs = materializer.effective_attributes(&self.attributes);
214            return factory(materializer, &attrs).run_with_source_hints(input, materializer, hints);
215        }
216        if let Some(hinted_runner) = &self.hinted_runner {
217            return hinted_runner(input, materializer, hints);
218        }
219        (self.runner)(input, materializer)
220    }
221
222    pub(super) fn run_from_source(
223        &self,
224        input: BoxStream<In>,
225        materializer: &Materializer,
226        hints: SourceRuntimeHints,
227    ) -> StreamResult<Mat> {
228        if let Some(factory) = &self.deferred_factory {
229            let attrs = materializer.effective_attributes(&self.attributes);
230            return factory(materializer, &attrs).run_from_source(input, materializer, hints);
231        }
232        if let Some(raw_hinted_runner) = &self.raw_hinted_runner {
233            return raw_hinted_runner(input, materializer, hints);
234        }
235        let input = runtime_checked_stream(input, Arc::clone(&materializer.inner.state), None);
236        self.run_with_source_hints(input, materializer, hints)
237    }
238
239    pub(super) fn can_inline(&self) -> bool {
240        self.inline_runner.is_some()
241    }
242
243    pub(super) fn run_inline(
244        &self,
245        input: BoxStream<In>,
246        materializer: &Materializer,
247    ) -> StreamResult<Mat> {
248        if let Some(factory) = &self.deferred_factory {
249            let attrs = materializer.effective_attributes(&self.attributes);
250            return factory(materializer, &attrs).run_inline(input, materializer);
251        }
252        (self
253            .inline_runner
254            .as_ref()
255            .expect("inline sink runner exists"))(input, materializer)
256    }
257
258    pub fn run_with<SourceMat: Send + 'static>(
259        self,
260        source: Source<In, SourceMat>,
261    ) -> StreamResult<SourceMat> {
262        source.to(self).run()
263    }
264
265    pub fn run_with_materializer<SourceMat: Send + 'static>(
266        self,
267        source: Source<In, SourceMat>,
268        materializer: &Materializer,
269    ) -> StreamResult<SourceMat> {
270        source.to(self).run_with_materializer(materializer)
271    }
272
273    #[must_use]
274    pub fn from_materializer<F>(factory: F) -> Self
275    where
276        F: Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync + 'static,
277    {
278        let factory = Arc::new(factory);
279        Self {
280            runner: Arc::new(|_input, _materializer| {
281                Err(StreamError::Failed(
282                    "deferred sink factory must be driven through Sink::run".into(),
283                ))
284            }),
285            inline_runner: None,
286            hinted_runner: None,
287            raw_hinted_runner: None,
288            attributes: Attributes::default(),
289            deferred_factory: Some(factory),
290            fold_fp: None,
291        }
292    }
293
294    #[must_use]
295    pub fn setup<F>(factory: F) -> Self
296    where
297        F: Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync + 'static,
298    {
299        Self::from_materializer(factory)
300    }
301
302    pub fn pre_materialize(
303        &self,
304        materializer: &Materializer,
305    ) -> StreamResult<(Mat, Sink<In, NotUsed>)> {
306        let (sender, receiver) = std::sync::mpsc::sync_channel(1);
307        let materialized = self.clone().run(
308            Box::new(std::iter::from_fn(move || receiver.recv().ok())),
309            materializer,
310        )?;
311        let sender = Arc::new(Mutex::new(Some(sender)));
312        let sink = Sink::from_runner(move |input, _materializer| {
313            let Some(sender) = sender
314                .lock()
315                .expect("pre-materialized sink poisoned")
316                .take()
317            else {
318                return Err(StreamError::Failed(
319                    "pre-materialized sink has already been materialized".into(),
320                ));
321            };
322            for item in input {
323                if sender.send(item).is_err() {
324                    break;
325                }
326            }
327            Ok(NotUsed)
328        });
329        Ok((materialized, sink.with_attributes(self.attributes.clone())))
330    }
331
332    #[must_use]
333    pub fn map_materialized_value<NextMat, F>(self, f: F) -> Sink<In, NextMat>
334    where
335        NextMat: Send + 'static,
336        F: Fn(Mat) -> NextMat + Send + Sync + 'static,
337    {
338        // Delegate to a non-generic helper that accepts `Arc<dyn Fn>`. This breaks
339        // the infinite monomorphization chain that Rust 1.96 rejects: the old inline
340        // implementation called `map_materialized_value` inside the deferred-factory
341        // closure, which produced a new instantiation for a strictly deeper closure type
342        // at each level (depth > 1024 triggered the recursion limit).
343        map_mat_dyn(self, Arc::new(f))
344    }
345
346    #[must_use]
347    pub fn attributes(&self) -> &Attributes {
348        &self.attributes
349    }
350
351    #[must_use]
352    pub fn with_attributes(mut self, attributes: Attributes) -> Self {
353        self.attributes = attributes;
354        self
355    }
356
357    #[must_use]
358    pub fn add_attributes(mut self, attributes: Attributes) -> Self {
359        self.attributes = self.attributes.and(attributes);
360        self
361    }
362
363    #[must_use]
364    pub fn named(self, name: impl Into<String>) -> Self {
365        self.add_attributes(Attributes::named(name))
366    }
367}
368
/// A runnable stream blueprint that yields `Mat` when materialized.
#[derive(Clone)]
pub struct RunnableGraph<Mat> {
    /// Shared closure that runs the graph against a materializer.
    pub(super) runner: Arc<RunnableGraphRunner<Mat>>,
    /// Attributes attached to this graph.
    attributes: Attributes,
}
374
/// How `Sink::combine` distributes upstream elements across its child sinks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkCombineStrategy {
    /// Every element is cloned and sent to every child that is still accepting.
    Broadcast,
    /// Each element is sent to exactly one child, chosen in rotating order;
    /// children that no longer accept input are dropped from the rotation.
    Balance,
}
380
381impl<Mat: Send + 'static> RunnableGraph<Mat> {
382    pub(super) fn from_runner<F>(runner: F) -> Self
383    where
384        F: Fn(&Materializer) -> StreamResult<Mat> + Send + Sync + 'static,
385    {
386        Self {
387            runner: Arc::new(runner),
388            attributes: Attributes::default(),
389        }
390    }
391
392    pub fn run(&self) -> StreamResult<Mat> {
393        Materializer::new().materialize(self)
394    }
395
396    pub fn run_with_materializer(&self, materializer: &Materializer) -> StreamResult<Mat> {
397        materializer.materialize(self)
398    }
399
400    #[must_use]
401    pub fn map_materialized_value<Next, F>(self, f: F) -> RunnableGraph<Next>
402    where
403        Next: Send + 'static,
404        F: Fn(Mat) -> Next + Send + Sync + 'static,
405    {
406        let f = Arc::new(f);
407        RunnableGraph::from_runner(move |materializer| {
408            let mat = (self.runner)(materializer)?;
409            Ok(f(mat))
410        })
411    }
412
413    #[must_use]
414    pub fn attributes(&self) -> &Attributes {
415        &self.attributes
416    }
417
418    #[must_use]
419    pub fn with_attributes(mut self, attributes: Attributes) -> Self {
420        self.attributes = attributes;
421        self
422    }
423
424    #[must_use]
425    pub fn add_attributes(mut self, attributes: Attributes) -> Self {
426        self.attributes = self.attributes.and(attributes);
427        self
428    }
429
430    #[must_use]
431    pub fn named(self, name: impl Into<String>) -> Self {
432        self.add_attributes(Attributes::named(name))
433    }
434}
435
436impl<In: Clone + Send + 'static> Sink<In, NotUsed> {
437    #[must_use]
438    pub fn combine<M1, M2, MRest, I>(
439        first: Sink<In, M1>,
440        second: Sink<In, M2>,
441        rest: I,
442        strategy: SinkCombineStrategy,
443    ) -> Sink<In, NotUsed>
444    where
445        M1: Send + 'static,
446        M2: Send + 'static,
447        MRest: Send + 'static,
448        I: IntoIterator<Item = Sink<In, MRest>>,
449    {
450        let mut runners: Vec<Arc<CombinedSinkRunner<In>>> = vec![
451            Arc::new(move |materializer| {
452                let (sender, receiver) = std::sync::mpsc::sync_channel(0);
453                let mat = first.run(
454                    Box::new(std::iter::from_fn(move || {
455                        loop {
456                            match receiver.recv().ok()? {
457                                CombinedSinkMessage::Item(item) => return Some(item),
458                                CombinedSinkMessage::Flush(ack) => {
459                                    let _ = ack.send(());
460                                }
461                                CombinedSinkMessage::Close => return None,
462                            }
463                        }
464                    })),
465                    materializer,
466                )?;
467                Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
468            }),
469            Arc::new(move |materializer| {
470                let (sender, receiver) = std::sync::mpsc::sync_channel(0);
471                let mat = second.run(
472                    Box::new(std::iter::from_fn(move || {
473                        loop {
474                            match receiver.recv().ok()? {
475                                CombinedSinkMessage::Item(item) => return Some(item),
476                                CombinedSinkMessage::Flush(ack) => {
477                                    let _ = ack.send(());
478                                }
479                                CombinedSinkMessage::Close => return None,
480                            }
481                        }
482                    })),
483                    materializer,
484                )?;
485                Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
486            }),
487        ];
488        runners.extend(rest.into_iter().map(|sink| {
489            Arc::new(move |materializer: &Materializer| {
490                let (sender, receiver) = std::sync::mpsc::sync_channel(0);
491                let mat = sink.run(
492                    Box::new(std::iter::from_fn(move || {
493                        loop {
494                            match receiver.recv().ok()? {
495                                CombinedSinkMessage::Item(item) => return Some(item),
496                                CombinedSinkMessage::Flush(ack) => {
497                                    let _ = ack.send(());
498                                }
499                                CombinedSinkMessage::Close => return None,
500                            }
501                        }
502                    })),
503                    materializer,
504                )?;
505                Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
506            }) as Arc<CombinedSinkRunner<In>>
507        }));
508
509        Sink::from_runner(move |mut input: BoxStream<In>, materializer| {
510            // Each child's materialized value is held until the feed loop is
511            // done: dropping it would trip cancel-on-drop and kill the child.
512            let mut children = runners
513                .iter()
514                .map(|runner| runner(materializer))
515                .collect::<StreamResult<Vec<_>>>()?;
516            let mut next = 0usize;
517            for item in input.by_ref() {
518                match item {
519                    Ok(value) => match strategy {
520                        SinkCombineStrategy::Broadcast => {
521                            children.retain(|(sender, _)| {
522                                sender
523                                    .send(CombinedSinkMessage::Item(Ok(value.clone())))
524                                    .is_ok()
525                            });
526                            if children.is_empty() {
527                                break;
528                            }
529                        }
530                        SinkCombineStrategy::Balance => {
531                            while !children.is_empty() {
532                                let index = next % children.len();
533                                next = next.wrapping_add(1);
534                                match children[index]
535                                    .0
536                                    .send(CombinedSinkMessage::Item(Ok(value.clone())))
537                                {
538                                    Ok(()) => break,
539                                    Err(_) => {
540                                        children.remove(index);
541                                    }
542                                }
543                            }
544                            if children.is_empty() {
545                                break;
546                            }
547                        }
548                    },
549                    Err(error) => {
550                        for (sender, _) in &children {
551                            let _ = sender.send(CombinedSinkMessage::Item(Err(error.clone())));
552                        }
553                        return Err(error);
554                    }
555                }
556            }
557            // Wait for each child to request the next element after the final
558            // handoff so terminal side effects have definitely run before we
559            // release its materialized value.
560            for (sender, _) in &children {
561                let (ack_sender, ack_receiver) = std::sync::mpsc::sync_channel(0);
562                if sender.send(CombinedSinkMessage::Flush(ack_sender)).is_ok() {
563                    let _ = ack_receiver.recv();
564                }
565            }
566            // Then close every feed channel so each child observes end-of-stream
567            // and only afterwards release the held mats.
568            let mats: Vec<_> = children
569                .into_iter()
570                .map(|(sender, mat)| {
571                    let _ = sender.send(CombinedSinkMessage::Close);
572                    mat
573                })
574                .collect();
575            drop(mats);
576            Ok(NotUsed)
577        })
578    }
579}
580
581impl<In: Send + 'static, Mat: Send + 'static> Sink<In, StreamCompletion<Mat>> {
582    fn from_task_runner<F>(runner: F) -> Self
583    where
584        F: Fn(BoxStream<In>) -> StreamResult<Mat> + Send + Sync + 'static,
585    {
586        Self::from_task_runner_with_inline(runner, false)
587    }
588
589    fn from_raw_terminal_task_runner<F>(runner: F) -> Self
590    where
591        F: Fn(
592                BoxStream<In>,
593                Materializer,
594                Arc<AtomicBool>,
595                SourceRuntimeHints,
596            ) -> StreamResult<Mat>
597            + Send
598            + Sync
599            + 'static,
600    {
601        let runner = Arc::new(runner);
602        let async_runner = {
603            let runner = Arc::clone(&runner);
604            Arc::new(move |input, materializer: &Materializer| {
605                let runner = Arc::clone(&runner);
606                let worker_materializer =
607                    materializer.with_name_prefix(materializer.name_prefix().to_owned());
608                Ok(materializer.spawn_stream(move |cancelled| {
609                    runner(
610                        input,
611                        worker_materializer,
612                        cancelled,
613                        SourceRuntimeHints::default(),
614                    )
615                }))
616            }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
617        };
618        let raw_hinted_runner = {
619            let runner = Arc::clone(&runner);
620            Arc::new(
621                move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
622                    let runner = Arc::clone(&runner);
623                    let worker_materializer =
624                        materializer.with_name_prefix(materializer.name_prefix().to_owned());
625                    Ok(materializer.spawn_stream(move |cancelled| {
626                        runner(input, worker_materializer, cancelled, hints)
627                    }))
628                },
629            ) as Arc<HintedSinkRunner<In, StreamCompletion<Mat>>>
630        };
631        Sink {
632            runner: async_runner,
633            inline_runner: None,
634            hinted_runner: None,
635            raw_hinted_runner: Some(raw_hinted_runner),
636            attributes: Attributes::default(),
637            deferred_factory: None,
638            fold_fp: None,
639        }
640    }
641
642    fn from_task_runner_with_inline<F>(runner: F, inline: bool) -> Self
643    where
644        F: Fn(BoxStream<In>) -> StreamResult<Mat> + Send + Sync + 'static,
645    {
646        let runner = Arc::new(runner);
647        let async_runner = {
648            let runner = Arc::clone(&runner);
649            Arc::new(move |input, materializer: &Materializer| {
650                let runner = Arc::clone(&runner);
651                let state = Arc::clone(&materializer.inner.state);
652                Ok(materializer.spawn_stream(move |cancelled| {
653                    runner(runtime_checked_stream(input, state, Some(cancelled)))
654                }))
655            }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
656        };
657        let inline_runner = inline.then(|| {
658            let runner = Arc::clone(&runner);
659            Arc::new(move |input, materializer: &Materializer| {
660                let runner = Arc::clone(&runner);
661                let state = Arc::clone(&materializer.inner.state);
662                Ok(materializer.spawn_stream_inline(move |cancelled| {
663                    runner(runtime_checked_stream(input, state, Some(cancelled)))
664                }))
665            }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
666        });
667        Sink::from_runner_parts(async_runner, inline_runner)
668    }
669}
670
671impl<In: Send + 'static> Sink<In, StreamCompletion<Vec<In>>> {
672    #[must_use]
673    pub fn collect() -> Self {
674        let task_runner =
675            Sink::from_raw_terminal_task_runner(|input, materializer, cancelled, hints| {
676                run_collect_terminal(input, materializer, cancelled, hints)
677            });
678        let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::CollectDescriptor::<In> {
679            _phantom: std::marker::PhantomData,
680        });
681        Sink {
682            runner: task_runner.runner,
683            inline_runner: task_runner.inline_runner,
684            hinted_runner: task_runner.hinted_runner,
685            raw_hinted_runner: task_runner.raw_hinted_runner,
686            attributes: task_runner.attributes,
687            deferred_factory: task_runner.deferred_factory,
688            fold_fp: Some(fp),
689        }
690    }
691
692    #[must_use]
693    pub fn collection() -> Self {
694        Self::collect()
695    }
696
697    #[must_use]
698    pub fn take_last(n: usize) -> Self {
699        Sink::from_task_runner(move |input| {
700            if n == 0 {
701                for item in input {
702                    let _ = item?;
703                }
704                return Ok(Vec::new());
705            }
706            let mut buffer = VecDeque::with_capacity(n);
707            for item in input {
708                let item = item?;
709                if buffer.len() == n {
710                    buffer.pop_front();
711                }
712                buffer.push_back(item);
713            }
714            Ok(buffer.into_iter().collect())
715        })
716    }
717}
718
719impl<In: Send + 'static> Sink<In, StreamCompletion<NotUsed>> {
720    #[must_use]
721    pub fn ignore() -> Self {
722        let task_runner =
723            Sink::from_raw_terminal_task_runner(|input, materializer, cancelled, hints| {
724                run_ignore_terminal(input, materializer, cancelled, hints)
725            });
726        let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::IgnoreDescriptor::<In> {
727            _phantom: std::marker::PhantomData,
728        });
729        Sink {
730            runner: task_runner.runner,
731            inline_runner: task_runner.inline_runner,
732            hinted_runner: task_runner.hinted_runner,
733            raw_hinted_runner: task_runner.raw_hinted_runner,
734            attributes: task_runner.attributes,
735            deferred_factory: task_runner.deferred_factory,
736            fold_fp: Some(fp),
737        }
738    }
739
740    #[must_use]
741    pub fn on_complete<F>(callback: F) -> Self
742    where
743        F: FnOnce() + Send + Sync + 'static,
744    {
745        let callback = Arc::new(Mutex::new(Some(callback)));
746        Sink::from_task_runner(move |input| {
747            for item in input {
748                item?;
749            }
750            if let Some(cb) = callback.lock().expect("on_complete poisoned").take() {
751                cb();
752            }
753            Ok(NotUsed)
754        })
755    }
756
757    #[must_use]
758    pub fn never() -> Self {
759        Sink::from_runner(|input, materializer| {
760            let state = Arc::clone(&materializer.inner.state);
761            let shutdown_state = Arc::clone(&state);
762            Ok(materializer.spawn_stream(move |cancelled| {
763                let input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
764                for item in input {
765                    item?;
766                }
767                loop {
768                    if shutdown_state.shutdown.load(Ordering::SeqCst) {
769                        return Err(StreamError::AbruptTermination);
770                    }
771                    if cancelled.load(Ordering::SeqCst) {
772                        return Err(StreamError::Cancelled);
773                    }
774                    thread::sleep(Duration::from_millis(1));
775                }
776            }))
777        })
778    }
779
780    #[must_use]
781    pub fn foreach<F>(f: F) -> Self
782    where
783        F: Fn(In) + Send + Sync + 'static,
784    {
785        Sink::from_task_runner(move |input| {
786            for item in input {
787                f(item?);
788            }
789            Ok(NotUsed)
790        })
791    }
792
793    #[must_use]
794    pub fn foreach_async<F, Fut>(parallelism: usize, f: F) -> Self
795    where
796        F: Fn(In) -> Fut + Send + Sync + 'static,
797        Fut: Future<Output = StreamResult<()>> + Send + 'static,
798    {
799        Flow::identity()
800            .map_async_unordered(parallelism, f)
801            .to_mat(Sink::ignore(), Keep::right)
802    }
803
804    #[must_use]
805    pub fn foreach_result<F>(f: F) -> Self
806    where
807        F: Fn(In) -> StreamResult<()> + Send + Sync + 'static,
808    {
809        Sink::from_task_runner(move |input| {
810            for item in input {
811                f(item?)?;
812            }
813            Ok(NotUsed)
814        })
815    }
816
817    #[must_use]
818    pub fn foreach_result_with_supervision<F>(f: F, decider: SupervisionDecider) -> Self
819    where
820        F: Fn(In) -> StreamResult<()> + Send + Sync + 'static,
821    {
822        Sink::from_task_runner(move |input| {
823            for item in input {
824                let item = item?;
825                match catch_unwind(AssertUnwindSafe(|| f(item)))
826                    .unwrap_or_else(|_| Err(panic_stream_error("foreach_result callback")))
827                {
828                    Ok(()) => {}
829                    Err(error) => match decide_supervision(&decider, &error) {
830                        SupervisionDirective::Stop => return Err(error),
831                        SupervisionDirective::Resume | SupervisionDirective::Restart => {}
832                    },
833                }
834            }
835            Ok(NotUsed)
836        })
837    }
838}
839
840impl<In: Send + 'static> Sink<In, StreamCompletion<In>> {
841    /// Materializes a sink that completes with the stream's first element, or
842    /// fails with [`StreamError::EmptyStream`] if the stream is empty.
843    ///
844    /// Fed by a synchronous bounded eager source (`Source::single`/`from_iter`/
845    /// `empty`/`failed`, optionally through inline-preserving synchronous flows
846    /// such as `map`/`filter`/`identity`), this terminal takes the inline head
847    /// fast path: the first element is produced on the calling thread *during*
848    /// materialization, without spawning a worker or a oneshot channel. As a
849    /// result `run_with_materializer()` blocks until that element is available and
850    /// the returned [`StreamCompletion`] is already resolved, so any work a caller
851    /// expected to overlap between `run_with_materializer()` returning and
852    /// `.wait()` is already done for these sources. Other sources — and any chain
853    /// that passes through a non-preserving operator, including `ActorFlow::ask`
854    /// (whose blocking cross-thread reply wait must not run inline on the caller) —
855    /// still drain on a runtime worker as before, keeping the returned
856    /// [`StreamCompletion`] non-blocking/awaitable.
857    #[must_use]
858    pub fn head() -> Self {
859        Sink::from_task_runner_with_inline(
860            |mut input| input.next().unwrap_or(Err(StreamError::EmptyStream)),
861            true,
862        )
863    }
864
865    #[must_use]
866    pub fn last() -> Self {
867        Sink::from_task_runner(|input| {
868            let mut last = None;
869            for item in input {
870                last = Some(item?);
871            }
872            last.ok_or(StreamError::EmptyStream)
873        })
874    }
875
876    #[must_use]
877    pub fn reduce<F>(f: F) -> Self
878    where
879        F: Fn(In, In) -> In + Send + Sync + 'static,
880    {
881        Sink::from_task_runner(move |mut input| {
882            let mut acc = input.next().unwrap_or(Err(StreamError::EmptyStream))?;
883            for item in input {
884                acc = f(acc, item?);
885            }
886            Ok(acc)
887        })
888    }
889
890    #[must_use]
891    pub fn reduce_result<F>(f: F) -> Self
892    where
893        F: Fn(In, In) -> StreamResult<In> + Send + Sync + 'static,
894    {
895        Sink::from_task_runner(move |mut input| {
896            let mut acc = input.next().unwrap_or(Err(StreamError::EmptyStream))?;
897            for item in input {
898                acc = f(acc, item?)?;
899            }
900            Ok(acc)
901        })
902    }
903
904    #[must_use]
905    pub fn reduce_result_with_supervision<F>(f: F, decider: SupervisionDecider) -> Self
906    where
907        In: Clone,
908        F: Fn(In, In) -> StreamResult<In> + Send + Sync + 'static,
909    {
910        Sink::from_task_runner(move |mut input: BoxStream<In>| {
911            let mut acc = Some(input.next().unwrap_or(Err(StreamError::EmptyStream))?);
912            for item in input {
913                let item = item?;
914                let Some(previous) = acc.take() else {
915                    acc = Some(item);
916                    continue;
917                };
918                match catch_unwind(AssertUnwindSafe(|| f(previous.clone(), item)))
919                    .unwrap_or_else(|_| Err(panic_stream_error("reduce_result callback")))
920                {
921                    Ok(next) => acc = Some(next),
922                    Err(error) => match decide_supervision(&decider, &error) {
923                        SupervisionDirective::Stop => return Err(error),
924                        SupervisionDirective::Resume => acc = Some(previous),
925                        SupervisionDirective::Restart => acc = None,
926                    },
927                }
928            }
929            acc.ok_or(StreamError::EmptyStream)
930        })
931    }
932}
933
934impl<In: Send + 'static> Sink<In, StreamCompletion<Option<In>>> {
935    /// Materializes a sink that completes with `Some(first element)`, or `None`
936    /// if the stream is empty.
937    ///
938    /// Like [`Sink::head`], fed by a synchronous bounded eager source
939    /// (`Source::single`/`from_iter`/`empty`/`failed`, optionally through
940    /// inline-preserving synchronous flows such as `map`/`filter`/`identity`) this
941    /// terminal takes the inline head fast path: the result is produced on the
942    /// calling thread *during* materialization without spawning a worker or a
943    /// oneshot channel, so `run_with_materializer()` blocks until it is available
944    /// and the returned [`StreamCompletion`] is already resolved. Any work a caller
945    /// expected to overlap between `run_with_materializer()` returning and
946    /// `.wait()` is already done for these sources; other sources — and any chain
947    /// through a non-preserving operator, including `ActorFlow::ask` (whose
948    /// blocking cross-thread reply wait must not run inline) — still drain on a
949    /// runtime worker, keeping the returned [`StreamCompletion`]
950    /// non-blocking/awaitable.
951    #[must_use]
952    pub fn head_option() -> Self {
953        Sink::from_task_runner_with_inline(
954            |mut input| match input.next() {
955                Some(Ok(item)) => Ok(Some(item)),
956                Some(Err(error)) => Err(error),
957                None => Ok(None),
958            },
959            true,
960        )
961    }
962
963    #[must_use]
964    pub fn last_option() -> Self {
965        Sink::from_task_runner(|input| {
966            let mut last = None;
967            for item in input {
968                last = Some(item?);
969            }
970            Ok(last)
971        })
972    }
973}
974
975impl<In: Send + 'static> Sink<In, NotUsed> {
976    #[must_use]
977    pub fn cancelled() -> Self {
978        Sink::from_runner(|_input, _materializer| Ok(NotUsed))
979    }
980
981    #[must_use]
982    pub fn future_sink<InnerMat, F, Fut>(future: F) -> Sink<In, StreamCompletion<InnerMat>>
983    where
984        InnerMat: Send + 'static,
985        F: Fn() -> Fut + Send + Sync + 'static,
986        Fut: Future<Output = StreamResult<Sink<In, InnerMat>>> + Send + 'static,
987    {
988        Self::lazy_future_sink(future)
989    }
990
991    #[must_use]
992    pub fn lazy_sink<InnerMat, F>(create: F) -> Sink<In, StreamCompletion<InnerMat>>
993    where
994        InnerMat: Send + 'static,
995        F: Fn() -> Sink<In, InnerMat> + Send + Sync + 'static,
996    {
997        let create = Arc::new(create);
998        Sink::from_runner(move |input, materializer| {
999            let create = Arc::clone(&create);
1000            let state = Arc::clone(&materializer.inner.state);
1001            let worker_materializer =
1002                materializer.with_name_prefix(materializer.name_prefix().to_owned());
1003            Ok(materializer.spawn_stream(move |cancelled| {
1004                let input = runtime_checked_stream(input, state, Some(cancelled));
1005                run_lazy_sink(input, &worker_materializer, move || {
1006                    catch_unwind_failed("lazy_sink factory", || create())
1007                })
1008            }))
1009        })
1010    }
1011
1012    #[must_use]
1013    pub fn lazy_future_sink<InnerMat, F, Fut>(create: F) -> Sink<In, StreamCompletion<InnerMat>>
1014    where
1015        InnerMat: Send + 'static,
1016        F: Fn() -> Fut + Send + Sync + 'static,
1017        Fut: Future<Output = StreamResult<Sink<In, InnerMat>>> + Send + 'static,
1018    {
1019        let create = Arc::new(create);
1020        Sink::from_runner(move |input, materializer| {
1021            let create = Arc::clone(&create);
1022            let state = Arc::clone(&materializer.inner.state);
1023            let worker_materializer =
1024                materializer.with_name_prefix(materializer.name_prefix().to_owned());
1025            Ok(materializer.spawn_stream(move |cancelled| {
1026                let input = runtime_checked_stream(input, state, Some(cancelled));
1027                run_lazy_sink(input, &worker_materializer, move || {
1028                    catch_unwind_failed("lazy_future_sink factory", || create())
1029                        .and_then(flow::run_future_inline_or_spawn)
1030                })
1031            }))
1032        })
1033    }
1034
1035    #[must_use]
1036    pub fn fold<Acc, F>(zero: Acc, f: F) -> Sink<In, StreamCompletion<Acc>>
1037    where
1038        Acc: Clone + Send + Sync + 'static,
1039        F: Fn(Acc, In) -> Acc + Send + Sync + 'static,
1040    {
1041        let f_arc = Arc::new(f);
1042        let zero_clone = zero.clone();
1043        let f_arc2 = Arc::clone(&f_arc);
1044        let task_runner = {
1045            let zero = zero;
1046            Sink::from_raw_terminal_task_runner(move |input, materializer, cancelled, hints| {
1047                run_fold_terminal(
1048                    input,
1049                    materializer,
1050                    cancelled,
1051                    hints,
1052                    zero.clone(),
1053                    f_arc.as_ref(),
1054                )
1055            })
1056        };
1057        let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::FoldDescriptor {
1058            zero: zero_clone,
1059            f: f_arc2,
1060        });
1061        Sink {
1062            runner: task_runner.runner,
1063            inline_runner: task_runner.inline_runner,
1064            hinted_runner: task_runner.hinted_runner,
1065            raw_hinted_runner: task_runner.raw_hinted_runner,
1066            attributes: task_runner.attributes,
1067            deferred_factory: task_runner.deferred_factory,
1068            fold_fp: Some(fp),
1069        }
1070    }
1071
1072    #[must_use]
1073    pub fn fold_result<Acc, F>(zero: Acc, f: F) -> Sink<In, StreamCompletion<Acc>>
1074    where
1075        Acc: Clone + Send + Sync + 'static,
1076        F: Fn(Acc, In) -> StreamResult<Acc> + Send + Sync + 'static,
1077    {
1078        let f_arc = Arc::new(f);
1079        let zero_clone = zero.clone();
1080        let f_arc2 = Arc::clone(&f_arc);
1081        let task_runner = {
1082            let zero = zero;
1083            Sink::from_raw_terminal_task_runner(move |input, materializer, cancelled, hints| {
1084                run_fold_result_terminal(
1085                    input,
1086                    materializer,
1087                    cancelled,
1088                    hints,
1089                    zero.clone(),
1090                    f_arc.as_ref(),
1091                )
1092            })
1093        };
1094        let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::FoldResultDescriptor {
1095            zero: zero_clone,
1096            f: f_arc2,
1097        });
1098        Sink {
1099            runner: task_runner.runner,
1100            inline_runner: task_runner.inline_runner,
1101            hinted_runner: task_runner.hinted_runner,
1102            raw_hinted_runner: task_runner.raw_hinted_runner,
1103            attributes: task_runner.attributes,
1104            deferred_factory: task_runner.deferred_factory,
1105            fold_fp: Some(fp),
1106        }
1107    }
1108
1109    #[must_use]
1110    pub fn fold_result_with_supervision<Acc, F>(
1111        zero: Acc,
1112        f: F,
1113        decider: SupervisionDecider,
1114    ) -> Sink<In, StreamCompletion<Acc>>
1115    where
1116        Acc: Clone + Send + Sync + 'static,
1117        F: Fn(Acc, In) -> StreamResult<Acc> + Send + Sync + 'static,
1118    {
1119        Sink::from_task_runner(move |input| {
1120            let mut acc = zero.clone();
1121            for item in input {
1122                let item = item?;
1123                let previous = acc;
1124                match catch_unwind(AssertUnwindSafe(|| f(previous.clone(), item)))
1125                    .unwrap_or_else(|_| Err(panic_stream_error("fold_result callback")))
1126                {
1127                    Ok(next) => acc = next,
1128                    Err(error) => match decide_supervision(&decider, &error) {
1129                        SupervisionDirective::Stop => return Err(error),
1130                        SupervisionDirective::Resume => acc = previous,
1131                        SupervisionDirective::Restart => acc = zero.clone(),
1132                    },
1133                }
1134            }
1135            Ok(acc)
1136        })
1137    }
1138}
1139
1140fn run_lazy_sink<In, InnerMat, F>(
1141    mut input: BoxStream<In>,
1142    materializer: &Materializer,
1143    create: F,
1144) -> StreamResult<InnerMat>
1145where
1146    In: Send + 'static,
1147    InnerMat: Send + 'static,
1148    F: FnOnce() -> StreamResult<Sink<In, InnerMat>>,
1149{
1150    let first = match input.next() {
1151        Some(Ok(item)) => item,
1152        Some(Err(error)) => return Err(error),
1153        None => {
1154            return Err(StreamError::Failed(
1155                "lazy sink was never materialized".into(),
1156            ));
1157        }
1158    };
1159    let sink = create()?;
1160    sink.run(prepend_first_stream(first, input), materializer)
1161}
1162
1163fn prepend_first_stream<In>(first: In, mut rest: BoxStream<In>) -> BoxStream<In>
1164where
1165    In: Send + 'static,
1166{
1167    let mut first = Some(first);
1168    Box::new(std::iter::from_fn(move || {
1169        if let Some(item) = first.take() {
1170            Some(Ok(item))
1171        } else {
1172            rest.next()
1173        }
1174    }))
1175}
1176
1177fn terminal_consumer_status(
1178    materializer: &Materializer,
1179    cancelled: &Arc<AtomicBool>,
1180) -> StreamResult<()> {
1181    if materializer.is_shutdown() {
1182        Err(StreamError::AbruptTermination)
1183    } else if cancelled.load(Ordering::SeqCst) {
1184        Err(StreamError::Cancelled)
1185    } else {
1186        Ok(())
1187    }
1188}
1189
1190fn run_collect_terminal<In: Send + 'static>(
1191    mut input: BoxStream<In>,
1192    materializer: Materializer,
1193    cancelled: Arc<AtomicBool>,
1194    hints: SourceRuntimeHints,
1195) -> StreamResult<Vec<In>> {
1196    if !hints.terminal_consumer_batch {
1197        let input = runtime_checked_stream(
1198            input,
1199            Arc::clone(&materializer.inner.state),
1200            Some(cancelled),
1201        );
1202        return input.collect();
1203    }
1204
1205    let mut items = Vec::with_capacity(hints.inline_micro_max_success_items.unwrap_or(0));
1206    loop {
1207        terminal_consumer_status(&materializer, &cancelled)?;
1208        {
1209            let _cancel_scope = set_current_stream_cancelled(&cancelled);
1210            for _ in 0..TERMINAL_CONSUMER_BATCH {
1211                match input.next() {
1212                    Some(Ok(item)) => items.push(item),
1213                    Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1214                        Ok(()) => return Err(error),
1215                        Err(status) => return Err(status),
1216                    },
1217                    None => {
1218                        return terminal_consumer_status(&materializer, &cancelled).map(|()| items);
1219                    }
1220                }
1221            }
1222        }
1223    }
1224}
1225
1226fn run_ignore_terminal<In: Send + 'static>(
1227    input: BoxStream<In>,
1228    materializer: Materializer,
1229    cancelled: Arc<AtomicBool>,
1230    hints: SourceRuntimeHints,
1231) -> StreamResult<NotUsed> {
1232    if !hints.terminal_consumer_batch {
1233        let input = runtime_checked_stream(
1234            input,
1235            Arc::clone(&materializer.inner.state),
1236            Some(cancelled),
1237        );
1238        for item in input {
1239            item?;
1240        }
1241        return Ok(NotUsed);
1242    }
1243
1244    let mut input = input;
1245    loop {
1246        terminal_consumer_status(&materializer, &cancelled)?;
1247        {
1248            let _cancel_scope = set_current_stream_cancelled(&cancelled);
1249            for _ in 0..TERMINAL_CONSUMER_BATCH {
1250                match input.next() {
1251                    Some(Ok(_)) => {}
1252                    Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1253                        Ok(()) => return Err(error),
1254                        Err(status) => return Err(status),
1255                    },
1256                    None => {
1257                        return terminal_consumer_status(&materializer, &cancelled)
1258                            .map(|()| NotUsed);
1259                    }
1260                }
1261            }
1262        }
1263    }
1264}
1265
1266fn run_fold_terminal<In, Acc, F>(
1267    input: BoxStream<In>,
1268    materializer: Materializer,
1269    cancelled: Arc<AtomicBool>,
1270    hints: SourceRuntimeHints,
1271    zero: Acc,
1272    f: &F,
1273) -> StreamResult<Acc>
1274where
1275    In: Send + 'static,
1276    Acc: Send + 'static,
1277    F: Fn(Acc, In) -> Acc,
1278{
1279    if !hints.terminal_consumer_batch {
1280        let input = runtime_checked_stream(
1281            input,
1282            Arc::clone(&materializer.inner.state),
1283            Some(cancelled),
1284        );
1285        let mut acc = zero;
1286        for item in input {
1287            acc = f(acc, item?);
1288        }
1289        return Ok(acc);
1290    }
1291
1292    let mut input = input;
1293    let mut acc = zero;
1294    loop {
1295        terminal_consumer_status(&materializer, &cancelled)?;
1296        {
1297            let _cancel_scope = set_current_stream_cancelled(&cancelled);
1298            for _ in 0..TERMINAL_CONSUMER_BATCH {
1299                match input.next() {
1300                    Some(Ok(item)) => acc = f(acc, item),
1301                    Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1302                        Ok(()) => return Err(error),
1303                        Err(status) => return Err(status),
1304                    },
1305                    None => {
1306                        return terminal_consumer_status(&materializer, &cancelled).map(|()| acc);
1307                    }
1308                }
1309            }
1310        }
1311    }
1312}
1313
1314fn run_fold_result_terminal<In, Acc, F>(
1315    input: BoxStream<In>,
1316    materializer: Materializer,
1317    cancelled: Arc<AtomicBool>,
1318    hints: SourceRuntimeHints,
1319    zero: Acc,
1320    f: &F,
1321) -> StreamResult<Acc>
1322where
1323    In: Send + 'static,
1324    Acc: Send + 'static,
1325    F: Fn(Acc, In) -> StreamResult<Acc>,
1326{
1327    if !hints.terminal_consumer_batch {
1328        let input = runtime_checked_stream(
1329            input,
1330            Arc::clone(&materializer.inner.state),
1331            Some(cancelled),
1332        );
1333        let mut acc = zero;
1334        for item in input {
1335            acc = f(acc, item?)?;
1336        }
1337        return Ok(acc);
1338    }
1339
1340    let mut input = input;
1341    let mut acc = Some(zero);
1342    loop {
1343        terminal_consumer_status(&materializer, &cancelled)?;
1344        {
1345            let _cancel_scope = set_current_stream_cancelled(&cancelled);
1346            for _ in 0..TERMINAL_CONSUMER_BATCH {
1347                match input.next() {
1348                    Some(Ok(item)) => {
1349                        let previous = acc.take().expect("fold accumulator present");
1350                        match f(previous, item) {
1351                            Ok(next) => acc = Some(next),
1352                            Err(error) => {
1353                                return match terminal_consumer_status(&materializer, &cancelled) {
1354                                    Ok(()) => Err(error),
1355                                    Err(status) => Err(status),
1356                                };
1357                            }
1358                        }
1359                    }
1360                    Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1361                        Ok(()) => return Err(error),
1362                        Err(status) => return Err(status),
1363                    },
1364                    None => {
1365                        return terminal_consumer_status(&materializer, &cancelled)
1366                            .map(|()| acc.expect("fold accumulator present"));
1367                    }
1368                }
1369            }
1370        }
1371    }
1372}
1373
1374#[cfg(test)]
1375mod tests {
1376    use super::*;
1377    use crate::{Source, StreamCompletion};
1378    use std::time::Instant;
1379
1380    fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
1381        let deadline = Instant::now() + timeout;
1382        while Instant::now() < deadline {
1383            if condition() {
1384                return true;
1385            }
1386            thread::sleep(Duration::from_millis(5));
1387        }
1388        condition()
1389    }
1390
    /// Regression test: chaining `map_materialized_value` twice on a sink built
    /// with `Sink::setup` must compile and still deliver the fold result.
    #[test]
    fn map_materialized_value_on_deferred_sink_does_not_explode() {
        // Sink::setup sets deferred_factory: Some(_) — the branch that triggered the
        // monomorphization cascade on Rust 1.96 (infinite closure-type nesting).
        let sink = Sink::<u64, _>::setup(|_, _| Sink::fold(0u64, |acc, x| acc + x));
        // Two nested map_materialized_value calls mirror the failing bench/bin pattern.
        // Without the fix each call produced a new instantiation wrapping the previous
        // closure type, creating an infinite chain the compiler could not terminate.
        let sink = sink
            .map_materialized_value(|sc: StreamCompletion<u64>| sc)
            .map_materialized_value(|sc| sc);

        // Both mappings are identities, so the fold of 1, 2, 3 from 0 must come through as 6.
        let sum = Source::from_iter(1u64..=3)
            .run_with(sink)
            .unwrap()
            .wait()
            .unwrap();
        assert_eq!(sum, 6u64);
    }
1410
1411    #[test]
1412    fn batched_terminal_fold_observes_completion_drop_cancellation() {
1413        let materializer = Materializer::new();
1414        let completion = Source::repeat(1_u64)
1415            .run_with_materializer(Sink::fold(0_u64, |acc, item| acc + item), &materializer)
1416            .expect("fold terminal materializes");
1417
1418        assert!(wait_until(Duration::from_secs(1), || {
1419            materializer.active_streams() == 1
1420        }));
1421        drop(completion);
1422
1423        assert!(wait_until(Duration::from_secs(5), || {
1424            materializer.active_streams() == 0
1425        }));
1426    }
1427}