use super::flow;
use super::*;
use crate::Attributes;
use crate::stream::error::{decide_supervision, panic_stream_error};
/// One materialized child of a combined sink: the sender used to feed it
/// [`CombinedSinkMessage`]s, paired with the child's type-erased
/// materialized value.
type CombinedSinkChild<In> = (
std::sync::mpsc::SyncSender<CombinedSinkMessage<In>>,
Box<dyn std::any::Any + Send>,
);
/// Type-erased closure that materializes one child sink of `Sink::combine`
/// against the given materializer and returns its [`CombinedSinkChild`].
type CombinedSinkRunner<In> =
dyn Fn(&Materializer) -> StreamResult<CombinedSinkChild<In>> + Send + Sync;
/// Factory stored by deferred sinks (see `Sink::from_materializer`): it is
/// called with the materializer and the effective attributes and returns the
/// sink that is actually run.
type DeferredSinkFactory<In, Mat> =
dyn Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync;
/// Messages sent from the combining driver to each child sink of
/// `Sink::combine`.
enum CombinedSinkMessage<In> {
/// A stream element (or upstream error) handed to the child as its next item.
Item(StreamResult<In>),
/// Flush request; the child answers by sending `()` on the enclosed sender.
Flush(std::sync::mpsc::SyncSender<()>),
/// Ends the child's input stream.
Close,
}
/// Maximum number of upstream pulls the batched terminal consumers (collect,
/// ignore, fold, fold_result) perform between two shutdown/cancellation
/// status checks.
const TERMINAL_CONSUMER_BATCH: usize = 64;
/// A stream sink that consumes `In` items and yields a materialized value of
/// type `Mat` when it is run.
///
/// A sink carries several alternative runners; which one is used depends on
/// the entry point (`run_with_source_hints`, `run_from_source`, `run_inline`).
pub struct Sink<In, Mat> {
/// Fallback runner, used when no more specific runner applies.
runner: Arc<SinkRunner<In, Mat>>,
/// Optional runner used by `run_inline`; its presence is what `can_inline` reports.
inline_runner: Option<Arc<SinkRunner<In, Mat>>>,
/// Optional runner that also receives the source runtime hints; preferred by
/// `run_with_source_hints`.
hinted_runner: Option<Arc<HintedSinkRunner<In, Mat>>>,
/// Optional hinted runner preferred by `run_from_source`, which hands it the
/// input without wrapping it in `runtime_checked_stream` first.
raw_hinted_runner: Option<Arc<HintedSinkRunner<In, Mat>>>,
/// Attributes attached to this sink.
attributes: Attributes,
/// When set, the run methods build the real sink from this factory at
/// materialization time and delegate to it.
deferred_factory: Option<Arc<DeferredSinkFactory<In, Mat>>>,
/// Optional fold fast-path descriptor. Set by `collect`, `ignore`, `fold` and
/// `fold_result`; cleared when the materialized value is mapped.
/// NOTE(review): the consumers of this field are outside this file.
pub(crate) fold_fp: Option<Arc<dyn FoldFastPathDyn<In>>>,
}
fn map_mat_dyn<In, Mat, NextMat>(
sink: Sink<In, Mat>,
f: Arc<dyn Fn(Mat) -> NextMat + Send + Sync + 'static>,
) -> Sink<In, NextMat>
where
In: Send + 'static,
Mat: Send + 'static,
NextMat: Send + 'static,
{
let Sink {
runner,
inline_runner,
hinted_runner,
raw_hinted_runner,
attributes,
deferred_factory,
fold_fp: _,
} = sink;
let mapped_runner = {
let f = Arc::clone(&f);
Arc::new(move |input, materializer: &Materializer| {
let mat = runner(input, materializer)?;
Ok(f(mat))
}) as Arc<SinkRunner<In, NextMat>>
};
let mapped_inline_runner = inline_runner.map(|ir| {
let f = Arc::clone(&f);
Arc::new(move |input, materializer: &Materializer| {
let result = ir(input, materializer)?;
Ok(f(result))
}) as Arc<SinkRunner<In, NextMat>>
});
let mapped_hinted_runner = hinted_runner.map(|hr| {
let f = Arc::clone(&f);
Arc::new(
move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
let result = hr(input, materializer, hints)?;
Ok(f(result))
},
) as Arc<HintedSinkRunner<In, NextMat>>
});
let mapped_raw_hinted_runner = raw_hinted_runner.map(|hr| {
let f = Arc::clone(&f);
Arc::new(
move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
let result = hr(input, materializer, hints)?;
Ok(f(result))
},
) as Arc<HintedSinkRunner<In, NextMat>>
});
let mapped_factory = deferred_factory.map(|factory| {
let f = Arc::clone(&f);
Arc::new(move |materializer: &Materializer, attrs: &Attributes| {
map_mat_dyn(factory(materializer, attrs), Arc::clone(&f))
}) as Arc<DeferredSinkFactory<In, NextMat>>
});
Sink {
runner: mapped_runner,
inline_runner: mapped_inline_runner,
hinted_runner: mapped_hinted_runner,
raw_hinted_runner: mapped_raw_hinted_runner,
attributes,
deferred_factory: mapped_factory,
fold_fp: None,
}
}
/// Cloning a sink bumps the reference count of every shared runner and
/// clones the attributes; no runner is duplicated.
impl<In, Mat> Clone for Sink<In, Mat> {
    fn clone(&self) -> Self {
        let Self {
            runner,
            inline_runner,
            hinted_runner,
            raw_hinted_runner,
            attributes,
            deferred_factory,
            fold_fp,
        } = self;
        Self {
            runner: Arc::clone(runner),
            inline_runner: inline_runner.clone(),
            hinted_runner: hinted_runner.clone(),
            raw_hinted_runner: raw_hinted_runner.clone(),
            attributes: attributes.clone(),
            deferred_factory: deferred_factory.clone(),
            fold_fp: fold_fp.clone(),
        }
    }
}
impl<In: Send + 'static, Mat: Send + 'static> Sink<In, Mat> {
    /// Builds a sink from a plain runner closure, with no inline, hinted or
    /// deferred variant.
    pub(crate) fn from_runner<F>(runner: F) -> Self
    where
        F: Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync + 'static,
    {
        let shared: Arc<SinkRunner<In, Mat>> = Arc::new(runner);
        Self::from_runner_parts(shared, None)
    }

