1use super::flow;
2use super::*;
3use crate::Attributes;
4use crate::stream::error::{decide_supervision, panic_stream_error};
5
6type CombinedSinkChild<In> = (
11 std::sync::mpsc::SyncSender<CombinedSinkMessage<In>>,
12 Box<dyn std::any::Any + Send>,
13);
14
15type CombinedSinkRunner<In> =
16 dyn Fn(&Materializer) -> StreamResult<CombinedSinkChild<In>> + Send + Sync;
17type DeferredSinkFactory<In, Mat> =
18 dyn Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync;
19
20enum CombinedSinkMessage<In> {
21 Item(StreamResult<In>),
22 Flush(std::sync::mpsc::SyncSender<()>),
23 Close,
24}
25
26const TERMINAL_CONSUMER_BATCH: usize = 64;
27
28pub struct Sink<In, Mat> {
29 runner: Arc<SinkRunner<In, Mat>>,
30 inline_runner: Option<Arc<SinkRunner<In, Mat>>>,
31 hinted_runner: Option<Arc<HintedSinkRunner<In, Mat>>>,
32 raw_hinted_runner: Option<Arc<HintedSinkRunner<In, Mat>>>,
33 attributes: Attributes,
34 deferred_factory: Option<Arc<DeferredSinkFactory<In, Mat>>>,
35 pub(crate) fold_fp: Option<Arc<dyn FoldFastPathDyn<In>>>,
37}
38
39fn map_mat_dyn<In, Mat, NextMat>(
43 sink: Sink<In, Mat>,
44 f: Arc<dyn Fn(Mat) -> NextMat + Send + Sync + 'static>,
45) -> Sink<In, NextMat>
46where
47 In: Send + 'static,
48 Mat: Send + 'static,
49 NextMat: Send + 'static,
50{
51 let Sink {
52 runner,
53 inline_runner,
54 hinted_runner,
55 raw_hinted_runner,
56 attributes,
57 deferred_factory,
58 fold_fp: _,
59 } = sink;
60 let mapped_runner = {
61 let f = Arc::clone(&f);
62 Arc::new(move |input, materializer: &Materializer| {
63 let mat = runner(input, materializer)?;
64 Ok(f(mat))
65 }) as Arc<SinkRunner<In, NextMat>>
66 };
67 let mapped_inline_runner = inline_runner.map(|ir| {
68 let f = Arc::clone(&f);
69 Arc::new(move |input, materializer: &Materializer| {
70 let result = ir(input, materializer)?;
71 Ok(f(result))
72 }) as Arc<SinkRunner<In, NextMat>>
73 });
74 let mapped_hinted_runner = hinted_runner.map(|hr| {
75 let f = Arc::clone(&f);
76 Arc::new(
77 move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
78 let result = hr(input, materializer, hints)?;
79 Ok(f(result))
80 },
81 ) as Arc<HintedSinkRunner<In, NextMat>>
82 });
83 let mapped_raw_hinted_runner = raw_hinted_runner.map(|hr| {
84 let f = Arc::clone(&f);
85 Arc::new(
86 move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
87 let result = hr(input, materializer, hints)?;
88 Ok(f(result))
89 },
90 ) as Arc<HintedSinkRunner<In, NextMat>>
91 });
92 let mapped_factory = deferred_factory.map(|factory| {
93 let f = Arc::clone(&f);
94 Arc::new(move |materializer: &Materializer, attrs: &Attributes| {
95 map_mat_dyn(factory(materializer, attrs), Arc::clone(&f))
96 }) as Arc<DeferredSinkFactory<In, NextMat>>
97 });
98 Sink {
99 runner: mapped_runner,
100 inline_runner: mapped_inline_runner,
101 hinted_runner: mapped_hinted_runner,
102 raw_hinted_runner: mapped_raw_hinted_runner,
103 attributes,
104 deferred_factory: mapped_factory,
105 fold_fp: None,
106 }
107}
108
109impl<In, Mat> Clone for Sink<In, Mat> {
110 fn clone(&self) -> Self {
111 Self {
112 runner: Arc::clone(&self.runner),
113 inline_runner: self.inline_runner.as_ref().map(Arc::clone),
114 hinted_runner: self.hinted_runner.as_ref().map(Arc::clone),
115 raw_hinted_runner: self.raw_hinted_runner.as_ref().map(Arc::clone),
116 attributes: self.attributes.clone(),
117 deferred_factory: self.deferred_factory.as_ref().map(Arc::clone),
118 fold_fp: self.fold_fp.as_ref().map(Arc::clone),
119 }
120 }
121}
122
123impl<In: Send + 'static, Mat: Send + 'static> Sink<In, Mat> {
124 pub(crate) fn from_runner<F>(runner: F) -> Self
125 where
126 F: Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync + 'static,
127 {
128 Self::from_runner_parts(Arc::new(runner), None)
129 }
130
131 pub(crate) fn from_runner_parts(
132 runner: Arc<SinkRunner<In, Mat>>,
133 inline_runner: Option<Arc<SinkRunner<In, Mat>>>,
134 ) -> Self {
135 Self {
136 runner,
137 inline_runner,
138 hinted_runner: None,
139 raw_hinted_runner: None,
140 attributes: Attributes::default(),
141 deferred_factory: None,
142 fold_fp: None,
143 }
144 }
145
146 pub(crate) fn from_hinted_runner<F>(runner: F) -> Self
147 where
148 F: Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat>
149 + Send
150 + Sync
151 + 'static,
152 {
153 let hinted_runner: Arc<HintedSinkRunner<In, Mat>> = Arc::new(runner);
154 let fallback = {
155 let hinted_runner = Arc::clone(&hinted_runner);
156 Arc::new(move |input, materializer: &Materializer| {
157 hinted_runner(input, materializer, SourceRuntimeHints::default())
158 }) as Arc<SinkRunner<In, Mat>>
159 };
160 Self {
161 runner: fallback,
162 inline_runner: None,
163 hinted_runner: Some(hinted_runner),
164 raw_hinted_runner: None,
165 attributes: Attributes::default(),
166 deferred_factory: None,
167 fold_fp: None,
168 }
169 }
170
171 pub(crate) fn from_raw_hinted_runner<F>(runner: F) -> Self
172 where
173 F: Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat>
174 + Send
175 + Sync
176 + 'static,
177 {
178 let raw_hinted_runner: Arc<HintedSinkRunner<In, Mat>> = Arc::new(runner);
179 let fallback = {
180 let raw_hinted_runner = Arc::clone(&raw_hinted_runner);
181 Arc::new(move |input, materializer: &Materializer| {
182 let input =
183 runtime_checked_stream(input, Arc::clone(&materializer.inner.state), None);
184 raw_hinted_runner(input, materializer, SourceRuntimeHints::default())
185 }) as Arc<SinkRunner<In, Mat>>
186 };
187 Self {
188 runner: fallback,
189 inline_runner: None,
190 hinted_runner: None,
191 raw_hinted_runner: Some(raw_hinted_runner),
192 attributes: Attributes::default(),
193 deferred_factory: None,
194 fold_fp: None,
195 }
196 }
197
198 pub(super) fn run(
199 &self,
200 input: BoxStream<In>,
201 materializer: &Materializer,
202 ) -> StreamResult<Mat> {
203 self.run_with_source_hints(input, materializer, SourceRuntimeHints::default())
204 }
205
206 pub(super) fn run_with_source_hints(
207 &self,
208 input: BoxStream<In>,
209 materializer: &Materializer,
210 hints: SourceRuntimeHints,
211 ) -> StreamResult<Mat> {
212 if let Some(factory) = &self.deferred_factory {
213 let attrs = materializer.effective_attributes(&self.attributes);
214 return factory(materializer, &attrs).run_with_source_hints(input, materializer, hints);
215 }
216 if let Some(hinted_runner) = &self.hinted_runner {
217 return hinted_runner(input, materializer, hints);
218 }
219 (self.runner)(input, materializer)
220 }
221
222 pub(super) fn run_from_source(
223 &self,
224 input: BoxStream<In>,
225 materializer: &Materializer,
226 hints: SourceRuntimeHints,
227 ) -> StreamResult<Mat> {
228 if let Some(factory) = &self.deferred_factory {
229 let attrs = materializer.effective_attributes(&self.attributes);
230 return factory(materializer, &attrs).run_from_source(input, materializer, hints);
231 }
232 if let Some(raw_hinted_runner) = &self.raw_hinted_runner {
233 return raw_hinted_runner(input, materializer, hints);
234 }
235 let input = runtime_checked_stream(input, Arc::clone(&materializer.inner.state), None);
236 self.run_with_source_hints(input, materializer, hints)
237 }
238
239 pub(super) fn can_inline(&self) -> bool {
240 self.inline_runner.is_some()
241 }
242
243 pub(super) fn run_inline(
244 &self,
245 input: BoxStream<In>,
246 materializer: &Materializer,
247 ) -> StreamResult<Mat> {
248 if let Some(factory) = &self.deferred_factory {
249 let attrs = materializer.effective_attributes(&self.attributes);
250 return factory(materializer, &attrs).run_inline(input, materializer);
251 }
252 (self
253 .inline_runner
254 .as_ref()
255 .expect("inline sink runner exists"))(input, materializer)
256 }
257
258 pub fn run_with<SourceMat: Send + 'static>(
259 self,
260 source: Source<In, SourceMat>,
261 ) -> StreamResult<SourceMat> {
262 source.to(self).run()
263 }
264
265 pub fn run_with_materializer<SourceMat: Send + 'static>(
266 self,
267 source: Source<In, SourceMat>,
268 materializer: &Materializer,
269 ) -> StreamResult<SourceMat> {
270 source.to(self).run_with_materializer(materializer)
271 }
272
273 #[must_use]
274 pub fn from_materializer<F>(factory: F) -> Self
275 where
276 F: Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync + 'static,
277 {
278 let factory = Arc::new(factory);
279 Self {
280 runner: Arc::new(|_input, _materializer| {
281 Err(StreamError::Failed(
282 "deferred sink factory must be driven through Sink::run".into(),
283 ))
284 }),
285 inline_runner: None,
286 hinted_runner: None,
287 raw_hinted_runner: None,
288 attributes: Attributes::default(),
289 deferred_factory: Some(factory),
290 fold_fp: None,
291 }
292 }
293
294 #[must_use]
295 pub fn setup<F>(factory: F) -> Self
296 where
297 F: Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync + 'static,
298 {
299 Self::from_materializer(factory)
300 }
301
302 pub fn pre_materialize(
303 &self,
304 materializer: &Materializer,
305 ) -> StreamResult<(Mat, Sink<In, NotUsed>)> {
306 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
307 let materialized = self.clone().run(
308 Box::new(std::iter::from_fn(move || receiver.recv().ok())),
309 materializer,
310 )?;
311 let sender = Arc::new(Mutex::new(Some(sender)));
312 let sink = Sink::from_runner(move |input, _materializer| {
313 let Some(sender) = sender
314 .lock()
315 .expect("pre-materialized sink poisoned")
316 .take()
317 else {
318 return Err(StreamError::Failed(
319 "pre-materialized sink has already been materialized".into(),
320 ));
321 };
322 for item in input {
323 if sender.send(item).is_err() {
324 break;
325 }
326 }
327 Ok(NotUsed)
328 });
329 Ok((materialized, sink.with_attributes(self.attributes.clone())))
330 }
331
332 #[must_use]
333 pub fn map_materialized_value<NextMat, F>(self, f: F) -> Sink<In, NextMat>
334 where
335 NextMat: Send + 'static,
336 F: Fn(Mat) -> NextMat + Send + Sync + 'static,
337 {
338 map_mat_dyn(self, Arc::new(f))
344 }
345
346 #[must_use]
347 pub fn attributes(&self) -> &Attributes {
348 &self.attributes
349 }
350
351 #[must_use]
352 pub fn with_attributes(mut self, attributes: Attributes) -> Self {
353 self.attributes = attributes;
354 self
355 }
356
357 #[must_use]
358 pub fn add_attributes(mut self, attributes: Attributes) -> Self {
359 self.attributes = self.attributes.and(attributes);
360 self
361 }
362
363 #[must_use]
364 pub fn named(self, name: impl Into<String>) -> Self {
365 self.add_attributes(Attributes::named(name))
366 }
367}
368
369#[derive(Clone)]
370pub struct RunnableGraph<Mat> {
371 pub(super) runner: Arc<RunnableGraphRunner<Mat>>,
372 attributes: Attributes,
373}
374
375#[derive(Debug, Clone, Copy, PartialEq, Eq)]
376pub enum SinkCombineStrategy {
377 Broadcast,
378 Balance,
379}
380
381impl<Mat: Send + 'static> RunnableGraph<Mat> {
382 pub(super) fn from_runner<F>(runner: F) -> Self
383 where
384 F: Fn(&Materializer) -> StreamResult<Mat> + Send + Sync + 'static,
385 {
386 Self {
387 runner: Arc::new(runner),
388 attributes: Attributes::default(),
389 }
390 }
391
392 pub fn run(&self) -> StreamResult<Mat> {
393 Materializer::new().materialize(self)
394 }
395
396 pub fn run_with_materializer(&self, materializer: &Materializer) -> StreamResult<Mat> {
397 materializer.materialize(self)
398 }
399
400 #[must_use]
401 pub fn map_materialized_value<Next, F>(self, f: F) -> RunnableGraph<Next>
402 where
403 Next: Send + 'static,
404 F: Fn(Mat) -> Next + Send + Sync + 'static,
405 {
406 let f = Arc::new(f);
407 RunnableGraph::from_runner(move |materializer| {
408 let mat = (self.runner)(materializer)?;
409 Ok(f(mat))
410 })
411 }
412
413 #[must_use]
414 pub fn attributes(&self) -> &Attributes {
415 &self.attributes
416 }
417
418 #[must_use]
419 pub fn with_attributes(mut self, attributes: Attributes) -> Self {
420 self.attributes = attributes;
421 self
422 }
423
424 #[must_use]
425 pub fn add_attributes(mut self, attributes: Attributes) -> Self {
426 self.attributes = self.attributes.and(attributes);
427 self
428 }
429
430 #[must_use]
431 pub fn named(self, name: impl Into<String>) -> Self {
432 self.add_attributes(Attributes::named(name))
433 }
434}
435
436impl<In: Clone + Send + 'static> Sink<In, NotUsed> {
437 #[must_use]
438 pub fn combine<M1, M2, MRest, I>(
439 first: Sink<In, M1>,
440 second: Sink<In, M2>,
441 rest: I,
442 strategy: SinkCombineStrategy,
443 ) -> Sink<In, NotUsed>
444 where
445 M1: Send + 'static,
446 M2: Send + 'static,
447 MRest: Send + 'static,
448 I: IntoIterator<Item = Sink<In, MRest>>,
449 {
450 let mut runners: Vec<Arc<CombinedSinkRunner<In>>> = vec![
451 Arc::new(move |materializer| {
452 let (sender, receiver) = std::sync::mpsc::sync_channel(0);
453 let mat = first.run(
454 Box::new(std::iter::from_fn(move || {
455 loop {
456 match receiver.recv().ok()? {
457 CombinedSinkMessage::Item(item) => return Some(item),
458 CombinedSinkMessage::Flush(ack) => {
459 let _ = ack.send(());
460 }
461 CombinedSinkMessage::Close => return None,
462 }
463 }
464 })),
465 materializer,
466 )?;
467 Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
468 }),
469 Arc::new(move |materializer| {
470 let (sender, receiver) = std::sync::mpsc::sync_channel(0);
471 let mat = second.run(
472 Box::new(std::iter::from_fn(move || {
473 loop {
474 match receiver.recv().ok()? {
475 CombinedSinkMessage::Item(item) => return Some(item),
476 CombinedSinkMessage::Flush(ack) => {
477 let _ = ack.send(());
478 }
479 CombinedSinkMessage::Close => return None,
480 }
481 }
482 })),
483 materializer,
484 )?;
485 Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
486 }),
487 ];
488 runners.extend(rest.into_iter().map(|sink| {
489 Arc::new(move |materializer: &Materializer| {
490 let (sender, receiver) = std::sync::mpsc::sync_channel(0);
491 let mat = sink.run(
492 Box::new(std::iter::from_fn(move || {
493 loop {
494 match receiver.recv().ok()? {
495 CombinedSinkMessage::Item(item) => return Some(item),
496 CombinedSinkMessage::Flush(ack) => {
497 let _ = ack.send(());
498 }
499 CombinedSinkMessage::Close => return None,
500 }
501 }
502 })),
503 materializer,
504 )?;
505 Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
506 }) as Arc<CombinedSinkRunner<In>>
507 }));
508
509 Sink::from_runner(move |mut input: BoxStream<In>, materializer| {
510 let mut children = runners
513 .iter()
514 .map(|runner| runner(materializer))
515 .collect::<StreamResult<Vec<_>>>()?;
516 let mut next = 0usize;
517 for item in input.by_ref() {
518 match item {
519 Ok(value) => match strategy {
520 SinkCombineStrategy::Broadcast => {
521 children.retain(|(sender, _)| {
522 sender
523 .send(CombinedSinkMessage::Item(Ok(value.clone())))
524 .is_ok()
525 });
526 if children.is_empty() {
527 break;
528 }
529 }
530 SinkCombineStrategy::Balance => {
531 while !children.is_empty() {
532 let index = next % children.len();
533 next = next.wrapping_add(1);
534 match children[index]
535 .0
536 .send(CombinedSinkMessage::Item(Ok(value.clone())))
537 {
538 Ok(()) => break,
539 Err(_) => {
540 children.remove(index);
541 }
542 }
543 }
544 if children.is_empty() {
545 break;
546 }
547 }
548 },
549 Err(error) => {
550 for (sender, _) in &children {
551 let _ = sender.send(CombinedSinkMessage::Item(Err(error.clone())));
552 }
553 return Err(error);
554 }
555 }
556 }
557 for (sender, _) in &children {
561 let (ack_sender, ack_receiver) = std::sync::mpsc::sync_channel(0);
562 if sender.send(CombinedSinkMessage::Flush(ack_sender)).is_ok() {
563 let _ = ack_receiver.recv();
564 }
565 }
566 let mats: Vec<_> = children
569 .into_iter()
570 .map(|(sender, mat)| {
571 let _ = sender.send(CombinedSinkMessage::Close);
572 mat
573 })
574 .collect();
575 drop(mats);
576 Ok(NotUsed)
577 })
578 }
579}
580
581impl<In: Send + 'static, Mat: Send + 'static> Sink<In, StreamCompletion<Mat>> {
582 fn from_task_runner<F>(runner: F) -> Self
583 where
584 F: Fn(BoxStream<In>) -> StreamResult<Mat> + Send + Sync + 'static,
585 {
586 Self::from_task_runner_with_inline(runner, false)
587 }
588
589 fn from_raw_terminal_task_runner<F>(runner: F) -> Self
590 where
591 F: Fn(
592 BoxStream<In>,
593 Materializer,
594 Arc<AtomicBool>,
595 SourceRuntimeHints,
596 ) -> StreamResult<Mat>
597 + Send
598 + Sync
599 + 'static,
600 {
601 let runner = Arc::new(runner);
602 let async_runner = {
603 let runner = Arc::clone(&runner);
604 Arc::new(move |input, materializer: &Materializer| {
605 let runner = Arc::clone(&runner);
606 let worker_materializer =
607 materializer.with_name_prefix(materializer.name_prefix().to_owned());
608 Ok(materializer.spawn_stream(move |cancelled| {
609 runner(
610 input,
611 worker_materializer,
612 cancelled,
613 SourceRuntimeHints::default(),
614 )
615 }))
616 }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
617 };
618 let raw_hinted_runner = {
619 let runner = Arc::clone(&runner);
620 Arc::new(
621 move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
622 let runner = Arc::clone(&runner);
623 let worker_materializer =
624 materializer.with_name_prefix(materializer.name_prefix().to_owned());
625 Ok(materializer.spawn_stream(move |cancelled| {
626 runner(input, worker_materializer, cancelled, hints)
627 }))
628 },
629 ) as Arc<HintedSinkRunner<In, StreamCompletion<Mat>>>
630 };
631 Sink {
632 runner: async_runner,
633 inline_runner: None,
634 hinted_runner: None,
635 raw_hinted_runner: Some(raw_hinted_runner),
636 attributes: Attributes::default(),
637 deferred_factory: None,
638 fold_fp: None,
639 }
640 }
641
642 fn from_task_runner_with_inline<F>(runner: F, inline: bool) -> Self
643 where
644 F: Fn(BoxStream<In>) -> StreamResult<Mat> + Send + Sync + 'static,
645 {
646 let runner = Arc::new(runner);
647 let async_runner = {
648 let runner = Arc::clone(&runner);
649 Arc::new(move |input, materializer: &Materializer| {
650 let runner = Arc::clone(&runner);
651 let state = Arc::clone(&materializer.inner.state);
652 Ok(materializer.spawn_stream(move |cancelled| {
653 runner(runtime_checked_stream(input, state, Some(cancelled)))
654 }))
655 }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
656 };
657 let inline_runner = inline.then(|| {
658 let runner = Arc::clone(&runner);
659 Arc::new(move |input, materializer: &Materializer| {
660 let runner = Arc::clone(&runner);
661 let state = Arc::clone(&materializer.inner.state);
662 Ok(materializer.spawn_stream_inline(move |cancelled| {
663 runner(runtime_checked_stream(input, state, Some(cancelled)))
664 }))
665 }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
666 });
667 Sink::from_runner_parts(async_runner, inline_runner)
668 }
669}
670
671impl<In: Send + 'static> Sink<In, StreamCompletion<Vec<In>>> {
672 #[must_use]
673 pub fn collect() -> Self {
674 let task_runner =
675 Sink::from_raw_terminal_task_runner(|input, materializer, cancelled, hints| {
676 run_collect_terminal(input, materializer, cancelled, hints)
677 });
678 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::CollectDescriptor::<In> {
679 _phantom: std::marker::PhantomData,
680 });
681 Sink {
682 runner: task_runner.runner,
683 inline_runner: task_runner.inline_runner,
684 hinted_runner: task_runner.hinted_runner,
685 raw_hinted_runner: task_runner.raw_hinted_runner,
686 attributes: task_runner.attributes,
687 deferred_factory: task_runner.deferred_factory,
688 fold_fp: Some(fp),
689 }
690 }
691
692 #[must_use]
693 pub fn collection() -> Self {
694 Self::collect()
695 }
696
697 #[must_use]
698 pub fn take_last(n: usize) -> Self {
699 Sink::from_task_runner(move |input| {
700 if n == 0 {
701 for item in input {
702 let _ = item?;
703 }
704 return Ok(Vec::new());
705 }
706 let mut buffer = VecDeque::with_capacity(n);
707 for item in input {
708 let item = item?;
709 if buffer.len() == n {
710 buffer.pop_front();
711 }
712 buffer.push_back(item);
713 }
714 Ok(buffer.into_iter().collect())
715 })
716 }
717}
718
719impl<In: Send + 'static> Sink<In, StreamCompletion<NotUsed>> {
720 #[must_use]
721 pub fn ignore() -> Self {
722 let task_runner =
723 Sink::from_raw_terminal_task_runner(|input, materializer, cancelled, hints| {
724 run_ignore_terminal(input, materializer, cancelled, hints)
725 });
726 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::IgnoreDescriptor::<In> {
727 _phantom: std::marker::PhantomData,
728 });
729 Sink {
730 runner: task_runner.runner,
731 inline_runner: task_runner.inline_runner,
732 hinted_runner: task_runner.hinted_runner,
733 raw_hinted_runner: task_runner.raw_hinted_runner,
734 attributes: task_runner.attributes,
735 deferred_factory: task_runner.deferred_factory,
736 fold_fp: Some(fp),
737 }
738 }
739
740 #[must_use]
741 pub fn on_complete<F>(callback: F) -> Self
742 where
743 F: FnOnce() + Send + Sync + 'static,
744 {
745 let callback = Arc::new(Mutex::new(Some(callback)));
746 Sink::from_task_runner(move |input| {
747 for item in input {
748 item?;
749 }
750 if let Some(cb) = callback.lock().expect("on_complete poisoned").take() {
751 cb();
752 }
753 Ok(NotUsed)
754 })
755 }
756
757 #[must_use]
758 pub fn never() -> Self {
759 Sink::from_runner(|input, materializer| {
760 let state = Arc::clone(&materializer.inner.state);
761 let shutdown_state = Arc::clone(&state);
762 Ok(materializer.spawn_stream(move |cancelled| {
763 let input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
764 for item in input {
765 item?;
766 }
767 loop {
768 if shutdown_state.shutdown.load(Ordering::SeqCst) {
769 return Err(StreamError::AbruptTermination);
770 }
771 if cancelled.load(Ordering::SeqCst) {
772 return Err(StreamError::Cancelled);
773 }
774 thread::sleep(Duration::from_millis(1));
775 }
776 }))
777 })
778 }
779
780 #[must_use]
781 pub fn foreach<F>(f: F) -> Self
782 where
783 F: Fn(In) + Send + Sync + 'static,
784 {
785 Sink::from_task_runner(move |input| {
786 for item in input {
787 f(item?);
788 }
789 Ok(NotUsed)
790 })
791 }
792
793 #[must_use]
794 pub fn foreach_async<F, Fut>(parallelism: usize, f: F) -> Self
795 where
796 F: Fn(In) -> Fut + Send + Sync + 'static,
797 Fut: Future<Output = StreamResult<()>> + Send + 'static,
798 {
799 Flow::identity()
800 .map_async_unordered(parallelism, f)
801 .to_mat(Sink::ignore(), Keep::right)
802 }
803
804 #[must_use]
805 pub fn foreach_result<F>(f: F) -> Self
806 where
807 F: Fn(In) -> StreamResult<()> + Send + Sync + 'static,
808 {
809 Sink::from_task_runner(move |input| {
810 for item in input {
811 f(item?)?;
812 }
813 Ok(NotUsed)
814 })
815 }
816
817 #[must_use]
818 pub fn foreach_result_with_supervision<F>(f: F, decider: SupervisionDecider) -> Self
819 where
820 F: Fn(In) -> StreamResult<()> + Send + Sync + 'static,
821 {
822 Sink::from_task_runner(move |input| {
823 for item in input {
824 let item = item?;
825 match catch_unwind(AssertUnwindSafe(|| f(item)))
826 .unwrap_or_else(|_| Err(panic_stream_error("foreach_result callback")))
827 {
828 Ok(()) => {}
829 Err(error) => match decide_supervision(&decider, &error) {
830 SupervisionDirective::Stop => return Err(error),
831 SupervisionDirective::Resume | SupervisionDirective::Restart => {}
832 },
833 }
834 }
835 Ok(NotUsed)
836 })
837 }
838}
839
840impl<In: Send + 'static> Sink<In, StreamCompletion<In>> {
841 #[must_use]
858 pub fn head() -> Self {
859 Sink::from_task_runner_with_inline(
860 |mut input| input.next().unwrap_or(Err(StreamError::EmptyStream)),
861 true,
862 )
863 }
864
865 #[must_use]
866 pub fn last() -> Self {
867 Sink::from_task_runner(|input| {
868 let mut last = None;
869 for item in input {
870 last = Some(item?);
871 }
872 last.ok_or(StreamError::EmptyStream)
873 })
874 }
875
876 #[must_use]
877 pub fn reduce<F>(f: F) -> Self
878 where
879 F: Fn(In, In) -> In + Send + Sync + 'static,
880 {
881 Sink::from_task_runner(move |mut input| {
882 let mut acc = input.next().unwrap_or(Err(StreamError::EmptyStream))?;
883 for item in input {
884 acc = f(acc, item?);
885 }
886 Ok(acc)
887 })
888 }
889
890 #[must_use]
891 pub fn reduce_result<F>(f: F) -> Self
892 where
893 F: Fn(In, In) -> StreamResult<In> + Send + Sync + 'static,
894 {
895 Sink::from_task_runner(move |mut input| {
896 let mut acc = input.next().unwrap_or(Err(StreamError::EmptyStream))?;
897 for item in input {
898 acc = f(acc, item?)?;
899 }
900 Ok(acc)
901 })
902 }
903
904 #[must_use]
905 pub fn reduce_result_with_supervision<F>(f: F, decider: SupervisionDecider) -> Self
906 where
907 In: Clone,
908 F: Fn(In, In) -> StreamResult<In> + Send + Sync + 'static,
909 {
910 Sink::from_task_runner(move |mut input: BoxStream<In>| {
911 let mut acc = Some(input.next().unwrap_or(Err(StreamError::EmptyStream))?);
912 for item in input {
913 let item = item?;
914 let Some(previous) = acc.take() else {
915 acc = Some(item);
916 continue;
917 };
918 match catch_unwind(AssertUnwindSafe(|| f(previous.clone(), item)))
919 .unwrap_or_else(|_| Err(panic_stream_error("reduce_result callback")))
920 {
921 Ok(next) => acc = Some(next),
922 Err(error) => match decide_supervision(&decider, &error) {
923 SupervisionDirective::Stop => return Err(error),
924 SupervisionDirective::Resume => acc = Some(previous),
925 SupervisionDirective::Restart => acc = None,
926 },
927 }
928 }
929 acc.ok_or(StreamError::EmptyStream)
930 })
931 }
932}
933
934impl<In: Send + 'static> Sink<In, StreamCompletion<Option<In>>> {
935 #[must_use]
952 pub fn head_option() -> Self {
953 Sink::from_task_runner_with_inline(
954 |mut input| match input.next() {
955 Some(Ok(item)) => Ok(Some(item)),
956 Some(Err(error)) => Err(error),
957 None => Ok(None),
958 },
959 true,
960 )
961 }
962
963 #[must_use]
964 pub fn last_option() -> Self {
965 Sink::from_task_runner(|input| {
966 let mut last = None;
967 for item in input {
968 last = Some(item?);
969 }
970 Ok(last)
971 })
972 }
973}
974
975impl<In: Send + 'static> Sink<In, NotUsed> {
976 #[must_use]
977 pub fn cancelled() -> Self {
978 Sink::from_runner(|_input, _materializer| Ok(NotUsed))
979 }
980
981 #[must_use]
982 pub fn future_sink<InnerMat, F, Fut>(future: F) -> Sink<In, StreamCompletion<InnerMat>>
983 where
984 InnerMat: Send + 'static,
985 F: Fn() -> Fut + Send + Sync + 'static,
986 Fut: Future<Output = StreamResult<Sink<In, InnerMat>>> + Send + 'static,
987 {
988 Self::lazy_future_sink(future)
989 }
990
991 #[must_use]
992 pub fn lazy_sink<InnerMat, F>(create: F) -> Sink<In, StreamCompletion<InnerMat>>
993 where
994 InnerMat: Send + 'static,
995 F: Fn() -> Sink<In, InnerMat> + Send + Sync + 'static,
996 {
997 let create = Arc::new(create);
998 Sink::from_runner(move |input, materializer| {
999 let create = Arc::clone(&create);
1000 let state = Arc::clone(&materializer.inner.state);
1001 let worker_materializer =
1002 materializer.with_name_prefix(materializer.name_prefix().to_owned());
1003 Ok(materializer.spawn_stream(move |cancelled| {
1004 let input = runtime_checked_stream(input, state, Some(cancelled));
1005 run_lazy_sink(input, &worker_materializer, move || {
1006 catch_unwind_failed("lazy_sink factory", || create())
1007 })
1008 }))
1009 })
1010 }
1011
1012 #[must_use]
1013 pub fn lazy_future_sink<InnerMat, F, Fut>(create: F) -> Sink<In, StreamCompletion<InnerMat>>
1014 where
1015 InnerMat: Send + 'static,
1016 F: Fn() -> Fut + Send + Sync + 'static,
1017 Fut: Future<Output = StreamResult<Sink<In, InnerMat>>> + Send + 'static,
1018 {
1019 let create = Arc::new(create);
1020 Sink::from_runner(move |input, materializer| {
1021 let create = Arc::clone(&create);
1022 let state = Arc::clone(&materializer.inner.state);
1023 let worker_materializer =
1024 materializer.with_name_prefix(materializer.name_prefix().to_owned());
1025 Ok(materializer.spawn_stream(move |cancelled| {
1026 let input = runtime_checked_stream(input, state, Some(cancelled));
1027 run_lazy_sink(input, &worker_materializer, move || {
1028 catch_unwind_failed("lazy_future_sink factory", || create())
1029 .and_then(flow::run_future_inline_or_spawn)
1030 })
1031 }))
1032 })
1033 }
1034
1035 #[must_use]
1036 pub fn fold<Acc, F>(zero: Acc, f: F) -> Sink<In, StreamCompletion<Acc>>
1037 where
1038 Acc: Clone + Send + Sync + 'static,
1039 F: Fn(Acc, In) -> Acc + Send + Sync + 'static,
1040 {
1041 let f_arc = Arc::new(f);
1042 let zero_clone = zero.clone();
1043 let f_arc2 = Arc::clone(&f_arc);
1044 let task_runner = {
1045 let zero = zero;
1046 Sink::from_raw_terminal_task_runner(move |input, materializer, cancelled, hints| {
1047 run_fold_terminal(
1048 input,
1049 materializer,
1050 cancelled,
1051 hints,
1052 zero.clone(),
1053 f_arc.as_ref(),
1054 )
1055 })
1056 };
1057 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::FoldDescriptor {
1058 zero: zero_clone,
1059 f: f_arc2,
1060 });
1061 Sink {
1062 runner: task_runner.runner,
1063 inline_runner: task_runner.inline_runner,
1064 hinted_runner: task_runner.hinted_runner,
1065 raw_hinted_runner: task_runner.raw_hinted_runner,
1066 attributes: task_runner.attributes,
1067 deferred_factory: task_runner.deferred_factory,
1068 fold_fp: Some(fp),
1069 }
1070 }
1071
1072 #[must_use]
1073 pub fn fold_result<Acc, F>(zero: Acc, f: F) -> Sink<In, StreamCompletion<Acc>>
1074 where
1075 Acc: Clone + Send + Sync + 'static,
1076 F: Fn(Acc, In) -> StreamResult<Acc> + Send + Sync + 'static,
1077 {
1078 let f_arc = Arc::new(f);
1079 let zero_clone = zero.clone();
1080 let f_arc2 = Arc::clone(&f_arc);
1081 let task_runner = {
1082 let zero = zero;
1083 Sink::from_raw_terminal_task_runner(move |input, materializer, cancelled, hints| {
1084 run_fold_result_terminal(
1085 input,
1086 materializer,
1087 cancelled,
1088 hints,
1089 zero.clone(),
1090 f_arc.as_ref(),
1091 )
1092 })
1093 };
1094 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::FoldResultDescriptor {
1095 zero: zero_clone,
1096 f: f_arc2,
1097 });
1098 Sink {
1099 runner: task_runner.runner,
1100 inline_runner: task_runner.inline_runner,
1101 hinted_runner: task_runner.hinted_runner,
1102 raw_hinted_runner: task_runner.raw_hinted_runner,
1103 attributes: task_runner.attributes,
1104 deferred_factory: task_runner.deferred_factory,
1105 fold_fp: Some(fp),
1106 }
1107 }
1108
1109 #[must_use]
1110 pub fn fold_result_with_supervision<Acc, F>(
1111 zero: Acc,
1112 f: F,
1113 decider: SupervisionDecider,
1114 ) -> Sink<In, StreamCompletion<Acc>>
1115 where
1116 Acc: Clone + Send + Sync + 'static,
1117 F: Fn(Acc, In) -> StreamResult<Acc> + Send + Sync + 'static,
1118 {
1119 Sink::from_task_runner(move |input| {
1120 let mut acc = zero.clone();
1121 for item in input {
1122 let item = item?;
1123 let previous = acc;
1124 match catch_unwind(AssertUnwindSafe(|| f(previous.clone(), item)))
1125 .unwrap_or_else(|_| Err(panic_stream_error("fold_result callback")))
1126 {
1127 Ok(next) => acc = next,
1128 Err(error) => match decide_supervision(&decider, &error) {
1129 SupervisionDirective::Stop => return Err(error),
1130 SupervisionDirective::Resume => acc = previous,
1131 SupervisionDirective::Restart => acc = zero.clone(),
1132 },
1133 }
1134 }
1135 Ok(acc)
1136 })
1137 }
1138}
1139
1140fn run_lazy_sink<In, InnerMat, F>(
1141 mut input: BoxStream<In>,
1142 materializer: &Materializer,
1143 create: F,
1144) -> StreamResult<InnerMat>
1145where
1146 In: Send + 'static,
1147 InnerMat: Send + 'static,
1148 F: FnOnce() -> StreamResult<Sink<In, InnerMat>>,
1149{
1150 let first = match input.next() {
1151 Some(Ok(item)) => item,
1152 Some(Err(error)) => return Err(error),
1153 None => {
1154 return Err(StreamError::Failed(
1155 "lazy sink was never materialized".into(),
1156 ));
1157 }
1158 };
1159 let sink = create()?;
1160 sink.run(prepend_first_stream(first, input), materializer)
1161}
1162
1163fn prepend_first_stream<In>(first: In, mut rest: BoxStream<In>) -> BoxStream<In>
1164where
1165 In: Send + 'static,
1166{
1167 let mut first = Some(first);
1168 Box::new(std::iter::from_fn(move || {
1169 if let Some(item) = first.take() {
1170 Some(Ok(item))
1171 } else {
1172 rest.next()
1173 }
1174 }))
1175}
1176
1177fn terminal_consumer_status(
1178 materializer: &Materializer,
1179 cancelled: &Arc<AtomicBool>,
1180) -> StreamResult<()> {
1181 if materializer.is_shutdown() {
1182 Err(StreamError::AbruptTermination)
1183 } else if cancelled.load(Ordering::SeqCst) {
1184 Err(StreamError::Cancelled)
1185 } else {
1186 Ok(())
1187 }
1188}
1189
1190fn run_collect_terminal<In: Send + 'static>(
1191 mut input: BoxStream<In>,
1192 materializer: Materializer,
1193 cancelled: Arc<AtomicBool>,
1194 hints: SourceRuntimeHints,
1195) -> StreamResult<Vec<In>> {
1196 if !hints.terminal_consumer_batch {
1197 let input = runtime_checked_stream(
1198 input,
1199 Arc::clone(&materializer.inner.state),
1200 Some(cancelled),
1201 );
1202 return input.collect();
1203 }
1204
1205 let mut items = Vec::with_capacity(hints.inline_micro_max_success_items.unwrap_or(0));
1206 loop {
1207 terminal_consumer_status(&materializer, &cancelled)?;
1208 {
1209 let _cancel_scope = set_current_stream_cancelled(&cancelled);
1210 for _ in 0..TERMINAL_CONSUMER_BATCH {
1211 match input.next() {
1212 Some(Ok(item)) => items.push(item),
1213 Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1214 Ok(()) => return Err(error),
1215 Err(status) => return Err(status),
1216 },
1217 None => {
1218 return terminal_consumer_status(&materializer, &cancelled).map(|()| items);
1219 }
1220 }
1221 }
1222 }
1223 }
1224}
1225
1226fn run_ignore_terminal<In: Send + 'static>(
1227 input: BoxStream<In>,
1228 materializer: Materializer,
1229 cancelled: Arc<AtomicBool>,
1230 hints: SourceRuntimeHints,
1231) -> StreamResult<NotUsed> {
1232 if !hints.terminal_consumer_batch {
1233 let input = runtime_checked_stream(
1234 input,
1235 Arc::clone(&materializer.inner.state),
1236 Some(cancelled),
1237 );
1238 for item in input {
1239 item?;
1240 }
1241 return Ok(NotUsed);
1242 }
1243
1244 let mut input = input;
1245 loop {
1246 terminal_consumer_status(&materializer, &cancelled)?;
1247 {
1248 let _cancel_scope = set_current_stream_cancelled(&cancelled);
1249 for _ in 0..TERMINAL_CONSUMER_BATCH {
1250 match input.next() {
1251 Some(Ok(_)) => {}
1252 Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1253 Ok(()) => return Err(error),
1254 Err(status) => return Err(status),
1255 },
1256 None => {
1257 return terminal_consumer_status(&materializer, &cancelled)
1258 .map(|()| NotUsed);
1259 }
1260 }
1261 }
1262 }
1263 }
1264}
1265
1266fn run_fold_terminal<In, Acc, F>(
1267 input: BoxStream<In>,
1268 materializer: Materializer,
1269 cancelled: Arc<AtomicBool>,
1270 hints: SourceRuntimeHints,
1271 zero: Acc,
1272 f: &F,
1273) -> StreamResult<Acc>
1274where
1275 In: Send + 'static,
1276 Acc: Send + 'static,
1277 F: Fn(Acc, In) -> Acc,
1278{
1279 if !hints.terminal_consumer_batch {
1280 let input = runtime_checked_stream(
1281 input,
1282 Arc::clone(&materializer.inner.state),
1283 Some(cancelled),
1284 );
1285 let mut acc = zero;
1286 for item in input {
1287 acc = f(acc, item?);
1288 }
1289 return Ok(acc);
1290 }
1291
1292 let mut input = input;
1293 let mut acc = zero;
1294 loop {
1295 terminal_consumer_status(&materializer, &cancelled)?;
1296 {
1297 let _cancel_scope = set_current_stream_cancelled(&cancelled);
1298 for _ in 0..TERMINAL_CONSUMER_BATCH {
1299 match input.next() {
1300 Some(Ok(item)) => acc = f(acc, item),
1301 Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1302 Ok(()) => return Err(error),
1303 Err(status) => return Err(status),
1304 },
1305 None => {
1306 return terminal_consumer_status(&materializer, &cancelled).map(|()| acc);
1307 }
1308 }
1309 }
1310 }
1311 }
1312}
1313
1314fn run_fold_result_terminal<In, Acc, F>(
1315 input: BoxStream<In>,
1316 materializer: Materializer,
1317 cancelled: Arc<AtomicBool>,
1318 hints: SourceRuntimeHints,
1319 zero: Acc,
1320 f: &F,
1321) -> StreamResult<Acc>
1322where
1323 In: Send + 'static,
1324 Acc: Send + 'static,
1325 F: Fn(Acc, In) -> StreamResult<Acc>,
1326{
1327 if !hints.terminal_consumer_batch {
1328 let input = runtime_checked_stream(
1329 input,
1330 Arc::clone(&materializer.inner.state),
1331 Some(cancelled),
1332 );
1333 let mut acc = zero;
1334 for item in input {
1335 acc = f(acc, item?)?;
1336 }
1337 return Ok(acc);
1338 }
1339
1340 let mut input = input;
1341 let mut acc = Some(zero);
1342 loop {
1343 terminal_consumer_status(&materializer, &cancelled)?;
1344 {
1345 let _cancel_scope = set_current_stream_cancelled(&cancelled);
1346 for _ in 0..TERMINAL_CONSUMER_BATCH {
1347 match input.next() {
1348 Some(Ok(item)) => {
1349 let previous = acc.take().expect("fold accumulator present");
1350 match f(previous, item) {
1351 Ok(next) => acc = Some(next),
1352 Err(error) => {
1353 return match terminal_consumer_status(&materializer, &cancelled) {
1354 Ok(()) => Err(error),
1355 Err(status) => Err(status),
1356 };
1357 }
1358 }
1359 }
1360 Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1361 Ok(()) => return Err(error),
1362 Err(status) => return Err(status),
1363 },
1364 None => {
1365 return terminal_consumer_status(&materializer, &cancelled)
1366 .map(|()| acc.expect("fold accumulator present"));
1367 }
1368 }
1369 }
1370 }
1371 }
1372}
1373
1374#[cfg(test)]
1375mod tests {
1376 use super::*;
1377 use crate::{Source, StreamCompletion};
1378 use std::time::Instant;
1379
1380 fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
1381 let deadline = Instant::now() + timeout;
1382 while Instant::now() < deadline {
1383 if condition() {
1384 return true;
1385 }
1386 thread::sleep(Duration::from_millis(5));
1387 }
1388 condition()
1389 }
1390
1391 #[test]
1392 fn map_materialized_value_on_deferred_sink_does_not_explode() {
1393 let sink = Sink::<u64, _>::setup(|_, _| Sink::fold(0u64, |acc, x| acc + x));
1396 let sink = sink
1400 .map_materialized_value(|sc: StreamCompletion<u64>| sc)
1401 .map_materialized_value(|sc| sc);
1402
1403 let sum = Source::from_iter(1u64..=3)
1404 .run_with(sink)
1405 .unwrap()
1406 .wait()
1407 .unwrap();
1408 assert_eq!(sum, 6u64);
1409 }
1410
1411 #[test]
1412 fn batched_terminal_fold_observes_completion_drop_cancellation() {
1413 let materializer = Materializer::new();
1414 let completion = Source::repeat(1_u64)
1415 .run_with_materializer(Sink::fold(0_u64, |acc, item| acc + item), &materializer)
1416 .expect("fold terminal materializes");
1417
1418 assert!(wait_until(Duration::from_secs(1), || {
1419 materializer.active_streams() == 1
1420 }));
1421 drop(completion);
1422
1423 assert!(wait_until(Duration::from_secs(5), || {
1424 materializer.active_streams() == 0
1425 }));
1426 }
1427}