1use super::flow;
9use super::*;
10use crate::Attributes;
11use crate::stream::error::{decide_supervision, panic_stream_error};
12
/// Handle to one materialized child of a combined sink: the sender used to
/// feed it [`CombinedSinkMessage`]s, paired with the child's materialized
/// value boxed as `dyn Any + Send`.
type CombinedSinkChild<In> = (
    std::sync::mpsc::SyncSender<CombinedSinkMessage<In>>,
    Box<dyn std::any::Any + Send>,
);
21
/// Type-erased closure that materializes one child sink against the given
/// materializer and returns its [`CombinedSinkChild`] handle.
type CombinedSinkRunner<In> =
    dyn Fn(&Materializer) -> StreamResult<CombinedSinkChild<In>> + Send + Sync;
/// Factory held by a deferred sink. At run time it is called with the
/// materializer and the effective attributes and returns the sink that is
/// actually driven.
type DeferredSinkFactory<In, Mat> =
    dyn Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync;
26
/// Messages sent from a combined sink to each of its child sinks.
enum CombinedSinkMessage<In> {
    /// A stream element (or upstream error) to hand to the child as its next item.
    Item(StreamResult<In>),
    /// Synchronization request: the child replies with `()` on the enclosed
    /// sender and keeps waiting for further messages.
    Flush(std::sync::mpsc::SyncSender<()>),
    /// Ends the child's input stream.
    Close,
}
32
/// Maximum number of items a batching terminal consumer pulls from its input
/// between two shutdown/cancellation status checks.
const TERMINAL_CONSUMER_BATCH: usize = 64;
34
/// A stream sink consuming `In` elements and producing a materialized value
/// of type `Mat` when run.
///
/// A sink carries up to four runner variants plus an optional deferred
/// factory; the `run*` methods pick between them.
pub struct Sink<In, Mat> {
    /// Default runner, used when no more specific variant applies.
    runner: Arc<SinkRunner<In, Mat>>,
    /// Runner used by `run_inline`; its presence is what `can_inline` reports.
    inline_runner: Option<Arc<SinkRunner<In, Mat>>>,
    /// Runner that additionally receives `SourceRuntimeHints`.
    hinted_runner: Option<Arc<HintedSinkRunner<In, Mat>>>,
    /// Hinted runner that `run_from_source` calls with the input passed
    /// through as-is (no `runtime_checked_stream` wrapping).
    raw_hinted_runner: Option<Arc<HintedSinkRunner<In, Mat>>>,
    /// Attributes attached to this sink.
    attributes: Attributes,
    /// When set, the sink is built lazily at run time and the built sink is
    /// run instead of the runners stored here.
    deferred_factory: Option<Arc<DeferredSinkFactory<In, Mat>>>,
    /// Optional fold fast-path descriptor; set by constructors such as
    /// `collect`, `ignore`, `foreach` and `fold`, and cleared when the
    /// materialized value is mapped.
    pub(crate) fold_fp: Option<Arc<dyn FoldFastPathDyn<In>>>,
}
45
46fn map_mat_dyn<In, Mat, NextMat>(
50 sink: Sink<In, Mat>,
51 f: Arc<dyn Fn(Mat) -> NextMat + Send + Sync + 'static>,
52) -> Sink<In, NextMat>
53where
54 In: Send + 'static,
55 Mat: Send + 'static,
56 NextMat: Send + 'static,
57{
58 let Sink {
59 runner,
60 inline_runner,
61 hinted_runner,
62 raw_hinted_runner,
63 attributes,
64 deferred_factory,
65 fold_fp: _,
66 } = sink;
67 let mapped_runner = {
68 let f = Arc::clone(&f);
69 Arc::new(move |input, materializer: &Materializer| {
70 let mat = runner(input, materializer)?;
71 Ok(f(mat))
72 }) as Arc<SinkRunner<In, NextMat>>
73 };
74 let mapped_inline_runner = inline_runner.map(|ir| {
75 let f = Arc::clone(&f);
76 Arc::new(move |input, materializer: &Materializer| {
77 let result = ir(input, materializer)?;
78 Ok(f(result))
79 }) as Arc<SinkRunner<In, NextMat>>
80 });
81 let mapped_hinted_runner = hinted_runner.map(|hr| {
82 let f = Arc::clone(&f);
83 Arc::new(
84 move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
85 let result = hr(input, materializer, hints)?;
86 Ok(f(result))
87 },
88 ) as Arc<HintedSinkRunner<In, NextMat>>
89 });
90 let mapped_raw_hinted_runner = raw_hinted_runner.map(|hr| {
91 let f = Arc::clone(&f);
92 Arc::new(
93 move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
94 let result = hr(input, materializer, hints)?;
95 Ok(f(result))
96 },
97 ) as Arc<HintedSinkRunner<In, NextMat>>
98 });
99 let mapped_factory = deferred_factory.map(|factory| {
100 let f = Arc::clone(&f);
101 Arc::new(move |materializer: &Materializer, attrs: &Attributes| {
102 map_mat_dyn(factory(materializer, attrs), Arc::clone(&f))
103 }) as Arc<DeferredSinkFactory<In, NextMat>>
104 });
105 Sink {
106 runner: mapped_runner,
107 inline_runner: mapped_inline_runner,
108 hinted_runner: mapped_hinted_runner,
109 raw_hinted_runner: mapped_raw_hinted_runner,
110 attributes,
111 deferred_factory: mapped_factory,
112 fold_fp: None,
113 }
114}
115
116impl<In, Mat> Clone for Sink<In, Mat> {
117 fn clone(&self) -> Self {
118 Self {
119 runner: Arc::clone(&self.runner),
120 inline_runner: self.inline_runner.as_ref().map(Arc::clone),
121 hinted_runner: self.hinted_runner.as_ref().map(Arc::clone),
122 raw_hinted_runner: self.raw_hinted_runner.as_ref().map(Arc::clone),
123 attributes: self.attributes.clone(),
124 deferred_factory: self.deferred_factory.as_ref().map(Arc::clone),
125 fold_fp: self.fold_fp.as_ref().map(Arc::clone),
126 }
127 }
128}
129
130impl<In: Send + 'static, Mat: Send + 'static> Sink<In, Mat> {
131 pub(crate) fn from_runner<F>(runner: F) -> Self
132 where
133 F: Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync + 'static,
134 {
135 Self::from_runner_parts(Arc::new(runner), None)
136 }
137
138 pub(crate) fn from_runner_parts(
139 runner: Arc<SinkRunner<In, Mat>>,
140 inline_runner: Option<Arc<SinkRunner<In, Mat>>>,
141 ) -> Self {
142 Self {
143 runner,
144 inline_runner,
145 hinted_runner: None,
146 raw_hinted_runner: None,
147 attributes: Attributes::default(),
148 deferred_factory: None,
149 fold_fp: None,
150 }
151 }
152
153 pub(crate) fn from_hinted_runner<F>(runner: F) -> Self
154 where
155 F: Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat>
156 + Send
157 + Sync
158 + 'static,
159 {
160 let hinted_runner: Arc<HintedSinkRunner<In, Mat>> = Arc::new(runner);
161 let fallback = {
162 let hinted_runner = Arc::clone(&hinted_runner);
163 Arc::new(move |input, materializer: &Materializer| {
164 hinted_runner(input, materializer, SourceRuntimeHints::default())
165 }) as Arc<SinkRunner<In, Mat>>
166 };
167 Self {
168 runner: fallback,
169 inline_runner: None,
170 hinted_runner: Some(hinted_runner),
171 raw_hinted_runner: None,
172 attributes: Attributes::default(),
173 deferred_factory: None,
174 fold_fp: None,
175 }
176 }
177
178 pub(crate) fn from_raw_hinted_runner<F>(runner: F) -> Self
179 where
180 F: Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat>
181 + Send
182 + Sync
183 + 'static,
184 {
185 let raw_hinted_runner: Arc<HintedSinkRunner<In, Mat>> = Arc::new(runner);
186 let fallback = {
187 let raw_hinted_runner = Arc::clone(&raw_hinted_runner);
188 Arc::new(move |input, materializer: &Materializer| {
189 let input =
190 runtime_checked_stream(input, Arc::clone(&materializer.inner.state), None);
191 raw_hinted_runner(input, materializer, SourceRuntimeHints::default())
192 }) as Arc<SinkRunner<In, Mat>>
193 };
194 Self {
195 runner: fallback,
196 inline_runner: None,
197 hinted_runner: None,
198 raw_hinted_runner: Some(raw_hinted_runner),
199 attributes: Attributes::default(),
200 deferred_factory: None,
201 fold_fp: None,
202 }
203 }
204
205 pub(super) fn run(
206 &self,
207 input: BoxStream<In>,
208 materializer: &Materializer,
209 ) -> StreamResult<Mat> {
210 self.run_with_source_hints(input, materializer, SourceRuntimeHints::default())
211 }
212
213 pub(super) fn run_with_source_hints(
214 &self,
215 input: BoxStream<In>,
216 materializer: &Materializer,
217 hints: SourceRuntimeHints,
218 ) -> StreamResult<Mat> {
219 if let Some(factory) = &self.deferred_factory {
220 let attrs = materializer.effective_attributes(&self.attributes);
221 return factory(materializer, &attrs).run_with_source_hints(input, materializer, hints);
222 }
223 if let Some(hinted_runner) = &self.hinted_runner {
224 return hinted_runner(input, materializer, hints);
225 }
226 (self.runner)(input, materializer)
227 }
228
229 pub(super) fn run_from_source(
230 &self,
231 input: BoxStream<In>,
232 materializer: &Materializer,
233 hints: SourceRuntimeHints,
234 ) -> StreamResult<Mat> {
235 if let Some(factory) = &self.deferred_factory {
236 let attrs = materializer.effective_attributes(&self.attributes);
237 return factory(materializer, &attrs).run_from_source(input, materializer, hints);
238 }
239 if let Some(raw_hinted_runner) = &self.raw_hinted_runner {
240 return raw_hinted_runner(input, materializer, hints);
241 }
242 let input = runtime_checked_stream(input, Arc::clone(&materializer.inner.state), None);
243 self.run_with_source_hints(input, materializer, hints)
244 }
245
246 pub(super) fn can_inline(&self) -> bool {
247 self.inline_runner.is_some()
248 }
249
250 pub(super) fn run_inline(
251 &self,
252 input: BoxStream<In>,
253 materializer: &Materializer,
254 ) -> StreamResult<Mat> {
255 if let Some(factory) = &self.deferred_factory {
256 let attrs = materializer.effective_attributes(&self.attributes);
257 return factory(materializer, &attrs).run_inline(input, materializer);
258 }
259 (self
260 .inline_runner
261 .as_ref()
262 .expect("inline sink runner exists"))(input, materializer)
263 }
264
265 pub fn run_with<SourceMat: Send + 'static>(
266 self,
267 source: Source<In, SourceMat>,
268 ) -> StreamResult<SourceMat> {
269 source.to(self).run()
270 }
271
272 pub fn run_with_materializer<SourceMat: Send + 'static>(
273 self,
274 source: Source<In, SourceMat>,
275 materializer: &Materializer,
276 ) -> StreamResult<SourceMat> {
277 source.to(self).run_with_materializer(materializer)
278 }
279
280 #[must_use]
281 pub fn from_materializer<F>(factory: F) -> Self
282 where
283 F: Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync + 'static,
284 {
285 let factory = Arc::new(factory);
286 Self {
287 runner: Arc::new(|_input, _materializer| {
288 Err(StreamError::Failed(
289 "deferred sink factory must be driven through Sink::run".into(),
290 ))
291 }),
292 inline_runner: None,
293 hinted_runner: None,
294 raw_hinted_runner: None,
295 attributes: Attributes::default(),
296 deferred_factory: Some(factory),
297 fold_fp: None,
298 }
299 }
300
301 #[must_use]
302 pub fn setup<F>(factory: F) -> Self
303 where
304 F: Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync + 'static,
305 {
306 Self::from_materializer(factory)
307 }
308
309 pub fn pre_materialize(
310 &self,
311 materializer: &Materializer,
312 ) -> StreamResult<(Mat, Sink<In, NotUsed>)> {
313 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
314 let materialized = self.clone().run(
315 Box::new(std::iter::from_fn(move || receiver.recv().ok())),
316 materializer,
317 )?;
318 let sender = Arc::new(Mutex::new(Some(sender)));
319 let sink = Sink::from_runner(move |input, _materializer| {
320 let Some(sender) = sender
321 .lock()
322 .expect("pre-materialized sink poisoned")
323 .take()
324 else {
325 return Err(StreamError::Failed(
326 "pre-materialized sink has already been materialized".into(),
327 ));
328 };
329 for item in input {
330 if sender.send(item).is_err() {
331 break;
332 }
333 }
334 Ok(NotUsed)
335 });
336 Ok((materialized, sink.with_attributes(self.attributes.clone())))
337 }
338
339 #[must_use]
340 pub fn map_materialized_value<NextMat, F>(self, f: F) -> Sink<In, NextMat>
341 where
342 NextMat: Send + 'static,
343 F: Fn(Mat) -> NextMat + Send + Sync + 'static,
344 {
345 map_mat_dyn(self, Arc::new(f))
351 }
352
353 #[must_use]
354 pub fn attributes(&self) -> &Attributes {
355 &self.attributes
356 }
357
358 #[must_use]
359 pub fn with_attributes(mut self, attributes: Attributes) -> Self {
360 self.attributes = attributes;
361 self
362 }
363
364 #[must_use]
365 pub fn add_attributes(mut self, attributes: Attributes) -> Self {
366 self.attributes = self.attributes.and(attributes);
367 self
368 }
369
370 #[must_use]
371 pub fn named(self, name: impl Into<String>) -> Self {
372 self.add_attributes(Attributes::named(name))
373 }
374}
375
/// A closed graph that can be materialized to produce a value of type `Mat`.
#[derive(Clone)]
pub struct RunnableGraph<Mat> {
    /// Closure that performs the materialization against a materializer.
    pub(super) runner: Arc<RunnableGraphRunner<Mat>>,
    /// Attributes attached to the graph.
    attributes: Attributes,
}
381
/// How `Sink::combine` distributes elements among its child sinks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkCombineStrategy {
    /// Every element is cloned and sent to every child that is still accepting input.
    Broadcast,
    /// Every element is sent to exactly one child, chosen in rotating order.
    Balance,
}
387
388impl<Mat: Send + 'static> RunnableGraph<Mat> {
389 pub(super) fn from_runner<F>(runner: F) -> Self
390 where
391 F: Fn(&Materializer) -> StreamResult<Mat> + Send + Sync + 'static,
392 {
393 Self {
394 runner: Arc::new(runner),
395 attributes: Attributes::default(),
396 }
397 }
398
399 pub fn run(&self) -> StreamResult<Mat> {
400 Materializer::new().materialize(self)
401 }
402
403 pub fn run_with_materializer(&self, materializer: &Materializer) -> StreamResult<Mat> {
404 materializer.materialize(self)
405 }
406
407 #[must_use]
408 pub fn map_materialized_value<Next, F>(self, f: F) -> RunnableGraph<Next>
409 where
410 Next: Send + 'static,
411 F: Fn(Mat) -> Next + Send + Sync + 'static,
412 {
413 let f = Arc::new(f);
414 RunnableGraph::from_runner(move |materializer| {
415 let mat = (self.runner)(materializer)?;
416 Ok(f(mat))
417 })
418 }
419
420 #[must_use]
421 pub fn attributes(&self) -> &Attributes {
422 &self.attributes
423 }
424
425 #[must_use]
426 pub fn with_attributes(mut self, attributes: Attributes) -> Self {
427 self.attributes = attributes;
428 self
429 }
430
431 #[must_use]
432 pub fn add_attributes(mut self, attributes: Attributes) -> Self {
433 self.attributes = self.attributes.and(attributes);
434 self
435 }
436
437 #[must_use]
438 pub fn named(self, name: impl Into<String>) -> Self {
439 self.add_attributes(Attributes::named(name))
440 }
441}
442
443impl<In: Clone + Send + 'static> Sink<In, NotUsed> {
444 #[must_use]
445 pub fn combine<M1, M2, MRest, I>(
446 first: Sink<In, M1>,
447 second: Sink<In, M2>,
448 rest: I,
449 strategy: SinkCombineStrategy,
450 ) -> Sink<In, NotUsed>
451 where
452 M1: Send + 'static,
453 M2: Send + 'static,
454 MRest: Send + 'static,
455 I: IntoIterator<Item = Sink<In, MRest>>,
456 {
457 let mut runners: Vec<Arc<CombinedSinkRunner<In>>> = vec![
458 Arc::new(move |materializer| {
459 let (sender, receiver) = std::sync::mpsc::sync_channel(0);
460 let mat = first.run(
461 Box::new(std::iter::from_fn(move || {
462 loop {
463 match receiver.recv().ok()? {
464 CombinedSinkMessage::Item(item) => return Some(item),
465 CombinedSinkMessage::Flush(ack) => {
466 let _ = ack.send(());
467 }
468 CombinedSinkMessage::Close => return None,
469 }
470 }
471 })),
472 materializer,
473 )?;
474 Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
475 }),
476 Arc::new(move |materializer| {
477 let (sender, receiver) = std::sync::mpsc::sync_channel(0);
478 let mat = second.run(
479 Box::new(std::iter::from_fn(move || {
480 loop {
481 match receiver.recv().ok()? {
482 CombinedSinkMessage::Item(item) => return Some(item),
483 CombinedSinkMessage::Flush(ack) => {
484 let _ = ack.send(());
485 }
486 CombinedSinkMessage::Close => return None,
487 }
488 }
489 })),
490 materializer,
491 )?;
492 Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
493 }),
494 ];
495 runners.extend(rest.into_iter().map(|sink| {
496 Arc::new(move |materializer: &Materializer| {
497 let (sender, receiver) = std::sync::mpsc::sync_channel(0);
498 let mat = sink.run(
499 Box::new(std::iter::from_fn(move || {
500 loop {
501 match receiver.recv().ok()? {
502 CombinedSinkMessage::Item(item) => return Some(item),
503 CombinedSinkMessage::Flush(ack) => {
504 let _ = ack.send(());
505 }
506 CombinedSinkMessage::Close => return None,
507 }
508 }
509 })),
510 materializer,
511 )?;
512 Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
513 }) as Arc<CombinedSinkRunner<In>>
514 }));
515
516 Sink::from_runner(move |mut input: BoxStream<In>, materializer| {
517 let mut children = runners
520 .iter()
521 .map(|runner| runner(materializer))
522 .collect::<StreamResult<Vec<_>>>()?;
523 let mut next = 0usize;
524 for item in input.by_ref() {
525 match item {
526 Ok(value) => match strategy {
527 SinkCombineStrategy::Broadcast => {
528 children.retain(|(sender, _)| {
529 sender
530 .send(CombinedSinkMessage::Item(Ok(value.clone())))
531 .is_ok()
532 });
533 if children.is_empty() {
534 break;
535 }
536 }
537 SinkCombineStrategy::Balance => {
538 while !children.is_empty() {
539 let index = next % children.len();
540 next = next.wrapping_add(1);
541 match children[index]
542 .0
543 .send(CombinedSinkMessage::Item(Ok(value.clone())))
544 {
545 Ok(()) => break,
546 Err(_) => {
547 children.remove(index);
548 }
549 }
550 }
551 if children.is_empty() {
552 break;
553 }
554 }
555 },
556 Err(error) => {
557 for (sender, _) in &children {
558 let _ = sender.send(CombinedSinkMessage::Item(Err(error.clone())));
559 }
560 return Err(error);
561 }
562 }
563 }
564 for (sender, _) in &children {
568 let (ack_sender, ack_receiver) = std::sync::mpsc::sync_channel(0);
569 if sender.send(CombinedSinkMessage::Flush(ack_sender)).is_ok() {
570 let _ = ack_receiver.recv();
571 }
572 }
573 let mats: Vec<_> = children
576 .into_iter()
577 .map(|(sender, mat)| {
578 let _ = sender.send(CombinedSinkMessage::Close);
579 mat
580 })
581 .collect();
582 drop(mats);
583 Ok(NotUsed)
584 })
585 }
586}
587
588impl<In: Send + 'static, Mat: Send + 'static> Sink<In, StreamCompletion<Mat>> {
589 fn from_task_runner<F>(runner: F) -> Self
590 where
591 F: Fn(BoxStream<In>) -> StreamResult<Mat> + Send + Sync + 'static,
592 {
593 Self::from_task_runner_with_inline(runner, false)
594 }
595
596 fn from_raw_terminal_task_runner<F>(runner: F) -> Self
597 where
598 F: Fn(
599 BoxStream<In>,
600 Materializer,
601 Arc<AtomicBool>,
602 SourceRuntimeHints,
603 ) -> StreamResult<Mat>
604 + Send
605 + Sync
606 + 'static,
607 {
608 let runner = Arc::new(runner);
609 let async_runner = {
610 let runner = Arc::clone(&runner);
611 Arc::new(move |input, materializer: &Materializer| {
612 let runner = Arc::clone(&runner);
613 let worker_materializer =
614 materializer.with_name_prefix(materializer.name_prefix().to_owned());
615 Ok(materializer.spawn_stream(move |cancelled| {
616 runner(
617 input,
618 worker_materializer,
619 cancelled,
620 SourceRuntimeHints::default(),
621 )
622 }))
623 }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
624 };
625 let raw_hinted_runner = {
626 let runner = Arc::clone(&runner);
627 Arc::new(
628 move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
629 let runner = Arc::clone(&runner);
630 let worker_materializer =
631 materializer.with_name_prefix(materializer.name_prefix().to_owned());
632 Ok(materializer.spawn_stream(move |cancelled| {
633 runner(input, worker_materializer, cancelled, hints)
634 }))
635 },
636 ) as Arc<HintedSinkRunner<In, StreamCompletion<Mat>>>
637 };
638 Sink {
639 runner: async_runner,
640 inline_runner: None,
641 hinted_runner: None,
642 raw_hinted_runner: Some(raw_hinted_runner),
643 attributes: Attributes::default(),
644 deferred_factory: None,
645 fold_fp: None,
646 }
647 }
648
649 fn from_task_runner_with_inline<F>(runner: F, inline: bool) -> Self
650 where
651 F: Fn(BoxStream<In>) -> StreamResult<Mat> + Send + Sync + 'static,
652 {
653 let runner = Arc::new(runner);
654 let async_runner = {
655 let runner = Arc::clone(&runner);
656 Arc::new(move |input, materializer: &Materializer| {
657 let runner = Arc::clone(&runner);
658 let state = Arc::clone(&materializer.inner.state);
659 Ok(materializer.spawn_stream(move |cancelled| {
660 runner(runtime_checked_stream(input, state, Some(cancelled)))
661 }))
662 }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
663 };
664 let inline_runner = inline.then(|| {
665 let runner = Arc::clone(&runner);
666 Arc::new(move |input, materializer: &Materializer| {
667 let runner = Arc::clone(&runner);
668 let state = Arc::clone(&materializer.inner.state);
669 Ok(materializer.spawn_stream_inline(move |cancelled| {
670 runner(runtime_checked_stream(input, state, Some(cancelled)))
671 }))
672 }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
673 });
674 Sink::from_runner_parts(async_runner, inline_runner)
675 }
676}
677
678impl<In: Send + 'static> Sink<In, StreamCompletion<Vec<In>>> {
679 #[must_use]
680 pub fn collect() -> Self {
681 let task_runner =
682 Sink::from_raw_terminal_task_runner(|input, materializer, cancelled, hints| {
683 run_collect_terminal(input, materializer, cancelled, hints)
684 });
685 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::CollectDescriptor::<In> {
686 _phantom: std::marker::PhantomData,
687 });
688 Sink {
689 runner: task_runner.runner,
690 inline_runner: task_runner.inline_runner,
691 hinted_runner: task_runner.hinted_runner,
692 raw_hinted_runner: task_runner.raw_hinted_runner,
693 attributes: task_runner.attributes,
694 deferred_factory: task_runner.deferred_factory,
695 fold_fp: Some(fp),
696 }
697 }
698
699 #[must_use]
700 pub fn collection() -> Self {
701 Self::collect()
702 }
703
704 #[must_use]
705 pub fn take_last(n: usize) -> Self {
706 Sink::from_task_runner(move |input| {
707 if n == 0 {
708 for item in input {
709 let _ = item?;
710 }
711 return Ok(Vec::new());
712 }
713 let mut buffer = VecDeque::with_capacity(n);
714 for item in input {
715 let item = item?;
716 if buffer.len() == n {
717 buffer.pop_front();
718 }
719 buffer.push_back(item);
720 }
721 Ok(buffer.into_iter().collect())
722 })
723 }
724}
725
726impl<In: Send + 'static> Sink<In, StreamCompletion<NotUsed>> {
727 #[must_use]
728 pub fn ignore() -> Self {
729 let task_runner =
730 Sink::from_raw_terminal_task_runner(|input, materializer, cancelled, hints| {
731 run_ignore_terminal(input, materializer, cancelled, hints)
732 });
733 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::IgnoreDescriptor::<In> {
734 _phantom: std::marker::PhantomData,
735 });
736 Sink {
737 runner: task_runner.runner,
738 inline_runner: task_runner.inline_runner,
739 hinted_runner: task_runner.hinted_runner,
740 raw_hinted_runner: task_runner.raw_hinted_runner,
741 attributes: task_runner.attributes,
742 deferred_factory: task_runner.deferred_factory,
743 fold_fp: Some(fp),
744 }
745 }
746
747 #[must_use]
748 pub fn on_complete<F>(callback: F) -> Self
749 where
750 F: FnOnce() + Send + Sync + 'static,
751 {
752 let callback = Arc::new(Mutex::new(Some(callback)));
753 Sink::from_task_runner(move |input| {
754 for item in input {
755 item?;
756 }
757 if let Some(cb) = callback.lock().expect("on_complete poisoned").take() {
758 cb();
759 }
760 Ok(NotUsed)
761 })
762 }
763
764 #[must_use]
765 pub fn never() -> Self {
766 Sink::from_runner(|input, materializer| {
767 let state = Arc::clone(&materializer.inner.state);
768 let shutdown_state = Arc::clone(&state);
769 Ok(materializer.spawn_stream(move |cancelled| {
770 let input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
771 for item in input {
772 item?;
773 }
774 loop {
775 if shutdown_state.shutdown.load(Ordering::SeqCst) {
776 return Err(StreamError::AbruptTermination);
777 }
778 if cancelled.load(Ordering::SeqCst) {
779 return Err(StreamError::Cancelled);
780 }
781 thread::sleep(Duration::from_millis(1));
782 }
783 }))
784 })
785 }
786
787 #[must_use]
788 pub fn foreach<F>(f: F) -> Self
789 where
790 F: Fn(In) + Send + Sync + 'static,
791 {
792 let f_arc = Arc::new(f);
793 let f_runner = Arc::clone(&f_arc);
794 let task_runner = Sink::from_task_runner(move |input| {
795 for item in input {
796 f_runner(item?);
797 }
798 Ok(NotUsed)
799 });
800 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::ForeachDescriptor { f: f_arc });
801 Sink {
802 runner: task_runner.runner,
803 inline_runner: task_runner.inline_runner,
804 hinted_runner: task_runner.hinted_runner,
805 raw_hinted_runner: task_runner.raw_hinted_runner,
806 attributes: task_runner.attributes,
807 deferred_factory: task_runner.deferred_factory,
808 fold_fp: Some(fp),
809 }
810 }
811
812 #[must_use]
813 pub fn foreach_async<F, Fut>(parallelism: usize, f: F) -> Self
814 where
815 F: Fn(In) -> Fut + Send + Sync + 'static,
816 Fut: Future<Output = StreamResult<()>> + Send + 'static,
817 {
818 Flow::identity()
819 .map_async_unordered(parallelism, f)
820 .to_mat(Sink::ignore(), Keep::right)
821 }
822
823 #[must_use]
824 pub fn foreach_result<F>(f: F) -> Self
825 where
826 F: Fn(In) -> StreamResult<()> + Send + Sync + 'static,
827 {
828 Sink::from_task_runner(move |input| {
829 for item in input {
830 f(item?)?;
831 }
832 Ok(NotUsed)
833 })
834 }
835
836 #[must_use]
837 pub fn foreach_result_with_supervision<F>(f: F, decider: SupervisionDecider) -> Self
838 where
839 F: Fn(In) -> StreamResult<()> + Send + Sync + 'static,
840 {
841 Sink::from_task_runner(move |input| {
842 for item in input {
843 let item = item?;
844 match catch_unwind(AssertUnwindSafe(|| f(item)))
845 .unwrap_or_else(|_| Err(panic_stream_error("foreach_result callback")))
846 {
847 Ok(()) => {}
848 Err(error) => match decide_supervision(&decider, &error) {
849 SupervisionDirective::Stop => return Err(error),
850 SupervisionDirective::Resume | SupervisionDirective::Restart => {}
851 },
852 }
853 }
854 Ok(NotUsed)
855 })
856 }
857}
858
859impl<In: Send + 'static> Sink<In, StreamCompletion<In>> {
860 #[must_use]
877 pub fn head() -> Self {
878 Sink::from_task_runner_with_inline(
879 |mut input| input.next().unwrap_or(Err(StreamError::EmptyStream)),
880 true,
881 )
882 }
883
884 #[must_use]
885 pub fn last() -> Self {
886 Sink::from_task_runner(|input| {
887 let mut last = None;
888 for item in input {
889 last = Some(item?);
890 }
891 last.ok_or(StreamError::EmptyStream)
892 })
893 }
894
895 #[must_use]
896 pub fn reduce<F>(f: F) -> Self
897 where
898 F: Fn(In, In) -> In + Send + Sync + 'static,
899 {
900 Sink::from_task_runner(move |mut input| {
901 let mut acc = input.next().unwrap_or(Err(StreamError::EmptyStream))?;
902 for item in input {
903 acc = f(acc, item?);
904 }
905 Ok(acc)
906 })
907 }
908
909 #[must_use]
910 pub fn reduce_result<F>(f: F) -> Self
911 where
912 F: Fn(In, In) -> StreamResult<In> + Send + Sync + 'static,
913 {
914 Sink::from_task_runner(move |mut input| {
915 let mut acc = input.next().unwrap_or(Err(StreamError::EmptyStream))?;
916 for item in input {
917 acc = f(acc, item?)?;
918 }
919 Ok(acc)
920 })
921 }
922
923 #[must_use]
924 pub fn reduce_result_with_supervision<F>(f: F, decider: SupervisionDecider) -> Self
925 where
926 In: Clone,
927 F: Fn(In, In) -> StreamResult<In> + Send + Sync + 'static,
928 {
929 Sink::from_task_runner(move |mut input: BoxStream<In>| {
930 let mut acc = Some(input.next().unwrap_or(Err(StreamError::EmptyStream))?);
931 for item in input {
932 let item = item?;
933 let Some(previous) = acc.take() else {
934 acc = Some(item);
935 continue;
936 };
937 match catch_unwind(AssertUnwindSafe(|| f(previous.clone(), item)))
938 .unwrap_or_else(|_| Err(panic_stream_error("reduce_result callback")))
939 {
940 Ok(next) => acc = Some(next),
941 Err(error) => match decide_supervision(&decider, &error) {
942 SupervisionDirective::Stop => return Err(error),
943 SupervisionDirective::Resume => acc = Some(previous),
944 SupervisionDirective::Restart => acc = None,
945 },
946 }
947 }
948 acc.ok_or(StreamError::EmptyStream)
949 })
950 }
951}
952
953impl<In: Send + 'static> Sink<In, StreamCompletion<Option<In>>> {
954 #[must_use]
971 pub fn head_option() -> Self {
972 Sink::from_task_runner_with_inline(
973 |mut input| match input.next() {
974 Some(Ok(item)) => Ok(Some(item)),
975 Some(Err(error)) => Err(error),
976 None => Ok(None),
977 },
978 true,
979 )
980 }
981
982 #[must_use]
983 pub fn last_option() -> Self {
984 Sink::from_task_runner(|input| {
985 let mut last = None;
986 for item in input {
987 last = Some(item?);
988 }
989 Ok(last)
990 })
991 }
992}
993
994impl<In: Send + 'static> Sink<In, NotUsed> {
995 #[must_use]
996 pub fn cancelled() -> Self {
997 Sink::from_runner(|_input, _materializer| Ok(NotUsed))
998 }
999
1000 #[must_use]
1001 pub fn future_sink<InnerMat, F, Fut>(future: F) -> Sink<In, StreamCompletion<InnerMat>>
1002 where
1003 InnerMat: Send + 'static,
1004 F: Fn() -> Fut + Send + Sync + 'static,
1005 Fut: Future<Output = StreamResult<Sink<In, InnerMat>>> + Send + 'static,
1006 {
1007 Self::lazy_future_sink(future)
1008 }
1009
1010 #[must_use]
1011 pub fn lazy_sink<InnerMat, F>(create: F) -> Sink<In, StreamCompletion<InnerMat>>
1012 where
1013 InnerMat: Send + 'static,
1014 F: Fn() -> Sink<In, InnerMat> + Send + Sync + 'static,
1015 {
1016 let create = Arc::new(create);
1017 Sink::from_runner(move |input, materializer| {
1018 let create = Arc::clone(&create);
1019 let state = Arc::clone(&materializer.inner.state);
1020 let worker_materializer =
1021 materializer.with_name_prefix(materializer.name_prefix().to_owned());
1022 Ok(materializer.spawn_stream(move |cancelled| {
1023 let input = runtime_checked_stream(input, state, Some(cancelled));
1024 run_lazy_sink(input, &worker_materializer, move || {
1025 catch_unwind_failed("lazy_sink factory", || create())
1026 })
1027 }))
1028 })
1029 }
1030
1031 #[must_use]
1032 pub fn lazy_future_sink<InnerMat, F, Fut>(create: F) -> Sink<In, StreamCompletion<InnerMat>>
1033 where
1034 InnerMat: Send + 'static,
1035 F: Fn() -> Fut + Send + Sync + 'static,
1036 Fut: Future<Output = StreamResult<Sink<In, InnerMat>>> + Send + 'static,
1037 {
1038 let create = Arc::new(create);
1039 Sink::from_runner(move |input, materializer| {
1040 let create = Arc::clone(&create);
1041 let state = Arc::clone(&materializer.inner.state);
1042 let worker_materializer =
1043 materializer.with_name_prefix(materializer.name_prefix().to_owned());
1044 Ok(materializer.spawn_stream(move |cancelled| {
1045 let input = runtime_checked_stream(input, state, Some(cancelled));
1046 run_lazy_sink(input, &worker_materializer, move || {
1047 catch_unwind_failed("lazy_future_sink factory", || create())
1048 .and_then(flow::run_future_inline_or_spawn)
1049 })
1050 }))
1051 })
1052 }
1053
1054 #[must_use]
1055 pub fn fold<Acc, F>(zero: Acc, f: F) -> Sink<In, StreamCompletion<Acc>>
1056 where
1057 Acc: Clone + Send + Sync + 'static,
1058 F: Fn(Acc, In) -> Acc + Send + Sync + 'static,
1059 {
1060 let f_arc = Arc::new(f);
1061 let zero_clone = zero.clone();
1062 let f_arc2 = Arc::clone(&f_arc);
1063 let task_runner = {
1064 let zero = zero;
1065 Sink::from_raw_terminal_task_runner(move |input, materializer, cancelled, hints| {
1066 run_fold_terminal(
1067 input,
1068 materializer,
1069 cancelled,
1070 hints,
1071 zero.clone(),
1072 f_arc.as_ref(),
1073 )
1074 })
1075 };
1076 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::FoldDescriptor {
1077 zero: zero_clone,
1078 f: f_arc2,
1079 });
1080 Sink {
1081 runner: task_runner.runner,
1082 inline_runner: task_runner.inline_runner,
1083 hinted_runner: task_runner.hinted_runner,
1084 raw_hinted_runner: task_runner.raw_hinted_runner,
1085 attributes: task_runner.attributes,
1086 deferred_factory: task_runner.deferred_factory,
1087 fold_fp: Some(fp),
1088 }
1089 }
1090
1091 #[must_use]
1092 pub fn fold_result<Acc, F>(zero: Acc, f: F) -> Sink<In, StreamCompletion<Acc>>
1093 where
1094 Acc: Clone + Send + Sync + 'static,
1095 F: Fn(Acc, In) -> StreamResult<Acc> + Send + Sync + 'static,
1096 {
1097 let f_arc = Arc::new(f);
1098 let zero_clone = zero.clone();
1099 let f_arc2 = Arc::clone(&f_arc);
1100 let task_runner = {
1101 let zero = zero;
1102 Sink::from_raw_terminal_task_runner(move |input, materializer, cancelled, hints| {
1103 run_fold_result_terminal(
1104 input,
1105 materializer,
1106 cancelled,
1107 hints,
1108 zero.clone(),
1109 f_arc.as_ref(),
1110 )
1111 })
1112 };
1113 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::FoldResultDescriptor {
1114 zero: zero_clone,
1115 f: f_arc2,
1116 });
1117 Sink {
1118 runner: task_runner.runner,
1119 inline_runner: task_runner.inline_runner,
1120 hinted_runner: task_runner.hinted_runner,
1121 raw_hinted_runner: task_runner.raw_hinted_runner,
1122 attributes: task_runner.attributes,
1123 deferred_factory: task_runner.deferred_factory,
1124 fold_fp: Some(fp),
1125 }
1126 }
1127
1128 #[must_use]
1129 pub fn fold_result_with_supervision<Acc, F>(
1130 zero: Acc,
1131 f: F,
1132 decider: SupervisionDecider,
1133 ) -> Sink<In, StreamCompletion<Acc>>
1134 where
1135 Acc: Clone + Send + Sync + 'static,
1136 F: Fn(Acc, In) -> StreamResult<Acc> + Send + Sync + 'static,
1137 {
1138 Sink::from_task_runner(move |input| {
1139 let mut acc = zero.clone();
1140 for item in input {
1141 let item = item?;
1142 let previous = acc;
1143 match catch_unwind(AssertUnwindSafe(|| f(previous.clone(), item)))
1144 .unwrap_or_else(|_| Err(panic_stream_error("fold_result callback")))
1145 {
1146 Ok(next) => acc = next,
1147 Err(error) => match decide_supervision(&decider, &error) {
1148 SupervisionDirective::Stop => return Err(error),
1149 SupervisionDirective::Resume => acc = previous,
1150 SupervisionDirective::Restart => acc = zero.clone(),
1151 },
1152 }
1153 }
1154 Ok(acc)
1155 })
1156 }
1157}
1158
1159fn run_lazy_sink<In, InnerMat, F>(
1160 mut input: BoxStream<In>,
1161 materializer: &Materializer,
1162 create: F,
1163) -> StreamResult<InnerMat>
1164where
1165 In: Send + 'static,
1166 InnerMat: Send + 'static,
1167 F: FnOnce() -> StreamResult<Sink<In, InnerMat>>,
1168{
1169 let first = match input.next() {
1170 Some(Ok(item)) => item,
1171 Some(Err(error)) => return Err(error),
1172 None => {
1173 return Err(StreamError::Failed(
1174 "lazy sink was never materialized".into(),
1175 ));
1176 }
1177 };
1178 let sink = create()?;
1179 sink.run(prepend_first_stream(first, input), materializer)
1180}
1181
1182fn prepend_first_stream<In>(first: In, mut rest: BoxStream<In>) -> BoxStream<In>
1183where
1184 In: Send + 'static,
1185{
1186 let mut first = Some(first);
1187 Box::new(std::iter::from_fn(move || {
1188 if let Some(item) = first.take() {
1189 Some(Ok(item))
1190 } else {
1191 rest.next()
1192 }
1193 }))
1194}
1195
1196fn terminal_consumer_status(
1197 materializer: &Materializer,
1198 cancelled: &Arc<AtomicBool>,
1199) -> StreamResult<()> {
1200 if materializer.is_shutdown() {
1201 Err(StreamError::AbruptTermination)
1202 } else if cancelled.load(Ordering::SeqCst) {
1203 Err(StreamError::Cancelled)
1204 } else {
1205 Ok(())
1206 }
1207}
1208
1209fn run_collect_terminal<In: Send + 'static>(
1210 mut input: BoxStream<In>,
1211 materializer: Materializer,
1212 cancelled: Arc<AtomicBool>,
1213 hints: SourceRuntimeHints,
1214) -> StreamResult<Vec<In>> {
1215 if !hints.terminal_consumer_batch {
1216 let input = runtime_checked_stream(
1217 input,
1218 Arc::clone(&materializer.inner.state),
1219 Some(cancelled),
1220 );
1221 return input.collect();
1222 }
1223
1224 let mut items = Vec::with_capacity(hints.inline_micro_max_success_items.unwrap_or(0));
1225 loop {
1226 terminal_consumer_status(&materializer, &cancelled)?;
1227 {
1228 let _cancel_scope = set_current_stream_cancelled(&cancelled);
1229 for _ in 0..TERMINAL_CONSUMER_BATCH {
1230 match input.next() {
1231 Some(Ok(item)) => items.push(item),
1232 Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1233 Ok(()) => return Err(error),
1234 Err(status) => return Err(status),
1235 },
1236 None => {
1237 return terminal_consumer_status(&materializer, &cancelled).map(|()| items);
1238 }
1239 }
1240 }
1241 }
1242 }
1243}
1244
1245fn run_ignore_terminal<In: Send + 'static>(
1246 input: BoxStream<In>,
1247 materializer: Materializer,
1248 cancelled: Arc<AtomicBool>,
1249 hints: SourceRuntimeHints,
1250) -> StreamResult<NotUsed> {
1251 if !hints.terminal_consumer_batch {
1252 let input = runtime_checked_stream(
1253 input,
1254 Arc::clone(&materializer.inner.state),
1255 Some(cancelled),
1256 );
1257 for item in input {
1258 item?;
1259 }
1260 return Ok(NotUsed);
1261 }
1262
1263 let mut input = input;
1264 loop {
1265 terminal_consumer_status(&materializer, &cancelled)?;
1266 {
1267 let _cancel_scope = set_current_stream_cancelled(&cancelled);
1268 for _ in 0..TERMINAL_CONSUMER_BATCH {
1269 match input.next() {
1270 Some(Ok(_)) => {}
1271 Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1272 Ok(()) => return Err(error),
1273 Err(status) => return Err(status),
1274 },
1275 None => {
1276 return terminal_consumer_status(&materializer, &cancelled)
1277 .map(|()| NotUsed);
1278 }
1279 }
1280 }
1281 }
1282 }
1283}
1284
1285fn run_fold_terminal<In, Acc, F>(
1286 input: BoxStream<In>,
1287 materializer: Materializer,
1288 cancelled: Arc<AtomicBool>,
1289 hints: SourceRuntimeHints,
1290 zero: Acc,
1291 f: &F,
1292) -> StreamResult<Acc>
1293where
1294 In: Send + 'static,
1295 Acc: Send + 'static,
1296 F: Fn(Acc, In) -> Acc,
1297{
1298 if !hints.terminal_consumer_batch {
1299 let input = runtime_checked_stream(
1300 input,
1301 Arc::clone(&materializer.inner.state),
1302 Some(cancelled),
1303 );
1304 let mut acc = zero;
1305 for item in input {
1306 acc = f(acc, item?);
1307 }
1308 return Ok(acc);
1309 }
1310
1311 let mut input = input;
1312 let mut acc = zero;
1313 loop {
1314 terminal_consumer_status(&materializer, &cancelled)?;
1315 {
1316 let _cancel_scope = set_current_stream_cancelled(&cancelled);
1317 for _ in 0..TERMINAL_CONSUMER_BATCH {
1318 match input.next() {
1319 Some(Ok(item)) => acc = f(acc, item),
1320 Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1321 Ok(()) => return Err(error),
1322 Err(status) => return Err(status),
1323 },
1324 None => {
1325 return terminal_consumer_status(&materializer, &cancelled).map(|()| acc);
1326 }
1327 }
1328 }
1329 }
1330 }
1331}
1332
1333fn run_fold_result_terminal<In, Acc, F>(
1334 input: BoxStream<In>,
1335 materializer: Materializer,
1336 cancelled: Arc<AtomicBool>,
1337 hints: SourceRuntimeHints,
1338 zero: Acc,
1339 f: &F,
1340) -> StreamResult<Acc>
1341where
1342 In: Send + 'static,
1343 Acc: Send + 'static,
1344 F: Fn(Acc, In) -> StreamResult<Acc>,
1345{
1346 if !hints.terminal_consumer_batch {
1347 let input = runtime_checked_stream(
1348 input,
1349 Arc::clone(&materializer.inner.state),
1350 Some(cancelled),
1351 );
1352 let mut acc = zero;
1353 for item in input {
1354 acc = f(acc, item?)?;
1355 }
1356 return Ok(acc);
1357 }
1358
1359 let mut input = input;
1360 let mut acc = Some(zero);
1361 loop {
1362 terminal_consumer_status(&materializer, &cancelled)?;
1363 {
1364 let _cancel_scope = set_current_stream_cancelled(&cancelled);
1365 for _ in 0..TERMINAL_CONSUMER_BATCH {
1366 match input.next() {
1367 Some(Ok(item)) => {
1368 let previous = acc.take().expect("fold accumulator present");
1369 match f(previous, item) {
1370 Ok(next) => acc = Some(next),
1371 Err(error) => {
1372 return match terminal_consumer_status(&materializer, &cancelled) {
1373 Ok(()) => Err(error),
1374 Err(status) => Err(status),
1375 };
1376 }
1377 }
1378 }
1379 Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1380 Ok(()) => return Err(error),
1381 Err(status) => return Err(status),
1382 },
1383 None => {
1384 return terminal_consumer_status(&materializer, &cancelled)
1385 .map(|()| acc.expect("fold accumulator present"));
1386 }
1387 }
1388 }
1389 }
1390 }
1391}
1392
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Source, StreamCompletion};
    use std::time::Instant;

    /// Polls `condition` about every 5 ms until it returns `true` or `timeout`
    /// has elapsed. Once the deadline has passed, `condition` is evaluated one
    /// last time and that result is returned.
    fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
        let deadline = Instant::now() + timeout;
        loop {
            if Instant::now() >= deadline {
                return condition();
            }
            if condition() {
                return true;
            }
            thread::sleep(Duration::from_millis(5));
        }
    }

    /// A sink built through `Sink::setup` must still run to completion after
    /// its materialized value has been mapped twice.
    #[test]
    fn map_materialized_value_on_deferred_sink_does_not_explode() {
        let deferred =
            Sink::<u64, _>::setup(|_, _| Sink::fold(0u64, |total, value| total + value));
        let remapped = deferred
            .map_materialized_value(|completion: StreamCompletion<u64>| completion)
            .map_materialized_value(|completion| completion);

        let completion = Source::from_iter(1u64..=3).run_with(remapped).unwrap();
        let sum = completion.wait().unwrap();
        assert_eq!(sum, 6u64);
    }

    /// Dropping the completion handle of a fold over an endless source must
    /// bring the materializer's active stream count back to zero.
    #[test]
    fn batched_terminal_fold_observes_completion_drop_cancellation() {
        let materializer = Materializer::new();
        let completion = Source::repeat(1_u64)
            .run_with_materializer(Sink::fold(0_u64, |acc, item| acc + item), &materializer)
            .expect("fold terminal materializes");

        let stream_started =
            wait_until(Duration::from_secs(1), || materializer.active_streams() == 1);
        assert!(stream_started);

        drop(completion);

        let stream_stopped =
            wait_until(Duration::from_secs(5), || materializer.active_streams() == 0);
        assert!(stream_stopped);
    }
}