    /// Builds a sink from an already shared runner plus an optional inline
    /// runner. All other slots are empty and the attributes are the default.
    pub(crate) fn from_runner_parts(
        runner: Arc<SinkRunner<In, Mat>>,
        inline_runner: Option<Arc<SinkRunner<In, Mat>>>,
    ) -> Self {
        Self {
            attributes: Attributes::default(),
            runner,
            inline_runner,
            hinted_runner: None,
            raw_hinted_runner: None,
            deferred_factory: None,
            fold_fp: None,
        }
    }

    /// Builds a sink whose runner also receives the source runtime hints.
    ///
    /// The plain runner delegates to the hinted one with default hints.
    pub(crate) fn from_hinted_runner<F>(runner: F) -> Self
    where
        F: Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat>
            + Send
            + Sync
            + 'static,
    {
        let hinted: Arc<HintedSinkRunner<In, Mat>> = Arc::new(runner);
        let delegate = Arc::clone(&hinted);
        let plain = Arc::new(move |input, materializer: &Materializer| {
            delegate(input, materializer, SourceRuntimeHints::default())
        }) as Arc<SinkRunner<In, Mat>>;
        Self {
            attributes: Attributes::default(),
            runner: plain,
            inline_runner: None,
            hinted_runner: Some(hinted),
            raw_hinted_runner: None,
            deferred_factory: None,
            fold_fp: None,
        }
    }

    /// Builds a sink whose hinted runner is handed the input as-is by
    /// `run_from_source`.
    ///
    /// The plain runner wraps the input in `runtime_checked_stream` and then
    /// calls the raw runner with default hints.
    pub(crate) fn from_raw_hinted_runner<F>(runner: F) -> Self
    where
        F: Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat>
            + Send
            + Sync
            + 'static,
    {
        let raw_hinted: Arc<HintedSinkRunner<In, Mat>> = Arc::new(runner);
        let delegate = Arc::clone(&raw_hinted);
        let plain = Arc::new(move |input, materializer: &Materializer| {
            let checked =
                runtime_checked_stream(input, Arc::clone(&materializer.inner.state), None);
            delegate(checked, materializer, SourceRuntimeHints::default())
        }) as Arc<SinkRunner<In, Mat>>;
        Self {
            attributes: Attributes::default(),
            runner: plain,
            inline_runner: None,
            hinted_runner: None,
            raw_hinted_runner: Some(raw_hinted),
            deferred_factory: None,
            fold_fp: None,
        }
    }

    /// Runs the sink on `input` with default source hints.
    pub(super) fn run(
        &self,
        input: BoxStream<In>,
        materializer: &Materializer,
    ) -> StreamResult<Mat> {
        let hints = SourceRuntimeHints::default();
        self.run_with_source_hints(input, materializer, hints)
    }

    /// Runs the sink on `input`, forwarding `hints` when a hinted runner exists.
    ///
    /// Order of precedence: deferred factory (the built sink is run the same
    /// way), then the hinted runner, then the plain runner.
    pub(super) fn run_with_source_hints(
        &self,
        input: BoxStream<In>,
        materializer: &Materializer,
        hints: SourceRuntimeHints,
    ) -> StreamResult<Mat> {
        if let Some(factory) = self.deferred_factory.as_ref() {
            let effective = materializer.effective_attributes(&self.attributes);
            let built = factory(materializer, &effective);
            return built.run_with_source_hints(input, materializer, hints);
        }
        match self.hinted_runner.as_ref() {
            Some(hinted) => hinted(input, materializer, hints),
            None => (self.runner)(input, materializer),
        }
    }

    /// Runs the sink on `input` and `hints` coming directly from a source.
    ///
    /// Order of precedence: deferred factory, then the raw hinted runner
    /// (which gets `input` unwrapped). Otherwise the input is wrapped in
    /// `runtime_checked_stream` and handed to `run_with_source_hints`.
    pub(super) fn run_from_source(
        &self,
        input: BoxStream<In>,
        materializer: &Materializer,
        hints: SourceRuntimeHints,
    ) -> StreamResult<Mat> {
        if let Some(factory) = self.deferred_factory.as_ref() {
            let effective = materializer.effective_attributes(&self.attributes);
            let built = factory(materializer, &effective);
            return built.run_from_source(input, materializer, hints);
        }
        match self.raw_hinted_runner.as_ref() {
            Some(raw) => raw(input, materializer, hints),
            None => {
                let checked =
                    runtime_checked_stream(input, Arc::clone(&materializer.inner.state), None);
                self.run_with_source_hints(checked, materializer, hints)
            }
        }
    }

    /// Reports whether this sink carries an inline runner.
    pub(super) fn can_inline(&self) -> bool {
        self.inline_runner.is_some()
    }

    /// Runs the sink through its inline runner.
    ///
    /// # Panics
    ///
    /// Panics when there is no deferred factory and no inline runner; callers
    /// are expected to check `can_inline` first.
    pub(super) fn run_inline(
        &self,
        input: BoxStream<In>,
        materializer: &Materializer,
    ) -> StreamResult<Mat> {
        if let Some(factory) = self.deferred_factory.as_ref() {
            let effective = materializer.effective_attributes(&self.attributes);
            let built = factory(materializer, &effective);
            return built.run_inline(input, materializer);
        }
        let inline = self
            .inline_runner
            .as_ref()
            .expect("inline sink runner exists");
        inline(input, materializer)
    }

