1use super::flow;
2use super::*;
3use crate::Attributes;
4use crate::stream::error::{decide_supervision, panic_stream_error};
5
/// One started child of a combined sink: the sender used to feed it
/// [`CombinedSinkMessage`]s, paired with the child's materialized value boxed as
/// `dyn Any + Send`.
type CombinedSinkChild<In> = (
    std::sync::mpsc::SyncSender<CombinedSinkMessage<In>>,
    Box<dyn std::any::Any + Send>,
);
14
/// Type-erased starter for one child of a combined sink: given a materializer it
/// either yields the started [`CombinedSinkChild`] or the error that stopped it
/// from starting.
type CombinedSinkRunner<In> =
    dyn Fn(&Materializer) -> StreamResult<CombinedSinkChild<In>> + Send + Sync;
/// Factory that builds the concrete sink at run time from the materializer and the
/// attributes passed to it.
type DeferredSinkFactory<In, Mat> =
    dyn Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync;
19
/// Messages sent from a combined sink to each of its children.
enum CombinedSinkMessage<In> {
    /// A stream element (or upstream error) handed to the child as its next item.
    Item(StreamResult<In>),
    /// Synchronisation request: the child's feed replies with `()` on the enclosed
    /// sender and then keeps waiting for further messages.
    Flush(std::sync::mpsc::SyncSender<()>),
    /// Ends the child's input stream.
    Close,
}
25
/// Maximum number of elements a batching terminal consumer pulls between two
/// shutdown/cancellation status checks.
const TERMINAL_CONSUMER_BATCH: usize = 64;
27
/// A stream sink over `In` elements that yields a materialized value of type `Mat`.
///
/// A sink carries several alternative runner closures; the `run*` methods decide
/// which one is invoked.
pub struct Sink<In, Mat> {
    /// Default runner, used when no more specific runner applies.
    runner: Arc<SinkRunner<In, Mat>>,
    /// Runner invoked by `run_inline`; its presence is what `can_inline` reports.
    inline_runner: Option<Arc<SinkRunner<In, Mat>>>,
    /// Runner that also receives `SourceRuntimeHints`; preferred by
    /// `run_with_source_hints`.
    hinted_runner: Option<Arc<HintedSinkRunner<In, Mat>>>,
    /// Hint-aware runner preferred by `run_from_source`, which hands it the input
    /// without first wrapping it in `runtime_checked_stream`.
    raw_hinted_runner: Option<Arc<HintedSinkRunner<In, Mat>>>,
    /// Attributes attached to this sink.
    attributes: Attributes,
    /// When set, every `run*` path first builds the actual sink through this
    /// factory and delegates to it.
    deferred_factory: Option<Arc<DeferredSinkFactory<In, Mat>>>,
    /// Optional fold fast-path descriptor. Constructors such as `collect`, `ignore`,
    /// `foreach` and `fold` set it; mapping the materialized value discards it.
    pub(crate) fold_fp: Option<Arc<dyn FoldFastPathDyn<In>>>,
}
38
39fn map_mat_dyn<In, Mat, NextMat>(
43 sink: Sink<In, Mat>,
44 f: Arc<dyn Fn(Mat) -> NextMat + Send + Sync + 'static>,
45) -> Sink<In, NextMat>
46where
47 In: Send + 'static,
48 Mat: Send + 'static,
49 NextMat: Send + 'static,
50{
51 let Sink {
52 runner,
53 inline_runner,
54 hinted_runner,
55 raw_hinted_runner,
56 attributes,
57 deferred_factory,
58 fold_fp: _,
59 } = sink;
60 let mapped_runner = {
61 let f = Arc::clone(&f);
62 Arc::new(move |input, materializer: &Materializer| {
63 let mat = runner(input, materializer)?;
64 Ok(f(mat))
65 }) as Arc<SinkRunner<In, NextMat>>
66 };
67 let mapped_inline_runner = inline_runner.map(|ir| {
68 let f = Arc::clone(&f);
69 Arc::new(move |input, materializer: &Materializer| {
70 let result = ir(input, materializer)?;
71 Ok(f(result))
72 }) as Arc<SinkRunner<In, NextMat>>
73 });
74 let mapped_hinted_runner = hinted_runner.map(|hr| {
75 let f = Arc::clone(&f);
76 Arc::new(
77 move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
78 let result = hr(input, materializer, hints)?;
79 Ok(f(result))
80 },
81 ) as Arc<HintedSinkRunner<In, NextMat>>
82 });
83 let mapped_raw_hinted_runner = raw_hinted_runner.map(|hr| {
84 let f = Arc::clone(&f);
85 Arc::new(
86 move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
87 let result = hr(input, materializer, hints)?;
88 Ok(f(result))
89 },
90 ) as Arc<HintedSinkRunner<In, NextMat>>
91 });
92 let mapped_factory = deferred_factory.map(|factory| {
93 let f = Arc::clone(&f);
94 Arc::new(move |materializer: &Materializer, attrs: &Attributes| {
95 map_mat_dyn(factory(materializer, attrs), Arc::clone(&f))
96 }) as Arc<DeferredSinkFactory<In, NextMat>>
97 });
98 Sink {
99 runner: mapped_runner,
100 inline_runner: mapped_inline_runner,
101 hinted_runner: mapped_hinted_runner,
102 raw_hinted_runner: mapped_raw_hinted_runner,
103 attributes,
104 deferred_factory: mapped_factory,
105 fold_fp: None,
106 }
107}
108
109impl<In, Mat> Clone for Sink<In, Mat> {
110 fn clone(&self) -> Self {
111 Self {
112 runner: Arc::clone(&self.runner),
113 inline_runner: self.inline_runner.as_ref().map(Arc::clone),
114 hinted_runner: self.hinted_runner.as_ref().map(Arc::clone),
115 raw_hinted_runner: self.raw_hinted_runner.as_ref().map(Arc::clone),
116 attributes: self.attributes.clone(),
117 deferred_factory: self.deferred_factory.as_ref().map(Arc::clone),
118 fold_fp: self.fold_fp.as_ref().map(Arc::clone),
119 }
120 }
121}
122
123impl<In: Send + 'static, Mat: Send + 'static> Sink<In, Mat> {
124 pub(crate) fn from_runner<F>(runner: F) -> Self
125 where
126 F: Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync + 'static,
127 {
128 Self::from_runner_parts(Arc::new(runner), None)
129 }
130
131 pub(crate) fn from_runner_parts(
132 runner: Arc<SinkRunner<In, Mat>>,
133 inline_runner: Option<Arc<SinkRunner<In, Mat>>>,
134 ) -> Self {
135 Self {
136 runner,
137 inline_runner,
138 hinted_runner: None,
139 raw_hinted_runner: None,
140 attributes: Attributes::default(),
141 deferred_factory: None,
142 fold_fp: None,
143 }
144 }
145
146 pub(crate) fn from_hinted_runner<F>(runner: F) -> Self
147 where
148 F: Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat>
149 + Send
150 + Sync
151 + 'static,
152 {
153 let hinted_runner: Arc<HintedSinkRunner<In, Mat>> = Arc::new(runner);
154 let fallback = {
155 let hinted_runner = Arc::clone(&hinted_runner);
156 Arc::new(move |input, materializer: &Materializer| {
157 hinted_runner(input, materializer, SourceRuntimeHints::default())
158 }) as Arc<SinkRunner<In, Mat>>
159 };
160 Self {
161 runner: fallback,
162 inline_runner: None,
163 hinted_runner: Some(hinted_runner),
164 raw_hinted_runner: None,
165 attributes: Attributes::default(),
166 deferred_factory: None,
167 fold_fp: None,
168 }
169 }
170
171 pub(crate) fn from_raw_hinted_runner<F>(runner: F) -> Self
172 where
173 F: Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat>
174 + Send
175 + Sync
176 + 'static,
177 {
178 let raw_hinted_runner: Arc<HintedSinkRunner<In, Mat>> = Arc::new(runner);
179 let fallback = {
180 let raw_hinted_runner = Arc::clone(&raw_hinted_runner);
181 Arc::new(move |input, materializer: &Materializer| {
182 let input =
183 runtime_checked_stream(input, Arc::clone(&materializer.inner.state), None);
184 raw_hinted_runner(input, materializer, SourceRuntimeHints::default())
185 }) as Arc<SinkRunner<In, Mat>>
186 };
187 Self {
188 runner: fallback,
189 inline_runner: None,
190 hinted_runner: None,
191 raw_hinted_runner: Some(raw_hinted_runner),
192 attributes: Attributes::default(),
193 deferred_factory: None,
194 fold_fp: None,
195 }
196 }
197
198 pub(super) fn run(
199 &self,
200 input: BoxStream<In>,
201 materializer: &Materializer,
202 ) -> StreamResult<Mat> {
203 self.run_with_source_hints(input, materializer, SourceRuntimeHints::default())
204 }
205
206 pub(super) fn run_with_source_hints(
207 &self,
208 input: BoxStream<In>,
209 materializer: &Materializer,
210 hints: SourceRuntimeHints,
211 ) -> StreamResult<Mat> {
212 if let Some(factory) = &self.deferred_factory {
213 let attrs = materializer.effective_attributes(&self.attributes);
214 return factory(materializer, &attrs).run_with_source_hints(input, materializer, hints);
215 }
216 if let Some(hinted_runner) = &self.hinted_runner {
217 return hinted_runner(input, materializer, hints);
218 }
219 (self.runner)(input, materializer)
220 }
221
222 pub(super) fn run_from_source(
223 &self,
224 input: BoxStream<In>,
225 materializer: &Materializer,
226 hints: SourceRuntimeHints,
227 ) -> StreamResult<Mat> {
228 if let Some(factory) = &self.deferred_factory {
229 let attrs = materializer.effective_attributes(&self.attributes);
230 return factory(materializer, &attrs).run_from_source(input, materializer, hints);
231 }
232 if let Some(raw_hinted_runner) = &self.raw_hinted_runner {
233 return raw_hinted_runner(input, materializer, hints);
234 }
235 let input = runtime_checked_stream(input, Arc::clone(&materializer.inner.state), None);
236 self.run_with_source_hints(input, materializer, hints)
237 }
238
239 pub(super) fn can_inline(&self) -> bool {
240 self.inline_runner.is_some()
241 }
242
243 pub(super) fn run_inline(
244 &self,
245 input: BoxStream<In>,
246 materializer: &Materializer,
247 ) -> StreamResult<Mat> {
248 if let Some(factory) = &self.deferred_factory {
249 let attrs = materializer.effective_attributes(&self.attributes);
250 return factory(materializer, &attrs).run_inline(input, materializer);
251 }
252 (self
253 .inline_runner
254 .as_ref()
255 .expect("inline sink runner exists"))(input, materializer)
256 }
257
258 pub fn run_with<SourceMat: Send + 'static>(
259 self,
260 source: Source<In, SourceMat>,
261 ) -> StreamResult<SourceMat> {
262 source.to(self).run()
263 }
264
265 pub fn run_with_materializer<SourceMat: Send + 'static>(
266 self,
267 source: Source<In, SourceMat>,
268 materializer: &Materializer,
269 ) -> StreamResult<SourceMat> {
270 source.to(self).run_with_materializer(materializer)
271 }
272
273 #[must_use]
274 pub fn from_materializer<F>(factory: F) -> Self
275 where
276 F: Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync + 'static,
277 {
278 let factory = Arc::new(factory);
279 Self {
280 runner: Arc::new(|_input, _materializer| {
281 Err(StreamError::Failed(
282 "deferred sink factory must be driven through Sink::run".into(),
283 ))
284 }),
285 inline_runner: None,
286 hinted_runner: None,
287 raw_hinted_runner: None,
288 attributes: Attributes::default(),
289 deferred_factory: Some(factory),
290 fold_fp: None,
291 }
292 }
293
294 #[must_use]
295 pub fn setup<F>(factory: F) -> Self
296 where
297 F: Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync + 'static,
298 {
299 Self::from_materializer(factory)
300 }
301
302 pub fn pre_materialize(
303 &self,
304 materializer: &Materializer,
305 ) -> StreamResult<(Mat, Sink<In, NotUsed>)> {
306 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
307 let materialized = self.clone().run(
308 Box::new(std::iter::from_fn(move || receiver.recv().ok())),
309 materializer,
310 )?;
311 let sender = Arc::new(Mutex::new(Some(sender)));
312 let sink = Sink::from_runner(move |input, _materializer| {
313 let Some(sender) = sender
314 .lock()
315 .expect("pre-materialized sink poisoned")
316 .take()
317 else {
318 return Err(StreamError::Failed(
319 "pre-materialized sink has already been materialized".into(),
320 ));
321 };
322 for item in input {
323 if sender.send(item).is_err() {
324 break;
325 }
326 }
327 Ok(NotUsed)
328 });
329 Ok((materialized, sink.with_attributes(self.attributes.clone())))
330 }
331
332 #[must_use]
333 pub fn map_materialized_value<NextMat, F>(self, f: F) -> Sink<In, NextMat>
334 where
335 NextMat: Send + 'static,
336 F: Fn(Mat) -> NextMat + Send + Sync + 'static,
337 {
338 map_mat_dyn(self, Arc::new(f))
344 }
345
346 #[must_use]
347 pub fn attributes(&self) -> &Attributes {
348 &self.attributes
349 }
350
351 #[must_use]
352 pub fn with_attributes(mut self, attributes: Attributes) -> Self {
353 self.attributes = attributes;
354 self
355 }
356
357 #[must_use]
358 pub fn add_attributes(mut self, attributes: Attributes) -> Self {
359 self.attributes = self.attributes.and(attributes);
360 self
361 }
362
363 #[must_use]
364 pub fn named(self, name: impl Into<String>) -> Self {
365 self.add_attributes(Attributes::named(name))
366 }
367}
368
/// A graph that can be run to produce a materialized value of type `Mat`.
#[derive(Clone)]
pub struct RunnableGraph<Mat> {
    /// Closure that yields the materialized value when called with a materializer.
    pub(super) runner: Arc<RunnableGraphRunner<Mat>>,
    /// Attributes attached to the graph.
    attributes: Attributes,
}
374
/// How `Sink::combine` distributes elements among its child sinks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkCombineStrategy {
    /// Every element is cloned and sent to each child that is still accepting input.
    Broadcast,
    /// Every element is sent to a single child, chosen in rotating order.
    Balance,
}
380
381impl<Mat: Send + 'static> RunnableGraph<Mat> {
382 pub(super) fn from_runner<F>(runner: F) -> Self
383 where
384 F: Fn(&Materializer) -> StreamResult<Mat> + Send + Sync + 'static,
385 {
386 Self {
387 runner: Arc::new(runner),
388 attributes: Attributes::default(),
389 }
390 }
391
392 pub fn run(&self) -> StreamResult<Mat> {
393 Materializer::new().materialize(self)
394 }
395
396 pub fn run_with_materializer(&self, materializer: &Materializer) -> StreamResult<Mat> {
397 materializer.materialize(self)
398 }
399
400 #[must_use]
401 pub fn map_materialized_value<Next, F>(self, f: F) -> RunnableGraph<Next>
402 where
403 Next: Send + 'static,
404 F: Fn(Mat) -> Next + Send + Sync + 'static,
405 {
406 let f = Arc::new(f);
407 RunnableGraph::from_runner(move |materializer| {
408 let mat = (self.runner)(materializer)?;
409 Ok(f(mat))
410 })
411 }
412
413 #[must_use]
414 pub fn attributes(&self) -> &Attributes {
415 &self.attributes
416 }
417
418 #[must_use]
419 pub fn with_attributes(mut self, attributes: Attributes) -> Self {
420 self.attributes = attributes;
421 self
422 }
423
424 #[must_use]
425 pub fn add_attributes(mut self, attributes: Attributes) -> Self {
426 self.attributes = self.attributes.and(attributes);
427 self
428 }
429
430 #[must_use]
431 pub fn named(self, name: impl Into<String>) -> Self {
432 self.add_attributes(Attributes::named(name))
433 }
434}
435
436impl<In: Clone + Send + 'static> Sink<In, NotUsed> {
437 #[must_use]
438 pub fn combine<M1, M2, MRest, I>(
439 first: Sink<In, M1>,
440 second: Sink<In, M2>,
441 rest: I,
442 strategy: SinkCombineStrategy,
443 ) -> Sink<In, NotUsed>
444 where
445 M1: Send + 'static,
446 M2: Send + 'static,
447 MRest: Send + 'static,
448 I: IntoIterator<Item = Sink<In, MRest>>,
449 {
450 let mut runners: Vec<Arc<CombinedSinkRunner<In>>> = vec![
451 Arc::new(move |materializer| {
452 let (sender, receiver) = std::sync::mpsc::sync_channel(0);
453 let mat = first.run(
454 Box::new(std::iter::from_fn(move || {
455 loop {
456 match receiver.recv().ok()? {
457 CombinedSinkMessage::Item(item) => return Some(item),
458 CombinedSinkMessage::Flush(ack) => {
459 let _ = ack.send(());
460 }
461 CombinedSinkMessage::Close => return None,
462 }
463 }
464 })),
465 materializer,
466 )?;
467 Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
468 }),
469 Arc::new(move |materializer| {
470 let (sender, receiver) = std::sync::mpsc::sync_channel(0);
471 let mat = second.run(
472 Box::new(std::iter::from_fn(move || {
473 loop {
474 match receiver.recv().ok()? {
475 CombinedSinkMessage::Item(item) => return Some(item),
476 CombinedSinkMessage::Flush(ack) => {
477 let _ = ack.send(());
478 }
479 CombinedSinkMessage::Close => return None,
480 }
481 }
482 })),
483 materializer,
484 )?;
485 Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
486 }),
487 ];
488 runners.extend(rest.into_iter().map(|sink| {
489 Arc::new(move |materializer: &Materializer| {
490 let (sender, receiver) = std::sync::mpsc::sync_channel(0);
491 let mat = sink.run(
492 Box::new(std::iter::from_fn(move || {
493 loop {
494 match receiver.recv().ok()? {
495 CombinedSinkMessage::Item(item) => return Some(item),
496 CombinedSinkMessage::Flush(ack) => {
497 let _ = ack.send(());
498 }
499 CombinedSinkMessage::Close => return None,
500 }
501 }
502 })),
503 materializer,
504 )?;
505 Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
506 }) as Arc<CombinedSinkRunner<In>>
507 }));
508
509 Sink::from_runner(move |mut input: BoxStream<In>, materializer| {
510 let mut children = runners
513 .iter()
514 .map(|runner| runner(materializer))
515 .collect::<StreamResult<Vec<_>>>()?;
516 let mut next = 0usize;
517 for item in input.by_ref() {
518 match item {
519 Ok(value) => match strategy {
520 SinkCombineStrategy::Broadcast => {
521 children.retain(|(sender, _)| {
522 sender
523 .send(CombinedSinkMessage::Item(Ok(value.clone())))
524 .is_ok()
525 });
526 if children.is_empty() {
527 break;
528 }
529 }
530 SinkCombineStrategy::Balance => {
531 while !children.is_empty() {
532 let index = next % children.len();
533 next = next.wrapping_add(1);
534 match children[index]
535 .0
536 .send(CombinedSinkMessage::Item(Ok(value.clone())))
537 {
538 Ok(()) => break,
539 Err(_) => {
540 children.remove(index);
541 }
542 }
543 }
544 if children.is_empty() {
545 break;
546 }
547 }
548 },
549 Err(error) => {
550 for (sender, _) in &children {
551 let _ = sender.send(CombinedSinkMessage::Item(Err(error.clone())));
552 }
553 return Err(error);
554 }
555 }
556 }
557 for (sender, _) in &children {
561 let (ack_sender, ack_receiver) = std::sync::mpsc::sync_channel(0);
562 if sender.send(CombinedSinkMessage::Flush(ack_sender)).is_ok() {
563 let _ = ack_receiver.recv();
564 }
565 }
566 let mats: Vec<_> = children
569 .into_iter()
570 .map(|(sender, mat)| {
571 let _ = sender.send(CombinedSinkMessage::Close);
572 mat
573 })
574 .collect();
575 drop(mats);
576 Ok(NotUsed)
577 })
578 }
579}
580
581impl<In: Send + 'static, Mat: Send + 'static> Sink<In, StreamCompletion<Mat>> {
582 fn from_task_runner<F>(runner: F) -> Self
583 where
584 F: Fn(BoxStream<In>) -> StreamResult<Mat> + Send + Sync + 'static,
585 {
586 Self::from_task_runner_with_inline(runner, false)
587 }
588
589 fn from_raw_terminal_task_runner<F>(runner: F) -> Self
590 where
591 F: Fn(
592 BoxStream<In>,
593 Materializer,
594 Arc<AtomicBool>,
595 SourceRuntimeHints,
596 ) -> StreamResult<Mat>
597 + Send
598 + Sync
599 + 'static,
600 {
601 let runner = Arc::new(runner);
602 let async_runner = {
603 let runner = Arc::clone(&runner);
604 Arc::new(move |input, materializer: &Materializer| {
605 let runner = Arc::clone(&runner);
606 let worker_materializer =
607 materializer.with_name_prefix(materializer.name_prefix().to_owned());
608 Ok(materializer.spawn_stream(move |cancelled| {
609 runner(
610 input,
611 worker_materializer,
612 cancelled,
613 SourceRuntimeHints::default(),
614 )
615 }))
616 }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
617 };
618 let raw_hinted_runner = {
619 let runner = Arc::clone(&runner);
620 Arc::new(
621 move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
622 let runner = Arc::clone(&runner);
623 let worker_materializer =
624 materializer.with_name_prefix(materializer.name_prefix().to_owned());
625 Ok(materializer.spawn_stream(move |cancelled| {
626 runner(input, worker_materializer, cancelled, hints)
627 }))
628 },
629 ) as Arc<HintedSinkRunner<In, StreamCompletion<Mat>>>
630 };
631 Sink {
632 runner: async_runner,
633 inline_runner: None,
634 hinted_runner: None,
635 raw_hinted_runner: Some(raw_hinted_runner),
636 attributes: Attributes::default(),
637 deferred_factory: None,
638 fold_fp: None,
639 }
640 }
641
642 fn from_task_runner_with_inline<F>(runner: F, inline: bool) -> Self
643 where
644 F: Fn(BoxStream<In>) -> StreamResult<Mat> + Send + Sync + 'static,
645 {
646 let runner = Arc::new(runner);
647 let async_runner = {
648 let runner = Arc::clone(&runner);
649 Arc::new(move |input, materializer: &Materializer| {
650 let runner = Arc::clone(&runner);
651 let state = Arc::clone(&materializer.inner.state);
652 Ok(materializer.spawn_stream(move |cancelled| {
653 runner(runtime_checked_stream(input, state, Some(cancelled)))
654 }))
655 }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
656 };
657 let inline_runner = inline.then(|| {
658 let runner = Arc::clone(&runner);
659 Arc::new(move |input, materializer: &Materializer| {
660 let runner = Arc::clone(&runner);
661 let state = Arc::clone(&materializer.inner.state);
662 Ok(materializer.spawn_stream_inline(move |cancelled| {
663 runner(runtime_checked_stream(input, state, Some(cancelled)))
664 }))
665 }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
666 });
667 Sink::from_runner_parts(async_runner, inline_runner)
668 }
669}
670
671impl<In: Send + 'static> Sink<In, StreamCompletion<Vec<In>>> {
672 #[must_use]
673 pub fn collect() -> Self {
674 let task_runner =
675 Sink::from_raw_terminal_task_runner(|input, materializer, cancelled, hints| {
676 run_collect_terminal(input, materializer, cancelled, hints)
677 });
678 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::CollectDescriptor::<In> {
679 _phantom: std::marker::PhantomData,
680 });
681 Sink {
682 runner: task_runner.runner,
683 inline_runner: task_runner.inline_runner,
684 hinted_runner: task_runner.hinted_runner,
685 raw_hinted_runner: task_runner.raw_hinted_runner,
686 attributes: task_runner.attributes,
687 deferred_factory: task_runner.deferred_factory,
688 fold_fp: Some(fp),
689 }
690 }
691
692 #[must_use]
693 pub fn collection() -> Self {
694 Self::collect()
695 }
696
697 #[must_use]
698 pub fn take_last(n: usize) -> Self {
699 Sink::from_task_runner(move |input| {
700 if n == 0 {
701 for item in input {
702 let _ = item?;
703 }
704 return Ok(Vec::new());
705 }
706 let mut buffer = VecDeque::with_capacity(n);
707 for item in input {
708 let item = item?;
709 if buffer.len() == n {
710 buffer.pop_front();
711 }
712 buffer.push_back(item);
713 }
714 Ok(buffer.into_iter().collect())
715 })
716 }
717}
718
719impl<In: Send + 'static> Sink<In, StreamCompletion<NotUsed>> {
720 #[must_use]
721 pub fn ignore() -> Self {
722 let task_runner =
723 Sink::from_raw_terminal_task_runner(|input, materializer, cancelled, hints| {
724 run_ignore_terminal(input, materializer, cancelled, hints)
725 });
726 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::IgnoreDescriptor::<In> {
727 _phantom: std::marker::PhantomData,
728 });
729 Sink {
730 runner: task_runner.runner,
731 inline_runner: task_runner.inline_runner,
732 hinted_runner: task_runner.hinted_runner,
733 raw_hinted_runner: task_runner.raw_hinted_runner,
734 attributes: task_runner.attributes,
735 deferred_factory: task_runner.deferred_factory,
736 fold_fp: Some(fp),
737 }
738 }
739
740 #[must_use]
741 pub fn on_complete<F>(callback: F) -> Self
742 where
743 F: FnOnce() + Send + Sync + 'static,
744 {
745 let callback = Arc::new(Mutex::new(Some(callback)));
746 Sink::from_task_runner(move |input| {
747 for item in input {
748 item?;
749 }
750 if let Some(cb) = callback.lock().expect("on_complete poisoned").take() {
751 cb();
752 }
753 Ok(NotUsed)
754 })
755 }
756
757 #[must_use]
758 pub fn never() -> Self {
759 Sink::from_runner(|input, materializer| {
760 let state = Arc::clone(&materializer.inner.state);
761 let shutdown_state = Arc::clone(&state);
762 Ok(materializer.spawn_stream(move |cancelled| {
763 let input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
764 for item in input {
765 item?;
766 }
767 loop {
768 if shutdown_state.shutdown.load(Ordering::SeqCst) {
769 return Err(StreamError::AbruptTermination);
770 }
771 if cancelled.load(Ordering::SeqCst) {
772 return Err(StreamError::Cancelled);
773 }
774 thread::sleep(Duration::from_millis(1));
775 }
776 }))
777 })
778 }
779
780 #[must_use]
781 pub fn foreach<F>(f: F) -> Self
782 where
783 F: Fn(In) + Send + Sync + 'static,
784 {
785 let f_arc = Arc::new(f);
786 let f_runner = Arc::clone(&f_arc);
787 let task_runner = Sink::from_task_runner(move |input| {
788 for item in input {
789 f_runner(item?);
790 }
791 Ok(NotUsed)
792 });
793 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::ForeachDescriptor { f: f_arc });
794 Sink {
795 runner: task_runner.runner,
796 inline_runner: task_runner.inline_runner,
797 hinted_runner: task_runner.hinted_runner,
798 raw_hinted_runner: task_runner.raw_hinted_runner,
799 attributes: task_runner.attributes,
800 deferred_factory: task_runner.deferred_factory,
801 fold_fp: Some(fp),
802 }
803 }
804
805 #[must_use]
806 pub fn foreach_async<F, Fut>(parallelism: usize, f: F) -> Self
807 where
808 F: Fn(In) -> Fut + Send + Sync + 'static,
809 Fut: Future<Output = StreamResult<()>> + Send + 'static,
810 {
811 Flow::identity()
812 .map_async_unordered(parallelism, f)
813 .to_mat(Sink::ignore(), Keep::right)
814 }
815
816 #[must_use]
817 pub fn foreach_result<F>(f: F) -> Self
818 where
819 F: Fn(In) -> StreamResult<()> + Send + Sync + 'static,
820 {
821 Sink::from_task_runner(move |input| {
822 for item in input {
823 f(item?)?;
824 }
825 Ok(NotUsed)
826 })
827 }
828
829 #[must_use]
830 pub fn foreach_result_with_supervision<F>(f: F, decider: SupervisionDecider) -> Self
831 where
832 F: Fn(In) -> StreamResult<()> + Send + Sync + 'static,
833 {
834 Sink::from_task_runner(move |input| {
835 for item in input {
836 let item = item?;
837 match catch_unwind(AssertUnwindSafe(|| f(item)))
838 .unwrap_or_else(|_| Err(panic_stream_error("foreach_result callback")))
839 {
840 Ok(()) => {}
841 Err(error) => match decide_supervision(&decider, &error) {
842 SupervisionDirective::Stop => return Err(error),
843 SupervisionDirective::Resume | SupervisionDirective::Restart => {}
844 },
845 }
846 }
847 Ok(NotUsed)
848 })
849 }
850}
851
852impl<In: Send + 'static> Sink<In, StreamCompletion<In>> {
853 #[must_use]
870 pub fn head() -> Self {
871 Sink::from_task_runner_with_inline(
872 |mut input| input.next().unwrap_or(Err(StreamError::EmptyStream)),
873 true,
874 )
875 }
876
877 #[must_use]
878 pub fn last() -> Self {
879 Sink::from_task_runner(|input| {
880 let mut last = None;
881 for item in input {
882 last = Some(item?);
883 }
884 last.ok_or(StreamError::EmptyStream)
885 })
886 }
887
888 #[must_use]
889 pub fn reduce<F>(f: F) -> Self
890 where
891 F: Fn(In, In) -> In + Send + Sync + 'static,
892 {
893 Sink::from_task_runner(move |mut input| {
894 let mut acc = input.next().unwrap_or(Err(StreamError::EmptyStream))?;
895 for item in input {
896 acc = f(acc, item?);
897 }
898 Ok(acc)
899 })
900 }
901
902 #[must_use]
903 pub fn reduce_result<F>(f: F) -> Self
904 where
905 F: Fn(In, In) -> StreamResult<In> + Send + Sync + 'static,
906 {
907 Sink::from_task_runner(move |mut input| {
908 let mut acc = input.next().unwrap_or(Err(StreamError::EmptyStream))?;
909 for item in input {
910 acc = f(acc, item?)?;
911 }
912 Ok(acc)
913 })
914 }
915
916 #[must_use]
917 pub fn reduce_result_with_supervision<F>(f: F, decider: SupervisionDecider) -> Self
918 where
919 In: Clone,
920 F: Fn(In, In) -> StreamResult<In> + Send + Sync + 'static,
921 {
922 Sink::from_task_runner(move |mut input: BoxStream<In>| {
923 let mut acc = Some(input.next().unwrap_or(Err(StreamError::EmptyStream))?);
924 for item in input {
925 let item = item?;
926 let Some(previous) = acc.take() else {
927 acc = Some(item);
928 continue;
929 };
930 match catch_unwind(AssertUnwindSafe(|| f(previous.clone(), item)))
931 .unwrap_or_else(|_| Err(panic_stream_error("reduce_result callback")))
932 {
933 Ok(next) => acc = Some(next),
934 Err(error) => match decide_supervision(&decider, &error) {
935 SupervisionDirective::Stop => return Err(error),
936 SupervisionDirective::Resume => acc = Some(previous),
937 SupervisionDirective::Restart => acc = None,
938 },
939 }
940 }
941 acc.ok_or(StreamError::EmptyStream)
942 })
943 }
944}
945
946impl<In: Send + 'static> Sink<In, StreamCompletion<Option<In>>> {
947 #[must_use]
964 pub fn head_option() -> Self {
965 Sink::from_task_runner_with_inline(
966 |mut input| match input.next() {
967 Some(Ok(item)) => Ok(Some(item)),
968 Some(Err(error)) => Err(error),
969 None => Ok(None),
970 },
971 true,
972 )
973 }
974
975 #[must_use]
976 pub fn last_option() -> Self {
977 Sink::from_task_runner(|input| {
978 let mut last = None;
979 for item in input {
980 last = Some(item?);
981 }
982 Ok(last)
983 })
984 }
985}
986
987impl<In: Send + 'static> Sink<In, NotUsed> {
988 #[must_use]
989 pub fn cancelled() -> Self {
990 Sink::from_runner(|_input, _materializer| Ok(NotUsed))
991 }
992
993 #[must_use]
994 pub fn future_sink<InnerMat, F, Fut>(future: F) -> Sink<In, StreamCompletion<InnerMat>>
995 where
996 InnerMat: Send + 'static,
997 F: Fn() -> Fut + Send + Sync + 'static,
998 Fut: Future<Output = StreamResult<Sink<In, InnerMat>>> + Send + 'static,
999 {
1000 Self::lazy_future_sink(future)
1001 }
1002
1003 #[must_use]
1004 pub fn lazy_sink<InnerMat, F>(create: F) -> Sink<In, StreamCompletion<InnerMat>>
1005 where
1006 InnerMat: Send + 'static,
1007 F: Fn() -> Sink<In, InnerMat> + Send + Sync + 'static,
1008 {
1009 let create = Arc::new(create);
1010 Sink::from_runner(move |input, materializer| {
1011 let create = Arc::clone(&create);
1012 let state = Arc::clone(&materializer.inner.state);
1013 let worker_materializer =
1014 materializer.with_name_prefix(materializer.name_prefix().to_owned());
1015 Ok(materializer.spawn_stream(move |cancelled| {
1016 let input = runtime_checked_stream(input, state, Some(cancelled));
1017 run_lazy_sink(input, &worker_materializer, move || {
1018 catch_unwind_failed("lazy_sink factory", || create())
1019 })
1020 }))
1021 })
1022 }
1023
1024 #[must_use]
1025 pub fn lazy_future_sink<InnerMat, F, Fut>(create: F) -> Sink<In, StreamCompletion<InnerMat>>
1026 where
1027 InnerMat: Send + 'static,
1028 F: Fn() -> Fut + Send + Sync + 'static,
1029 Fut: Future<Output = StreamResult<Sink<In, InnerMat>>> + Send + 'static,
1030 {
1031 let create = Arc::new(create);
1032 Sink::from_runner(move |input, materializer| {
1033 let create = Arc::clone(&create);
1034 let state = Arc::clone(&materializer.inner.state);
1035 let worker_materializer =
1036 materializer.with_name_prefix(materializer.name_prefix().to_owned());
1037 Ok(materializer.spawn_stream(move |cancelled| {
1038 let input = runtime_checked_stream(input, state, Some(cancelled));
1039 run_lazy_sink(input, &worker_materializer, move || {
1040 catch_unwind_failed("lazy_future_sink factory", || create())
1041 .and_then(flow::run_future_inline_or_spawn)
1042 })
1043 }))
1044 })
1045 }
1046
1047 #[must_use]
1048 pub fn fold<Acc, F>(zero: Acc, f: F) -> Sink<In, StreamCompletion<Acc>>
1049 where
1050 Acc: Clone + Send + Sync + 'static,
1051 F: Fn(Acc, In) -> Acc + Send + Sync + 'static,
1052 {
1053 let f_arc = Arc::new(f);
1054 let zero_clone = zero.clone();
1055 let f_arc2 = Arc::clone(&f_arc);
1056 let task_runner = {
1057 let zero = zero;
1058 Sink::from_raw_terminal_task_runner(move |input, materializer, cancelled, hints| {
1059 run_fold_terminal(
1060 input,
1061 materializer,
1062 cancelled,
1063 hints,
1064 zero.clone(),
1065 f_arc.as_ref(),
1066 )
1067 })
1068 };
1069 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::FoldDescriptor {
1070 zero: zero_clone,
1071 f: f_arc2,
1072 });
1073 Sink {
1074 runner: task_runner.runner,
1075 inline_runner: task_runner.inline_runner,
1076 hinted_runner: task_runner.hinted_runner,
1077 raw_hinted_runner: task_runner.raw_hinted_runner,
1078 attributes: task_runner.attributes,
1079 deferred_factory: task_runner.deferred_factory,
1080 fold_fp: Some(fp),
1081 }
1082 }
1083
1084 #[must_use]
1085 pub fn fold_result<Acc, F>(zero: Acc, f: F) -> Sink<In, StreamCompletion<Acc>>
1086 where
1087 Acc: Clone + Send + Sync + 'static,
1088 F: Fn(Acc, In) -> StreamResult<Acc> + Send + Sync + 'static,
1089 {
1090 let f_arc = Arc::new(f);
1091 let zero_clone = zero.clone();
1092 let f_arc2 = Arc::clone(&f_arc);
1093 let task_runner = {
1094 let zero = zero;
1095 Sink::from_raw_terminal_task_runner(move |input, materializer, cancelled, hints| {
1096 run_fold_result_terminal(
1097 input,
1098 materializer,
1099 cancelled,
1100 hints,
1101 zero.clone(),
1102 f_arc.as_ref(),
1103 )
1104 })
1105 };
1106 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::FoldResultDescriptor {
1107 zero: zero_clone,
1108 f: f_arc2,
1109 });
1110 Sink {
1111 runner: task_runner.runner,
1112 inline_runner: task_runner.inline_runner,
1113 hinted_runner: task_runner.hinted_runner,
1114 raw_hinted_runner: task_runner.raw_hinted_runner,
1115 attributes: task_runner.attributes,
1116 deferred_factory: task_runner.deferred_factory,
1117 fold_fp: Some(fp),
1118 }
1119 }
1120
1121 #[must_use]
1122 pub fn fold_result_with_supervision<Acc, F>(
1123 zero: Acc,
1124 f: F,
1125 decider: SupervisionDecider,
1126 ) -> Sink<In, StreamCompletion<Acc>>
1127 where
1128 Acc: Clone + Send + Sync + 'static,
1129 F: Fn(Acc, In) -> StreamResult<Acc> + Send + Sync + 'static,
1130 {
1131 Sink::from_task_runner(move |input| {
1132 let mut acc = zero.clone();
1133 for item in input {
1134 let item = item?;
1135 let previous = acc;
1136 match catch_unwind(AssertUnwindSafe(|| f(previous.clone(), item)))
1137 .unwrap_or_else(|_| Err(panic_stream_error("fold_result callback")))
1138 {
1139 Ok(next) => acc = next,
1140 Err(error) => match decide_supervision(&decider, &error) {
1141 SupervisionDirective::Stop => return Err(error),
1142 SupervisionDirective::Resume => acc = previous,
1143 SupervisionDirective::Restart => acc = zero.clone(),
1144 },
1145 }
1146 }
1147 Ok(acc)
1148 })
1149 }
1150}
1151
1152fn run_lazy_sink<In, InnerMat, F>(
1153 mut input: BoxStream<In>,
1154 materializer: &Materializer,
1155 create: F,
1156) -> StreamResult<InnerMat>
1157where
1158 In: Send + 'static,
1159 InnerMat: Send + 'static,
1160 F: FnOnce() -> StreamResult<Sink<In, InnerMat>>,
1161{
1162 let first = match input.next() {
1163 Some(Ok(item)) => item,
1164 Some(Err(error)) => return Err(error),
1165 None => {
1166 return Err(StreamError::Failed(
1167 "lazy sink was never materialized".into(),
1168 ));
1169 }
1170 };
1171 let sink = create()?;
1172 sink.run(prepend_first_stream(first, input), materializer)
1173}
1174
1175fn prepend_first_stream<In>(first: In, mut rest: BoxStream<In>) -> BoxStream<In>
1176where
1177 In: Send + 'static,
1178{
1179 let mut first = Some(first);
1180 Box::new(std::iter::from_fn(move || {
1181 if let Some(item) = first.take() {
1182 Some(Ok(item))
1183 } else {
1184 rest.next()
1185 }
1186 }))
1187}
1188
1189fn terminal_consumer_status(
1190 materializer: &Materializer,
1191 cancelled: &Arc<AtomicBool>,
1192) -> StreamResult<()> {
1193 if materializer.is_shutdown() {
1194 Err(StreamError::AbruptTermination)
1195 } else if cancelled.load(Ordering::SeqCst) {
1196 Err(StreamError::Cancelled)
1197 } else {
1198 Ok(())
1199 }
1200}
1201
1202fn run_collect_terminal<In: Send + 'static>(
1203 mut input: BoxStream<In>,
1204 materializer: Materializer,
1205 cancelled: Arc<AtomicBool>,
1206 hints: SourceRuntimeHints,
1207) -> StreamResult<Vec<In>> {
1208 if !hints.terminal_consumer_batch {
1209 let input = runtime_checked_stream(
1210 input,
1211 Arc::clone(&materializer.inner.state),
1212 Some(cancelled),
1213 );
1214 return input.collect();
1215 }
1216
1217 let mut items = Vec::with_capacity(hints.inline_micro_max_success_items.unwrap_or(0));
1218 loop {
1219 terminal_consumer_status(&materializer, &cancelled)?;
1220 {
1221 let _cancel_scope = set_current_stream_cancelled(&cancelled);
1222 for _ in 0..TERMINAL_CONSUMER_BATCH {
1223 match input.next() {
1224 Some(Ok(item)) => items.push(item),
1225 Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1226 Ok(()) => return Err(error),
1227 Err(status) => return Err(status),
1228 },
1229 None => {
1230 return terminal_consumer_status(&materializer, &cancelled).map(|()| items);
1231 }
1232 }
1233 }
1234 }
1235 }
1236}
1237
1238fn run_ignore_terminal<In: Send + 'static>(
1239 input: BoxStream<In>,
1240 materializer: Materializer,
1241 cancelled: Arc<AtomicBool>,
1242 hints: SourceRuntimeHints,
1243) -> StreamResult<NotUsed> {
1244 if !hints.terminal_consumer_batch {
1245 let input = runtime_checked_stream(
1246 input,
1247 Arc::clone(&materializer.inner.state),
1248 Some(cancelled),
1249 );
1250 for item in input {
1251 item?;
1252 }
1253 return Ok(NotUsed);
1254 }
1255
1256 let mut input = input;
1257 loop {
1258 terminal_consumer_status(&materializer, &cancelled)?;
1259 {
1260 let _cancel_scope = set_current_stream_cancelled(&cancelled);
1261 for _ in 0..TERMINAL_CONSUMER_BATCH {
1262 match input.next() {
1263 Some(Ok(_)) => {}
1264 Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1265 Ok(()) => return Err(error),
1266 Err(status) => return Err(status),
1267 },
1268 None => {
1269 return terminal_consumer_status(&materializer, &cancelled)
1270 .map(|()| NotUsed);
1271 }
1272 }
1273 }
1274 }
1275 }
1276}
1277
1278fn run_fold_terminal<In, Acc, F>(
1279 input: BoxStream<In>,
1280 materializer: Materializer,
1281 cancelled: Arc<AtomicBool>,
1282 hints: SourceRuntimeHints,
1283 zero: Acc,
1284 f: &F,
1285) -> StreamResult<Acc>
1286where
1287 In: Send + 'static,
1288 Acc: Send + 'static,
1289 F: Fn(Acc, In) -> Acc,
1290{
1291 if !hints.terminal_consumer_batch {
1292 let input = runtime_checked_stream(
1293 input,
1294 Arc::clone(&materializer.inner.state),
1295 Some(cancelled),
1296 );
1297 let mut acc = zero;
1298 for item in input {
1299 acc = f(acc, item?);
1300 }
1301 return Ok(acc);
1302 }
1303
1304 let mut input = input;
1305 let mut acc = zero;
1306 loop {
1307 terminal_consumer_status(&materializer, &cancelled)?;
1308 {
1309 let _cancel_scope = set_current_stream_cancelled(&cancelled);
1310 for _ in 0..TERMINAL_CONSUMER_BATCH {
1311 match input.next() {
1312 Some(Ok(item)) => acc = f(acc, item),
1313 Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1314 Ok(()) => return Err(error),
1315 Err(status) => return Err(status),
1316 },
1317 None => {
1318 return terminal_consumer_status(&materializer, &cancelled).map(|()| acc);
1319 }
1320 }
1321 }
1322 }
1323 }
1324}
1325
1326fn run_fold_result_terminal<In, Acc, F>(
1327 input: BoxStream<In>,
1328 materializer: Materializer,
1329 cancelled: Arc<AtomicBool>,
1330 hints: SourceRuntimeHints,
1331 zero: Acc,
1332 f: &F,
1333) -> StreamResult<Acc>
1334where
1335 In: Send + 'static,
1336 Acc: Send + 'static,
1337 F: Fn(Acc, In) -> StreamResult<Acc>,
1338{
1339 if !hints.terminal_consumer_batch {
1340 let input = runtime_checked_stream(
1341 input,
1342 Arc::clone(&materializer.inner.state),
1343 Some(cancelled),
1344 );
1345 let mut acc = zero;
1346 for item in input {
1347 acc = f(acc, item?)?;
1348 }
1349 return Ok(acc);
1350 }
1351
1352 let mut input = input;
1353 let mut acc = Some(zero);
1354 loop {
1355 terminal_consumer_status(&materializer, &cancelled)?;
1356 {
1357 let _cancel_scope = set_current_stream_cancelled(&cancelled);
1358 for _ in 0..TERMINAL_CONSUMER_BATCH {
1359 match input.next() {
1360 Some(Ok(item)) => {
1361 let previous = acc.take().expect("fold accumulator present");
1362 match f(previous, item) {
1363 Ok(next) => acc = Some(next),
1364 Err(error) => {
1365 return match terminal_consumer_status(&materializer, &cancelled) {
1366 Ok(()) => Err(error),
1367 Err(status) => Err(status),
1368 };
1369 }
1370 }
1371 }
1372 Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1373 Ok(()) => return Err(error),
1374 Err(status) => return Err(status),
1375 },
1376 None => {
1377 return terminal_consumer_status(&materializer, &cancelled)
1378 .map(|()| acc.expect("fold accumulator present"));
1379 }
1380 }
1381 }
1382 }
1383 }
1384}
1385
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Source, StreamCompletion};
    use std::time::Instant;

    /// Polls `condition` every 5 ms until it returns `true` or `timeout` has
    /// elapsed. Once the deadline has passed the condition is probed one final
    /// time and that result is returned.
    fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
        let deadline = Instant::now() + timeout;
        loop {
            if Instant::now() >= deadline {
                return condition();
            }
            if condition() {
                return true;
            }
            thread::sleep(Duration::from_millis(5));
        }
    }

    /// Mapping the materialized value of a `Sink::setup` sink twice with
    /// identity closures must still run the inner fold and yield its sum.
    #[test]
    fn map_materialized_value_on_deferred_sink_does_not_explode() {
        let deferred = Sink::<u64, _>::setup(|_, _| Sink::fold(0u64, |acc, x| acc + x));
        let mapped = deferred
            .map_materialized_value(|sc: StreamCompletion<u64>| sc)
            .map_materialized_value(|sc| sc);

        let completion = Source::from_iter(1u64..=3).run_with(mapped).unwrap();
        let sum = completion.wait().unwrap();
        assert_eq!(sum, 6u64);
    }

    /// A fold over an endless source must stop once its completion handle is
    /// dropped: the materializer's active stream count goes from 1 back to 0.
    #[test]
    fn batched_terminal_fold_observes_completion_drop_cancellation() {
        let materializer = Materializer::new();
        let completion = Source::repeat(1_u64)
            .run_with_materializer(Sink::fold(0_u64, |acc, item| acc + item), &materializer)
            .expect("fold terminal materializes");

        // Wait for the stream to be registered before cancelling it.
        let started = wait_until(Duration::from_secs(1), || {
            materializer.active_streams() == 1
        });
        assert!(started);

        drop(completion);

        let stopped = wait_until(Duration::from_secs(5), || {
            materializer.active_streams() == 0
        });
        assert!(stopped);
    }
}