    /// Connects `source` to this sink and runs the resulting graph, returning
    /// the value produced by `source.to(self).run()`.
    pub fn run_with<SourceMat: Send + 'static>(
        self,
        source: Source<In, SourceMat>,
    ) -> StreamResult<SourceMat> {
        let graph = source.to(self);
        graph.run()
    }

    /// Like [`Sink::run_with`], but runs the graph on the given materializer.
    pub fn run_with_materializer<SourceMat: Send + 'static>(
        self,
        source: Source<In, SourceMat>,
        materializer: &Materializer,
    ) -> StreamResult<SourceMat> {
        let graph = source.to(self);
        graph.run_with_materializer(materializer)
    }

    /// Creates a deferred sink: `factory` is called at run time with the
    /// materializer and the effective attributes, and the sink it returns is
    /// the one that runs.
    ///
    /// The plain runner of the returned sink always fails; the run methods
    /// consult the factory first.
    #[must_use]
    pub fn from_materializer<F>(factory: F) -> Self
    where
        F: Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync + 'static,
    {
        let deferred: Arc<DeferredSinkFactory<In, Mat>> = Arc::new(factory);
        Self {
            attributes: Attributes::default(),
            runner: Arc::new(|_input, _materializer| {
                Err(StreamError::Failed(
                    "deferred sink factory must be driven through Sink::run".into(),
                ))
            }),
            inline_runner: None,
            hinted_runner: None,
            raw_hinted_runner: None,
            deferred_factory: Some(deferred),
            fold_fp: None,
        }
    }

    /// Alias for [`Sink::from_materializer`].
    #[must_use]
    pub fn setup<F>(factory: F) -> Self
    where
        F: Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync + 'static,
    {
        Self::from_materializer(factory)
    }

    /// Runs this sink right away against a channel-backed input and returns
    /// its materialized value together with a single-use sink that feeds that
    /// channel.
    ///
    /// The returned sink forwards every item it receives into the running
    /// sink and stops early once the receiving side is gone. Running it a
    /// second time fails with `StreamError::Failed`.
    pub fn pre_materialize(
        &self,
        materializer: &Materializer,
    ) -> StreamResult<(Mat, Sink<In, NotUsed>)> {
        let (feed_tx, feed_rx) = std::sync::mpsc::sync_channel(1);
        let materialized = self.clone().run(
            Box::new(std::iter::from_fn(move || feed_rx.recv().ok())),
            materializer,
        )?;

        // The sender sits in a take-once slot so only the first
        // materialization of the returned sink can use it.
        let slot = Arc::new(Mutex::new(Some(feed_tx)));
        let feeder = Sink::from_runner(move |input, _materializer| {
            let claimed = slot
                .lock()
                .expect("pre-materialized sink poisoned")
                .take();
            let Some(feed) = claimed else {
                return Err(StreamError::Failed(
                    "pre-materialized sink has already been materialized".into(),
                ));
            };
            for item in input {
                if feed.send(item).is_err() {
                    break;
                }
            }
            Ok(NotUsed)
        });
        Ok((materialized, feeder.with_attributes(self.attributes.clone())))
    }

    /// Returns a sink whose materialized value is passed through `f`.
    #[must_use]
    pub fn map_materialized_value<NextMat, F>(self, f: F) -> Sink<In, NextMat>
    where
        NextMat: Send + 'static,
        F: Fn(Mat) -> NextMat + Send + Sync + 'static,
    {
        let mapper: Arc<dyn Fn(Mat) -> NextMat + Send + Sync + 'static> = Arc::new(f);
        map_mat_dyn(self, mapper)
    }

    /// Returns the attributes attached to this sink.
    #[must_use]
    pub fn attributes(&self) -> &Attributes {
        &self.attributes
    }

    /// Replaces the sink's attributes with `attributes`.
    #[must_use]
    pub fn with_attributes(self, attributes: Attributes) -> Self {
        Self { attributes, ..self }
    }

    /// Combines the sink's current attributes with `attributes` via
    /// `Attributes::and`.
    #[must_use]
    pub fn add_attributes(mut self, attributes: Attributes) -> Self {
        let combined = self.attributes.and(attributes);
        self.attributes = combined;
        self
    }

    /// Adds a name attribute to the sink.
    #[must_use]
    pub fn named(self, name: impl Into<String>) -> Self {
        let name_attr = Attributes::named(name);
        self.add_attributes(name_attr)
    }
}
/// A graph that is ready to run and yields a materialized value of type `Mat`.
#[derive(Clone)]
pub struct RunnableGraph<Mat> {
/// Closure that materializes the graph on a given materializer.
pub(super) runner: Arc<RunnableGraphRunner<Mat>>,
/// Attributes attached to the graph.
attributes: Attributes,
}
/// How `Sink::combine` distributes incoming items across its child sinks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkCombineStrategy {
/// Every item is cloned and sent to every child that is still accepting input.
Broadcast,
/// Every item is sent to exactly one child, rotating through the children.
Balance,
}
impl<Mat: Send + 'static> RunnableGraph<Mat> {
    /// Wraps a materialization closure into a graph with default attributes.
    pub(super) fn from_runner<F>(runner: F) -> Self
    where
        F: Fn(&Materializer) -> StreamResult<Mat> + Send + Sync + 'static,
    {
        Self {
            runner: Arc::new(runner),
            attributes: Attributes::default(),
        }
    }

    /// Runs the graph on a freshly created materializer.
    pub fn run(&self) -> StreamResult<Mat> {
        Materializer::new().materialize(self)
    }

    /// Runs the graph on the given materializer.
    pub fn run_with_materializer(&self, materializer: &Materializer) -> StreamResult<Mat> {
        materializer.materialize(self)
    }

    /// Returns a graph whose materialized value is passed through `f`.
    ///
    /// The attributes of the original graph are carried over to the returned
    /// graph, in line with `Sink::map_materialized_value`.
    #[must_use]
    pub fn map_materialized_value<Next, F>(self, f: F) -> RunnableGraph<Next>
    where
        Next: Send + 'static,
        F: Fn(Mat) -> Next + Send + Sync + 'static,
    {
        // Split the graph up front so the attributes survive the rebuild;
        // `from_runner` alone would reset them to the default.
        let Self { runner, attributes } = self;
        RunnableGraph::<Next>::from_runner(move |materializer| {
            let mat = runner(materializer)?;
            Ok(f(mat))
        })
        .with_attributes(attributes)
    }

    /// Returns the attributes attached to this graph.
    #[must_use]
    pub fn attributes(&self) -> &Attributes {
        &self.attributes
    }

    /// Replaces the graph's attributes with `attributes`.
    #[must_use]
    pub fn with_attributes(mut self, attributes: Attributes) -> Self {
        self.attributes = attributes;
        self
    }

    /// Combines the graph's current attributes with `attributes` via
    /// `Attributes::and`.
    #[must_use]
    pub fn add_attributes(mut self, attributes: Attributes) -> Self {
        self.attributes = self.attributes.and(attributes);
        self
    }

    /// Adds a name attribute to the graph.
    #[must_use]
    pub fn named(self, name: impl Into<String>) -> Self {
        self.add_attributes(Attributes::named(name))
    }
}
impl<In: Clone + Send + 'static> Sink<In, NotUsed> {
#[must_use]
pub fn combine<M1, M2, MRest, I>(
first: Sink<In, M1>,
second: Sink<In, M2>,
rest: I,
strategy: SinkCombineStrategy,
) -> Sink<In, NotUsed>
where
M1: Send + 'static,
M2: Send + 'static,
MRest: Send + 'static,
I: IntoIterator<Item = Sink<In, MRest>>,
{
let mut runners: Vec<Arc<CombinedSinkRunner<In>>> = vec![
Arc::new(move |materializer| {
let (sender, receiver) = std::sync::mpsc::sync_channel(0);
let mat = first.run(
Box::new(std::iter::from_fn(move || {
loop {
match receiver.recv().ok()? {
CombinedSinkMessage::Item(item) => return Some(item),
CombinedSinkMessage::Flush(ack) => {
let _ = ack.send(());
}
CombinedSinkMessage::Close => return None,
}
}
})),
materializer,
)?;
Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
}),
Arc::new(move |materializer| {
let (sender, receiver) = std::sync::mpsc::sync_channel(0);
let mat = second.run(
Box::new(std::iter::from_fn(move || {
loop {
match receiver.recv().ok()? {
CombinedSinkMessage::Item(item) => return Some(item),
CombinedSinkMessage::Flush(ack) => {
let _ = ack.send(());
}
CombinedSinkMessage::Close => return None,
}
}
})),
materializer,
)?;
Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
}),
];
runners.extend(rest.into_iter().map(|sink| {
Arc::new(move |materializer: &Materializer| {
let (sender, receiver) = std::sync::mpsc::sync_channel(0);
let mat = sink.run(
Box::new(std::iter::from_fn(move || {
loop {
match receiver.recv().ok()? {
CombinedSinkMessage::Item(item) => return Some(item),
CombinedSinkMessage::Flush(ack) => {
let _ = ack.send(());
}
CombinedSinkMessage::Close => return None,
}
}
})),
materializer,
)?;
Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
}) as Arc<CombinedSinkRunner<In>>
}));
Sink::from_runner(move |mut input: BoxStream<In>, materializer| {
let mut children = runners
.iter()
.map(|runner| runner(materializer))
.collect::<StreamResult<Vec<_>>>()?;
let mut next = 0usize;
for item in input.by_ref() {
match item {
Ok(value) => match strategy {
SinkCombineStrategy::Broadcast => {
children.retain(|(sender, _)| {
sender
.send(CombinedSinkMessage::Item(Ok(value.clone())))
.is_ok()
});
if children.is_empty() {
break;
}
}
SinkCombineStrategy::Balance => {
while !children.is_empty() {
let index = next % children.len();
next = next.wrapping_add(1);
match children[index]
.0
.send(CombinedSinkMessage::Item(Ok(value.clone())))
{
Ok(()) => break,
Err(_) => {
children.remove(index);
}
}
}
if children.is_empty() {
break;
}
}
},
Err(error) => {
for (sender, _) in &children {
let _ = sender.send(CombinedSinkMessage::Item(Err(error.clone())));
}
return Err(error);
}
}
}
for (sender, _) in &children {
let (ack_sender, ack_receiver) = std::sync::mpsc::sync_channel(0);
if sender.send(CombinedSinkMessage::Flush(ack_sender)).is_ok() {
let _ = ack_receiver.recv();
}
}
let mats: Vec<_> = children
.into_iter()
.map(|(sender, mat)| {
let _ = sender.send(CombinedSinkMessage::Close);
mat
})
.collect();
drop(mats);
Ok(NotUsed)
})
}
}
impl<In: Send + 'static, Mat: Send + 'static> Sink<In, StreamCompletion<Mat>> {
    /// Builds a sink that runs `runner` on a spawned stream task, without an
    /// inline variant.
    fn from_task_runner<F>(runner: F) -> Self
    where
        F: Fn(BoxStream<In>) -> StreamResult<Mat> + Send + Sync + 'static,
    {
        Self::from_task_runner_with_inline(runner, false)
    }

    /// Builds a terminal sink whose task receives the raw input together with
    /// a worker materializer, the cancellation flag and the source hints.
    ///
    /// The input is not wrapped here, so `runner` does any runtime checking
    /// itself. The plain runner passes default hints; the raw hinted runner
    /// passes the hints it was given.
    fn from_raw_terminal_task_runner<F>(runner: F) -> Self
    where
        F: Fn(
                BoxStream<In>,
                Materializer,
                Arc<AtomicBool>,
                SourceRuntimeHints,
            ) -> StreamResult<Mat>
            + Send
            + Sync
            + 'static,
    {
        let shared = Arc::new(runner);

        let plain = {
            let shared = Arc::clone(&shared);
            Arc::new(move |input, materializer: &Materializer| {
                let task = Arc::clone(&shared);
                let worker_materializer =
                    materializer.with_name_prefix(materializer.name_prefix().to_owned());
                let completion = materializer.spawn_stream(move |cancelled| {
                    task(
                        input,
                        worker_materializer,
                        cancelled,
                        SourceRuntimeHints::default(),
                    )
                });
                Ok(completion)
            }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
        };

        let raw_hinted = {
            let shared = Arc::clone(&shared);
            Arc::new(
                move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
                    let task = Arc::clone(&shared);
                    let worker_materializer =
                        materializer.with_name_prefix(materializer.name_prefix().to_owned());
                    let completion = materializer.spawn_stream(move |cancelled| {
                        task(input, worker_materializer, cancelled, hints)
                    });
                    Ok(completion)
                },
            ) as Arc<HintedSinkRunner<In, StreamCompletion<Mat>>>
        };

        Sink {
            attributes: Attributes::default(),
            runner: plain,
            inline_runner: None,
            hinted_runner: None,
            raw_hinted_runner: Some(raw_hinted),
            deferred_factory: None,
            fold_fp: None,
        }
    }

    /// Builds a sink that runs `runner` on a spawned stream task over a
    /// runtime-checked input. When `inline` is true it also gets an inline
    /// runner that goes through `spawn_stream_inline`.
    fn from_task_runner_with_inline<F>(runner: F, inline: bool) -> Self
    where
        F: Fn(BoxStream<In>) -> StreamResult<Mat> + Send + Sync + 'static,
    {
        let shared = Arc::new(runner);

        let spawned = {
            let shared = Arc::clone(&shared);
            Arc::new(move |input, materializer: &Materializer| {
                let task = Arc::clone(&shared);
                let state = Arc::clone(&materializer.inner.state);
                let completion = materializer.spawn_stream(move |cancelled| {
                    task(runtime_checked_stream(input, state, Some(cancelled)))
                });
                Ok(completion)
            }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
        };

        let inlined = if inline {
            let shared = Arc::clone(&shared);
            Some(Arc::new(move |input, materializer: &Materializer| {
                let task = Arc::clone(&shared);
                let state = Arc::clone(&materializer.inner.state);
                let completion = materializer.spawn_stream_inline(move |cancelled| {
                    task(runtime_checked_stream(input, state, Some(cancelled)))
                });
                Ok(completion)
            }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>)
        } else {
            None
        };

        Sink::from_runner_parts(spawned, inlined)
    }
}
impl<In: Send + 'static> Sink<In, StreamCompletion<Vec<In>>> {
    /// Creates a sink that gathers every item into a `Vec`.
    ///
    /// The sink also carries a collect descriptor in its `fold_fp` slot.
    #[must_use]
    pub fn collect() -> Self {
        let task_runner =
            Sink::from_raw_terminal_task_runner(|input, materializer, cancelled, hints| {
                run_collect_terminal(input, materializer, cancelled, hints)
            });
        let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::CollectDescriptor::<In> {
            _phantom: std::marker::PhantomData,
        });
        Sink {
            runner: task_runner.runner,
            inline_runner: task_runner.inline_runner,
            hinted_runner: task_runner.hinted_runner,
            raw_hinted_runner: task_runner.raw_hinted_runner,
            attributes: task_runner.attributes,
            deferred_factory: task_runner.deferred_factory,
            fold_fp: Some(fp),
        }
    }

    /// Alias for [`Sink::collect`].
    #[must_use]
    pub fn collection() -> Self {
        Self::collect()
    }

    /// Creates a sink that keeps only the last `n` items of the stream, in
    /// arrival order.
    ///
    /// With `n == 0` the stream is still drained (so upstream errors surface)
    /// and an empty vector is returned.
    #[must_use]
    pub fn take_last(n: usize) -> Self {
        // Upper bound on the eager reservation. `n` is caller-supplied and may
        // be huge (e.g. `usize::MAX` meaning "keep everything"); reserving it
        // up front would panic or abort before any item is read. The buffer
        // still grows on demand up to `n`.
        const MAX_EAGER_CAPACITY: usize = 1024;
        Sink::from_task_runner(move |input| {
            if n == 0 {
                for item in input {
                    let _ = item?;
                }
                return Ok(Vec::new());
            }
            let mut buffer = VecDeque::with_capacity(n.min(MAX_EAGER_CAPACITY));
            for item in input {
                let item = item?;
                // Evict the oldest item once the window is full.
                if buffer.len() == n {
                    buffer.pop_front();
                }
                buffer.push_back(item);
            }
            Ok(buffer.into_iter().collect())
        })
    }
}
impl<In: Send + 'static> Sink<In, StreamCompletion<NotUsed>> {
#[must_use]
pub fn ignore() -> Self {
let task_runner =
Sink::from_raw_terminal_task_runner(|input, materializer, cancelled, hints| {
run_ignore_terminal(input, materializer, cancelled, hints)
});
let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::IgnoreDescriptor::<In> {
_phantom: std::marker::PhantomData,
});
Sink {
runner: task_runner.runner,
inline_runner: task_runner.inline_runner,
hinted_runner: task_runner.hinted_runner,
raw_hinted_runner: task_runner.raw_hinted_runner,
attributes: task_runner.attributes,
deferred_factory: task_runner.deferred_factory,
fold_fp: Some(fp),
}
}
#[must_use]
pub fn on_complete<F>(callback: F) -> Self
where
F: FnOnce() + Send + Sync + 'static,
{
let callback = Arc::new(Mutex::new(Some(callback)));
Sink::from_task_runner(move |input| {
for item in input {
item?;
}
if let Some(cb) = callback.lock().expect("on_complete poisoned").take() {
cb();
}
Ok(NotUsed)
})
}
#[must_use]
pub fn never() -> Self {
Sink::from_runner(|input, materializer| {
let state = Arc::clone(&materializer.inner.state);
let shutdown_state = Arc::clone(&state);
Ok(materializer.spawn_stream(move |cancelled| {
let input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
for item in input {
item?;
}
loop {
if shutdown_state.shutdown.load(Ordering::SeqCst) {
return Err(StreamError::AbruptTermination);
}
if cancelled.load(Ordering::SeqCst) {
return Err(StreamError::Cancelled);
}
thread::sleep(Duration::from_millis(1));
}
}))
})
}
#[must_use]
pub fn foreach<F>(f: F) -> Self
where
F: Fn(In) + Send + Sync + 'static,
{
Sink::from_task_runner(move |input| {
for item in input {
f(item?);
}
Ok(NotUsed)
})
}
#[must_use]
pub fn foreach_async<F, Fut>(parallelism: usize, f: F) -> Self
where
F: Fn(In) -> Fut + Send + Sync + 'static,
Fut: Future<Output = StreamResult<()>> + Send + 'static,
{
Flow::identity()
.map_async_unordered(parallelism, f)
.to_mat(Sink::ignore(), Keep::right)
}
#[must_use]
pub fn foreach_result<F>(f: F) -> Self
where
F: Fn(In) -> StreamResult<()> + Send + Sync + 'static,
{
Sink::from_task_runner(move |input| {
for item in input {
f(item?)?;
}
Ok(NotUsed)
})
}
#[must_use]
pub fn foreach_result_with_supervision<F>(f: F, decider: SupervisionDecider) -> Self
where
F: Fn(In) -> StreamResult<()> + Send + Sync + 'static,
{
Sink::from_task_runner(move |input| {
for item in input {
let item = item?;
match catch_unwind(AssertUnwindSafe(|| f(item)))
.unwrap_or_else(|_| Err(panic_stream_error("foreach_result callback")))
{
Ok(()) => {}
Err(error) => match decide_supervision(&decider, &error) {
SupervisionDirective::Stop => return Err(error),
SupervisionDirective::Resume | SupervisionDirective::Restart => {}
},
}
}
Ok(NotUsed)
})
}
}
impl<In: Send + 'static> Sink<In, StreamCompletion<In>> {
    /// Creates a sink that yields the first item, or `EmptyStream` when the
    /// stream has none. This sink has an inline runner.
    #[must_use]
    pub fn head() -> Self {
        Sink::from_task_runner_with_inline(
            |mut input| match input.next() {
                Some(first) => first,
                None => Err(StreamError::EmptyStream),
            },
            true,
        )
    }

    /// Creates a sink that yields the last item, or `EmptyStream` when the
    /// stream has none.
    #[must_use]
    pub fn last() -> Self {
        Sink::from_task_runner(|input| {
            let mut latest = None;
            for item in input {
                let value = item?;
                latest = Some(value);
            }
            match latest {
                Some(value) => Ok(value),
                None => Err(StreamError::EmptyStream),
            }
        })
    }

    /// Creates a sink that combines all items with `f`, seeded with the first
    /// item. Fails with `EmptyStream` when the stream has none.
    #[must_use]
    pub fn reduce<F>(f: F) -> Self
    where
        F: Fn(In, In) -> In + Send + Sync + 'static,
    {
        Sink::from_task_runner(move |mut input| {
            let Some(seed) = input.next() else {
                return Err(StreamError::EmptyStream);
            };
            let mut acc = seed?;
            for item in input {
                let value = item?;
                acc = f(acc, value);
            }
            Ok(acc)
        })
    }

    /// Like [`Sink::reduce`], but `f` may fail; the first failure ends the
    /// stream.
    #[must_use]
    pub fn reduce_result<F>(f: F) -> Self
    where
        F: Fn(In, In) -> StreamResult<In> + Send + Sync + 'static,
    {
        Sink::from_task_runner(move |mut input| {
            let Some(seed) = input.next() else {
                return Err(StreamError::EmptyStream);
            };
            let mut acc = seed?;
            for item in input {
                let value = item?;
                acc = f(acc, value)?;
            }
            Ok(acc)
        })
    }

    /// Like [`Sink::reduce_result`], but consults `decider` when `f` fails.
    ///
    /// A panic inside `f` is caught and turned into a stream error first.
    /// `Stop` ends the stream with the error. `Resume` keeps the previous
    /// accumulator. `Restart` clears the accumulator, so the next item
    /// becomes the new seed. An accumulator still cleared at the end yields
    /// `EmptyStream`.
    #[must_use]
    pub fn reduce_result_with_supervision<F>(f: F, decider: SupervisionDecider) -> Self
    where
        In: Clone,
        F: Fn(In, In) -> StreamResult<In> + Send + Sync + 'static,
    {
        Sink::from_task_runner(move |mut input: BoxStream<In>| {
            let Some(seed) = input.next() else {
                return Err(StreamError::EmptyStream);
            };
            let mut acc = Some(seed?);
            for item in input {
                let value = item?;
                let previous = match acc.take() {
                    Some(previous) => previous,
                    None => {
                        // Accumulator was cleared by a restart: reseed.
                        acc = Some(value);
                        continue;
                    }
                };
                // `f` gets a clone so `previous` is still there for `Resume`.
                let outcome = catch_unwind(AssertUnwindSafe(|| f(previous.clone(), value)))
                    .unwrap_or_else(|_| Err(panic_stream_error("reduce_result callback")));
                acc = match outcome {
                    Ok(next) => Some(next),
                    Err(error) => match decide_supervision(&decider, &error) {
                        SupervisionDirective::Stop => return Err(error),
                        SupervisionDirective::Resume => Some(previous),
                        SupervisionDirective::Restart => None,
                    },
                };
            }
            acc.ok_or(StreamError::EmptyStream)
        })
    }
}
impl<In: Send + 'static> Sink<In, StreamCompletion<Option<In>>> {
    /// Creates a sink that yields the first item, or `None` when the stream
    /// has none. This sink has an inline runner.
    #[must_use]
    pub fn head_option() -> Self {
        Sink::from_task_runner_with_inline(|mut input| input.next().transpose(), true)
    }

    /// Creates a sink that yields the last item, or `None` when the stream
    /// has none.
    #[must_use]
    pub fn last_option() -> Self {
        Sink::from_task_runner(|input| {
            let mut latest = None;
            for item in input {
                let value = item?;
                latest = Some(value);
            }
            Ok(latest)
        })
    }
}
impl<In: Send + 'static> Sink<In, NotUsed> {
    /// Creates a sink that returns immediately without reading its input.
    #[must_use]
    pub fn cancelled() -> Self {
        Sink::from_runner(|_input, _materializer| Ok(NotUsed))
    }

    /// Alias for [`Sink::lazy_future_sink`].
    #[must_use]
    pub fn future_sink<InnerMat, F, Fut>(future: F) -> Sink<In, StreamCompletion<InnerMat>>
    where
        InnerMat: Send + 'static,
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = StreamResult<Sink<In, InnerMat>>> + Send + 'static,
    {
        Self::lazy_future_sink(future)
    }

    /// Creates a sink that builds its inner sink with `create` only once the
    /// first item arrives, then runs it on that item followed by the rest of
    /// the stream.
    ///
    /// The factory is called through `catch_unwind_failed`. See
    /// `run_lazy_sink` for the empty-stream and error cases.
    #[must_use]
    pub fn lazy_sink<InnerMat, F>(create: F) -> Sink<In, StreamCompletion<InnerMat>>
    where
        InnerMat: Send + 'static,
        F: Fn() -> Sink<In, InnerMat> + Send + Sync + 'static,
    {
        let factory = Arc::new(create);
        Sink::from_runner(move |input, materializer| {
            let factory = Arc::clone(&factory);
            let state = Arc::clone(&materializer.inner.state);
            let worker_materializer =
                materializer.with_name_prefix(materializer.name_prefix().to_owned());
            let completion = materializer.spawn_stream(move |cancelled| {
                let checked = runtime_checked_stream(input, state, Some(cancelled));
                run_lazy_sink(checked, &worker_materializer, move || {
                    catch_unwind_failed("lazy_sink factory", || factory())
                })
            });
            Ok(completion)
        })
    }

    /// Like [`Sink::lazy_sink`], but `create` returns a future that resolves
    /// to the inner sink; the future is driven by
    /// `flow::run_future_inline_or_spawn`.
    #[must_use]
    pub fn lazy_future_sink<InnerMat, F, Fut>(create: F) -> Sink<In, StreamCompletion<InnerMat>>
    where
        InnerMat: Send + 'static,
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = StreamResult<Sink<In, InnerMat>>> + Send + 'static,
    {
        let factory = Arc::new(create);
        Sink::from_runner(move |input, materializer| {
            let factory = Arc::clone(&factory);
            let state = Arc::clone(&materializer.inner.state);
            let worker_materializer =
                materializer.with_name_prefix(materializer.name_prefix().to_owned());
            let completion = materializer.spawn_stream(move |cancelled| {
                let checked = runtime_checked_stream(input, state, Some(cancelled));
                run_lazy_sink(checked, &worker_materializer, move || {
                    catch_unwind_failed("lazy_future_sink factory", || factory())
                        .and_then(flow::run_future_inline_or_spawn)
                })
            });
            Ok(completion)
        })
    }

    /// Creates a sink that folds all items into an accumulator starting at
    /// `zero`.
    ///
    /// The sink also carries a fold descriptor (with its own copy of `zero`
    /// and a shared handle to `f`) in its `fold_fp` slot.
    #[must_use]
    pub fn fold<Acc, F>(zero: Acc, f: F) -> Sink<In, StreamCompletion<Acc>>
    where
        Acc: Clone + Send + Sync + 'static,
        F: Fn(Acc, In) -> Acc + Send + Sync + 'static,
    {
        let step = Arc::new(f);
        let descriptor: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::FoldDescriptor {
            zero: zero.clone(),
            f: Arc::clone(&step),
        });
        let base: Sink<In, StreamCompletion<Acc>> =
            Sink::from_raw_terminal_task_runner(move |input, materializer, cancelled, hints| {
                // Every materialization starts from a fresh copy of `zero`.
                run_fold_terminal(
                    input,
                    materializer,
                    cancelled,
                    hints,
                    zero.clone(),
                    step.as_ref(),
                )
            });
        Sink {
            fold_fp: Some(descriptor),
            ..base
        }
    }

    /// Like [`Sink::fold`], but `f` may fail; the first failure ends the
    /// stream.
    #[must_use]
    pub fn fold_result<Acc, F>(zero: Acc, f: F) -> Sink<In, StreamCompletion<Acc>>
    where
        Acc: Clone + Send + Sync + 'static,
        F: Fn(Acc, In) -> StreamResult<Acc> + Send + Sync + 'static,
    {
        let step = Arc::new(f);
        let descriptor: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::FoldResultDescriptor {
            zero: zero.clone(),
            f: Arc::clone(&step),
        });
        let base: Sink<In, StreamCompletion<Acc>> =
            Sink::from_raw_terminal_task_runner(move |input, materializer, cancelled, hints| {
                // Every materialization starts from a fresh copy of `zero`.
                run_fold_result_terminal(
                    input,
                    materializer,
                    cancelled,
                    hints,
                    zero.clone(),
                    step.as_ref(),
                )
            });
        Sink {
            fold_fp: Some(descriptor),
            ..base
        }
    }

    /// Like [`Sink::fold_result`], but consults `decider` when `f` fails.
    ///
    /// A panic inside `f` is caught and turned into a stream error first.
    /// `Stop` ends the stream with the error, `Resume` keeps the previous
    /// accumulator, and `Restart` resets it to `zero`. This variant carries
    /// no fold descriptor.
    #[must_use]
    pub fn fold_result_with_supervision<Acc, F>(
        zero: Acc,
        f: F,
        decider: SupervisionDecider,
    ) -> Sink<In, StreamCompletion<Acc>>
    where
        Acc: Clone + Send + Sync + 'static,
        F: Fn(Acc, In) -> StreamResult<Acc> + Send + Sync + 'static,
    {
        Sink::from_task_runner(move |input| {
            let mut acc = zero.clone();
            for item in input {
                let value = item?;
                let previous = acc;
                // `f` gets a clone so `previous` is still there for `Resume`.
                let outcome = catch_unwind(AssertUnwindSafe(|| f(previous.clone(), value)))
                    .unwrap_or_else(|_| Err(panic_stream_error("fold_result callback")));
                acc = match outcome {
                    Ok(next) => next,
                    Err(error) => match decide_supervision(&decider, &error) {
                        SupervisionDirective::Stop => return Err(error),
                        SupervisionDirective::Resume => previous,
                        SupervisionDirective::Restart => zero.clone(),
                    },
                };
            }
            Ok(acc)
        })
    }
}
/// Waits for the first item of `input`, then builds the inner sink with
/// `create` and runs it on that item followed by the rest of the stream.
///
/// Fails with the upstream error when the first pull fails, and with
/// `StreamError::Failed` when the stream ends before any item arrives. In
/// both cases `create` is never called.
fn run_lazy_sink<In, InnerMat, F>(
    mut input: BoxStream<In>,
    materializer: &Materializer,
    create: F,
) -> StreamResult<InnerMat>
where
    In: Send + 'static,
    InnerMat: Send + 'static,
    F: FnOnce() -> StreamResult<Sink<In, InnerMat>>,
{
    let Some(head) = input.next() else {
        return Err(StreamError::Failed(
            "lazy sink was never materialized".into(),
        ));
    };
    let first = head?;
    let inner = create()?;
    let replayed = prepend_first_stream(first, input);
    inner.run(replayed, materializer)
}
/// Returns a stream that yields `first` as a successful item and then
/// everything `rest` yields.
fn prepend_first_stream<In>(first: In, mut rest: BoxStream<In>) -> BoxStream<In>
where
    In: Send + 'static,
{
    let mut pending = Some(first);
    // `pending` is emptied by the first pull; every later pull goes to `rest`.
    Box::new(std::iter::from_fn(move || {
        pending.take().map(Ok).or_else(|| rest.next())
    }))
}
/// Reports whether a terminal consumer may keep running.
///
/// Returns `AbruptTermination` when the materializer is shut down,
/// `Cancelled` when the cancellation flag is set, and `Ok(())` otherwise.
/// Shutdown takes precedence over cancellation.
fn terminal_consumer_status(
    materializer: &Materializer,
    cancelled: &Arc<AtomicBool>,
) -> StreamResult<()> {
    if materializer.is_shutdown() {
        return Err(StreamError::AbruptTermination);
    }
    if cancelled.load(Ordering::SeqCst) {
        return Err(StreamError::Cancelled);
    }
    Ok(())
}
fn run_collect_terminal<In: Send + 'static>(
mut input: BoxStream<In>,
materializer: Materializer,
cancelled: Arc<AtomicBool>,
hints: SourceRuntimeHints,
) -> StreamResult<Vec<In>> {
if !hints.terminal_consumer_batch {
let input = runtime_checked_stream(
input,
Arc::clone(&materializer.inner.state),
Some(cancelled),
);
return input.collect();
}
let mut items = Vec::with_capacity(hints.inline_micro_max_success_items.unwrap_or(0));
loop {
terminal_consumer_status(&materializer, &cancelled)?;
{
let _cancel_scope = set_current_stream_cancelled(&cancelled);
for _ in 0..TERMINAL_CONSUMER_BATCH {
match input.next() {
Some(Ok(item)) => items.push(item),
Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
Ok(()) => return Err(error),
Err(status) => return Err(status),
},
None => {
return terminal_consumer_status(&materializer, &cancelled).map(|()| items);
}
}
}
}
}
}
fn run_ignore_terminal<In: Send + 'static>(
input: BoxStream<In>,
materializer: Materializer,
cancelled: Arc<AtomicBool>,
hints: SourceRuntimeHints,
) -> StreamResult<NotUsed> {
if !hints.terminal_consumer_batch {
let input = runtime_checked_stream(
input,
Arc::clone(&materializer.inner.state),
Some(cancelled),
);
for item in input {
item?;
}
return Ok(NotUsed);
}
let mut input = input;
loop {
terminal_consumer_status(&materializer, &cancelled)?;
{
let _cancel_scope = set_current_stream_cancelled(&cancelled);
for _ in 0..TERMINAL_CONSUMER_BATCH {
match input.next() {
Some(Ok(_)) => {}
Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
Ok(()) => return Err(error),
Err(status) => return Err(status),
},
None => {
return terminal_consumer_status(&materializer, &cancelled)
.map(|()| NotUsed);
}
}
}
}
}
}
fn run_fold_terminal<In, Acc, F>(
input: BoxStream<In>,
materializer: Materializer,
cancelled: Arc<AtomicBool>,
hints: SourceRuntimeHints,
zero: Acc,
f: &F,
) -> StreamResult<Acc>
where
In: Send + 'static,
Acc: Send + 'static,
F: Fn(Acc, In) -> Acc,
{
if !hints.terminal_consumer_batch {
let input = runtime_checked_stream(
input,
Arc::clone(&materializer.inner.state),
Some(cancelled),
);
let mut acc = zero;
for item in input {
acc = f(acc, item?);
}
return Ok(acc);
}
let mut input = input;
let mut acc = zero;
loop {
terminal_consumer_status(&materializer, &cancelled)?;
{
let _cancel_scope = set_current_stream_cancelled(&cancelled);
for _ in 0..TERMINAL_CONSUMER_BATCH {
match input.next() {
Some(Ok(item)) => acc = f(acc, item),
Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
Ok(()) => return Err(error),
Err(status) => return Err(status),
},
None => {
return terminal_consumer_status(&materializer, &cancelled).map(|()| acc);
}
}
}
}
}
}
fn run_fold_result_terminal<In, Acc, F>(
input: BoxStream<In>,
materializer: Materializer,
cancelled: Arc<AtomicBool>,
hints: SourceRuntimeHints,
zero: Acc,
f: &F,
) -> StreamResult<Acc>
where
In: Send + 'static,
Acc: Send + 'static,
F: Fn(Acc, In) -> StreamResult<Acc>,
{
if !hints.terminal_consumer_batch {
let input = runtime_checked_stream(
input,
Arc::clone(&materializer.inner.state),
Some(cancelled),
);
let mut acc = zero;
for item in input {
acc = f(acc, item?)?;
}
return Ok(acc);
}
let mut input = input;
let mut acc = Some(zero);
loop {
terminal_consumer_status(&materializer, &cancelled)?;
{
let _cancel_scope = set_current_stream_cancelled(&cancelled);
for _ in 0..TERMINAL_CONSUMER_BATCH {
match input.next() {
Some(Ok(item)) => {
let previous = acc.take().expect("fold accumulator present");
match f(previous, item) {
Ok(next) => acc = Some(next),
Err(error) => {
return match terminal_consumer_status(&materializer, &cancelled) {
Ok(()) => Err(error),
Err(status) => Err(status),
};
}
}
}
Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
Ok(()) => return Err(error),
Err(status) => return Err(status),
},
None => {
return terminal_consumer_status(&materializer, &cancelled)
.map(|()| acc.expect("fold accumulator present"));
}
}
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Source, StreamCompletion};
    use std::time::Instant;

    /// Polls `condition` every 5 ms until it holds or `timeout` elapses.
    /// The condition gets one final check once the deadline has passed.
    fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
        let deadline = Instant::now() + timeout;
        loop {
            if Instant::now() >= deadline {
                return condition();
            }
            if condition() {
                return true;
            }
            thread::sleep(Duration::from_millis(5));
        }
    }

    /// Mapping the materialized value of a deferred sink twice must still run
    /// the sink built by the factory and yield its result.
    #[test]
    fn map_materialized_value_on_deferred_sink_does_not_explode() {
        let deferred = Sink::<u64, _>::setup(|_, _| Sink::fold(0u64, |acc, x| acc + x));
        let mapped = deferred
            .map_materialized_value(|sc: StreamCompletion<u64>| sc)
            .map_materialized_value(|sc| sc);
        let completion = Source::from_iter(1u64..=3).run_with(mapped).unwrap();
        let sum = completion.wait().unwrap();
        assert_eq!(sum, 6u64);
    }

    /// Dropping the completion of a fold over an endless source must stop the
    /// stream, which shows up as the active stream count returning to zero.
    #[test]
    fn batched_terminal_fold_observes_completion_drop_cancellation() {
        let materializer = Materializer::new();
        let fold_sink = Sink::fold(0_u64, |acc, item| acc + item);
        let completion = Source::repeat(1_u64)
            .run_with_materializer(fold_sink, &materializer)
            .expect("fold terminal materializes");

        let started = wait_until(Duration::from_secs(1), || {
            materializer.active_streams() == 1
        });
        assert!(started);

        drop(completion);

        let stopped = wait_until(Duration::from_secs(5), || {
            materializer.active_streams() == 0
        });
        assert!(stopped);
    }
}