use super::*;
use crate::Attributes;
use crate::context::FlowWithContext;
use crate::stream::error::{decide_supervision, panic_stream_error};
use futures::{FutureExt, task::noop_waker};
use std::any::Any;
use std::task::Context;
use std::{
collections::{HashMap, HashSet},
thread,
};
/// The stage function carried by a [`Flow`], in one of two calling conventions.
pub(super) enum FlowTransform<In, Out> {
/// Shared transform that `via_mat` calls with the input stream alone.
Pure(PureTransform<In, Out>),
/// Shared transform that `via_mat` calls with the input stream and a `&Materializer`;
/// its result is propagated with `?`, so it can fail.
Runtime(RuntimeTransform<In, Out>),
}
/// A reusable stream stage from `In` elements to `Out` elements that yields a
/// materialized value of type `Mat` (`NotUsed` unless specified).
pub struct Flow<In, Out, Mat = NotUsed> {
/// Function that turns the upstream stream into the downstream stream.
pub(super) transform: FlowTransform<In, Out>,
/// Shared callback that produces the materialized value, or an error.
pub(super) materialize: Arc<dyn Fn() -> StreamResult<Mat> + Send + Sync>,
/// Hints describing this flow; combined with `FlowHints::then` when flows are chained in `via_mat`.
pub(super) hints: FlowHints,
/// Attributes attached to this flow; merged with `Attributes::and` when flows are chained in `via_mat`.
pub(super) attributes: Attributes,
}
/// Batching mode selector.
// NOTE(review): presumably consumed by the group-by operators; the exact meaning of
// each variant is defined by code outside this section — confirm there.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) enum GroupByBatchMode {
Immediate,
FiniteEagerNoRecreate,
}
/// A pair of flows — `top` from `I1` to `O1` and `bottom` from `I2` to `O2` —
/// stored together with a set of attributes.
#[derive(Clone)]
pub struct BidiFlow<I1, O1, I2, O2> {
/// Flow carrying `I1` elements to `O1` elements.
top: Flow<I1, O1, NotUsed>,
/// Flow carrying `I2` elements to `O2` elements.
bottom: Flow<I2, O2, NotUsed>,
/// Attributes attached to the pair as a whole.
attributes: Attributes,
}
/// Cloning a transform shares the underlying closure; only the reference count changes.
impl<In, Out> Clone for FlowTransform<In, Out> {
    fn clone(&self) -> Self {
        match *self {
            Self::Pure(ref shared) => Self::Pure(Arc::clone(shared)),
            Self::Runtime(ref shared) => Self::Runtime(Arc::clone(shared)),
        }
    }
}
/// Manual `Clone` so that no `Clone` bound is required on `In`, `Out` or `Mat`:
/// the transform and the materializer callback are shared, not duplicated.
impl<In, Out, Mat> Clone for Flow<In, Out, Mat> {
    fn clone(&self) -> Self {
        let Self {
            transform,
            materialize,
            hints,
            attributes,
        } = self;
        Self {
            transform: transform.clone(),
            materialize: Arc::clone(materialize),
            hints: *hints,
            attributes: attributes.clone(),
        }
    }
}
/// Runs `f`, turning a panic inside it into the stream error produced by
/// `panic_stream_error(context)`; a normal return value is passed through unchanged.
fn call_supervised<T, F>(context: &str, f: F) -> StreamResult<T>
where
    F: FnOnce() -> StreamResult<T>,
{
    match catch_unwind(AssertUnwindSafe(f)) {
        Ok(outcome) => outcome,
        Err(_panic_payload) => Err(panic_stream_error(context)),
    }
}
impl<T: Send + 'static> Flow<T, T, NotUsed> {
    /// Creates a flow that hands its input stream downstream untouched.
    #[must_use]
    pub fn identity() -> Self {
        Self::from_preserving_transform(std::convert::identity)
    }
}
impl<In: Send + 'static, Out: Send + 'static> Flow<In, Out, NotUsed> {
/// Builds a flow from a stream-to-stream function, with a `NotUsed`
/// materialized value and default hints.
pub(crate) fn from_transform<F>(transform: F) -> Self
where
    F: Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync + 'static,
{
    let hints = FlowHints::default();
    Self::from_parts_with_hints(transform, || Ok(NotUsed), hints)
}
/// Like `from_transform`, but tags the flow with
/// `FlowHints::PRESERVES_INLINE_HEAD_TERMINAL`.
pub(crate) fn from_preserving_transform<F>(transform: F) -> Self
where
    F: Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync + 'static,
{
    let hints = FlowHints::PRESERVES_INLINE_HEAD_TERMINAL;
    Self::from_parts_with_hints(transform, || Ok(NotUsed), hints)
}
/// Builds a flow from a fallible transform that also receives the
/// `Materializer`; the flow gets default hints.
pub(crate) fn from_runtime_transform<F>(transform: F) -> Self
where
    F: Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync + 'static,
{
    let hints = FlowHints::default();
    Self::from_runtime_transform_with_hints(transform, hints)
}
fn from_runtime_transform_with_hints<F>(transform: F, hints: FlowHints) -> Self
where
F: Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync + 'static,
{
Self {
transform: FlowTransform::Runtime(Arc::new(transform)),
materialize: Arc::new(|| Ok(NotUsed)),
hints,
attributes: Attributes::default(),
}
}
/// Builds a flow whose input is drained by `sink` and whose output is produced
/// by `source`.
///
/// On materialization the sink is run first on the wrapped input, then the
/// source is created. Both materialized values are parked in keepalive slots:
/// the sink's is released when the wrapped input stream is dropped, the
/// source's when the output stream ends, fails or is dropped. The two sides
/// are not linked to each other (`coupled: false`).
#[must_use]
pub fn from_sink_and_source<InMat, OutMat>(
    sink: Sink<In, InMat>,
    source: Source<Out, OutMat>,
) -> Self
where
    InMat: Send + 'static,
    OutMat: Send + 'static,
{
    Self::from_runtime_transform(move |upstream, materializer| {
        // Sink side: wrap the input so dropping it releases the sink's value.
        let sink_slot = Arc::new(MaterializedKeepalive::default());
        let guarded_input = InputKeepaliveStream {
            inner: upstream,
            keepalive: Arc::clone(&sink_slot),
            peer_keepalive: None,
        };
        let sink_mat = sink.clone().run(Box::new(guarded_input), materializer)?;
        sink_slot.store(Box::new(sink_mat));

        // Source side: created only after the sink has been started.
        let (downstream, source_mat) = Arc::clone(&source.factory).create(materializer)?;
        let source_slot = Arc::new(MaterializedKeepalive::default());
        source_slot.store(Box::new(source_mat));

        let output = CoupledStream {
            inner: downstream,
            source_keepalive: source_slot,
            sink_keepalive: None,
            coupled: false,
        };
        Ok(Box::new(output))
    })
}
/// Builds a flow from `sink` and `source` whose lifetimes are tied together.
///
/// Same wiring as `from_sink_and_source`, except that each side holds the
/// other's keepalive slot: dropping the wrapped input releases both slots, and
/// the output stream releases both slots when it ends, fails or is dropped.
/// The output also stops yielding once the source slot has been released.
#[must_use]
pub fn from_sink_and_source_coupled<InMat, OutMat>(
    sink: Sink<In, InMat>,
    source: Source<Out, OutMat>,
) -> Self
where
    InMat: Send + 'static,
    OutMat: Send + 'static,
{
    Self::from_runtime_transform(move |upstream, materializer| {
        let source_slot = Arc::new(MaterializedKeepalive::default());
        let sink_slot = Arc::new(MaterializedKeepalive::default());

        // Sink side: the wrapped input knows about both slots.
        let guarded_input = InputKeepaliveStream {
            inner: upstream,
            keepalive: Arc::clone(&sink_slot),
            peer_keepalive: Some(Arc::clone(&source_slot)),
        };
        let sink_mat = sink.clone().run(Box::new(guarded_input), materializer)?;
        sink_slot.store(Box::new(sink_mat));

        // Source side: created only after the sink has been started.
        let (downstream, source_mat) = Arc::clone(&source.factory).create(materializer)?;
        source_slot.store(Box::new(source_mat));

        let output = CoupledStream {
            inner: downstream,
            source_keepalive: source_slot,
            sink_keepalive: Some(sink_slot),
            coupled: true,
        };
        Ok(Box::new(output))
    })
}
/// Builds a flow that is backed by a flow produced asynchronously by `future`.
///
/// Each factory invocation creates a one-shot channel: the receiving half
/// becomes the flow's materialized `StreamCompletion`, the sending half is
/// handed to the `FutureFlowStream` created by the transform. The transform of
/// one factory result can be used only once; a second use fails with
/// `StreamError::Failed`.
#[must_use]
pub fn future_flow<InnerMat, F, Fut>(future: F) -> Flow<In, Out, StreamCompletion<InnerMat>>
where
    InnerMat: Send + 'static,
    F: Fn() -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
{
    let future = Arc::new(future);
    Flow::from_runtime_materialized_factory(move || {
        let (mat_tx, mat_rx) = oneshot::channel();
        // The transform is `Fn`, so the sender lives in a take-once slot.
        let mat_tx_slot = Arc::new(Mutex::new(Some(mat_tx)));
        let future = Arc::clone(&future);
        let transform: RuntimeTransform<In, Out> =
            Arc::new(move |input, materializer: &Materializer| {
                let mat_sender = mat_tx_slot
                    .lock()
                    .expect("future_flow materialized sender poisoned")
                    .take()
                    .ok_or_else(|| {
                        StreamError::Failed("future_flow transform already materialized".into())
                    })?;
                let owned_materializer =
                    materializer.with_name_prefix(materializer.name_prefix().to_owned());
                let stream = FutureFlowStream {
                    future: Arc::clone(&future),
                    materializer: owned_materializer,
                    input: Some(input),
                    current: None,
                    mat_sender: Some(mat_sender),
                    initialized: false,
                    terminated: false,
                    _marker: PhantomData,
                };
                Ok(Box::new(stream) as BoxStream<Out>)
            });
        let completion = StreamCompletion::from_receiver(mat_rx, None);
        (transform, completion)
    })
}
/// Builds a lazily created flow from a synchronous factory.
///
/// The factory is adapted to `lazy_future_flow` by running it inside
/// `catch_unwind_failed("lazy_flow factory", …)` within an `async` block.
#[must_use]
pub fn lazy_flow<InnerMat, F>(create: F) -> Flow<In, Out, StreamCompletion<InnerMat>>
where
    InnerMat: Send + 'static,
    F: Fn() -> Flow<In, Out, InnerMat> + Send + Sync + 'static,
{
    let factory = Arc::new(create);
    Self::lazy_future_flow(move || {
        let factory = Arc::clone(&factory);
        async move { catch_unwind_failed("lazy_flow factory", || factory()) }
    })
}
/// Builds a flow that is backed by a flow produced asynchronously by `create`.
///
/// Each factory invocation creates a one-shot channel: the receiving half
/// becomes the flow's materialized `StreamCompletion`, the sending half is
/// handed to the `LazyFutureFlowStream` created by the transform. The
/// transform of one factory result can be used only once; a second use fails
/// with `StreamError::Failed`.
#[must_use]
pub fn lazy_future_flow<InnerMat, F, Fut>(
    create: F,
) -> Flow<In, Out, StreamCompletion<InnerMat>>
where
    InnerMat: Send + 'static,
    F: Fn() -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
{
    let create = Arc::new(create);
    Flow::from_runtime_materialized_factory(move || {
        let (mat_tx, mat_rx) = oneshot::channel();
        // The transform is `Fn`, so the sender lives in a take-once slot.
        let mat_tx_slot = Arc::new(Mutex::new(Some(mat_tx)));
        let create = Arc::clone(&create);
        let transform: RuntimeTransform<In, Out> =
            Arc::new(move |input, materializer: &Materializer| {
                let mat_sender = mat_tx_slot
                    .lock()
                    .expect("lazy_future_flow materialized sender poisoned")
                    .take()
                    .ok_or_else(|| {
                        StreamError::Failed(
                            "lazy_future_flow transform already materialized".into(),
                        )
                    })?;
                let owned_materializer =
                    materializer.with_name_prefix(materializer.name_prefix().to_owned());
                let stream = LazyFutureFlowStream {
                    create: Arc::clone(&create),
                    materializer: owned_materializer,
                    input: Some(input),
                    current: None,
                    mat_sender: Some(mat_sender),
                    initialized: false,
                    terminated: false,
                    _marker: PhantomData,
                };
                Ok(Box::new(stream) as BoxStream<Out>)
            });
        let completion = StreamCompletion::from_receiver(mat_rx, None);
        (transform, completion)
    })
}
}
/// Slot that keeps a materialized value alive until it is explicitly released.
///
/// Once `release` has been called the slot stays released: any value stored
/// afterwards is handed straight to `release_materialized_value`.
#[derive(Default)]
struct MaterializedKeepalive {
    /// Set by `release`; never cleared again.
    released: AtomicBool,
    /// The parked materialized value, if one is currently held.
    value: Mutex<Option<Box<dyn Any + Send>>>,
}

impl MaterializedKeepalive {
    /// Parks `value` in the slot, or releases it immediately when the slot has
    /// already been released.
    ///
    /// # Panics
    /// Panics if the slot mutex is poisoned.
    fn store(&self, value: Box<dyn Any + Send>) {
        {
            let mut slot = self.value.lock().expect("materialized keepalive poisoned");
            if !self.released.load(Ordering::SeqCst) {
                *slot = Some(value);
                return;
            }
            // Guard dropped here: never run the release hook under the lock.
        }
        release_materialized_value(value);
    }

    /// Marks the slot as released and releases the parked value, if any.
    ///
    /// The value is taken out in its own statement so the mutex guard is
    /// dropped *before* `release_materialized_value` runs. Writing this as
    /// `if let Some(v) = self.value.lock()…take() { … }` would keep the guard
    /// (a scrutinee temporary) alive across the release hook, so a hook that
    /// touched this slot again would deadlock — and it would be inconsistent
    /// with `store`, which drops the guard first.
    ///
    /// # Panics
    /// Panics if the slot mutex is poisoned.
    fn release(&self) {
        self.released.store(true, Ordering::SeqCst);
        let parked = self
            .value
            .lock()
            .expect("materialized keepalive poisoned")
            .take();
        if let Some(value) = parked {
            release_materialized_value(value);
        }
    }

    /// Returns `true` once `release` has been called.
    fn is_released(&self) -> bool {
        self.released.load(Ordering::SeqCst)
    }
}
/// Disposes of a parked materialized value: a `Cancellable` is cancelled,
/// anything else is simply dropped.
fn release_materialized_value(value: Box<dyn Any + Send>) {
    if let Ok(cancellable) = value.downcast::<Cancellable>() {
        cancellable.cancel();
    }
}
/// Pass-through wrapper around an input stream that releases keepalive slots
/// when it is dropped.
struct InputKeepaliveStream<In> {
    /// The wrapped upstream.
    inner: BoxStream<In>,
    /// Slot released when this stream is dropped.
    keepalive: Arc<MaterializedKeepalive>,
    /// Optional second slot, released right after `keepalive`.
    peer_keepalive: Option<Arc<MaterializedKeepalive>>,
}

impl<In> Iterator for InputKeepaliveStream<In> {
    type Item = StreamResult<In>;

    /// Forwards to the wrapped stream without touching the items.
    fn next(&mut self) -> Option<Self::Item> {
        let Self { inner, .. } = self;
        inner.next()
    }
}

impl<In> Drop for InputKeepaliveStream<In> {
    /// Releases the own slot first, then the peer slot when one is set.
    fn drop(&mut self) {
        self.keepalive.release();
        if let Some(peer) = self.peer_keepalive.as_ref() {
            peer.release();
        }
    }
}
struct CoupledStream<Out> {
inner: BoxStream<Out>,
source_keepalive: Arc<MaterializedKeepalive>,
sink_keepalive: Option<Arc<MaterializedKeepalive>>,
coupled: bool,
}
impl<Out> Iterator for CoupledStream<Out> {
type Item = StreamResult<Out>;
fn next(&mut self) -> Option<Self::Item> {
if self.coupled && self.source_keepalive.is_released() {
return None;
}
let next = self.inner.next();
if next.is_none() || next.as_ref().is_some_and(|item| item.is_err()) {
self.source_keepalive.release();
if self.coupled
&& let Some(sink_keepalive) = &self.sink_keepalive
{
sink_keepalive.release();
}
}
next
}
}
impl<Out> Drop for CoupledStream<Out> {
fn drop(&mut self) {
self.source_keepalive.release();
if self.coupled
&& let Some(sink_keepalive) = &self.sink_keepalive
{
sink_keepalive.release();
}
}
}
impl<In: Send + 'static, Out: Send + 'static, Mat: Send + 'static> Flow<In, Out, Mat> {
/// Wraps this flow as a `FlowWithContext`.
///
/// Incoming `(value, context)` pairs are folded into this flow's input with
/// `collapse_context`; every output element is paired with the context that
/// `extract_context` derives from it. The materialized value of this flow is
/// kept.
pub fn as_flow_with_context<U, CtxIn, CtxOut, Collapse, Extract>(
    self,
    collapse_context: Collapse,
    extract_context: Extract,
) -> FlowWithContext<U, CtxIn, Out, CtxOut, Mat>
where
    U: Send + 'static,
    CtxIn: Send + 'static,
    CtxOut: Send + 'static,
    Collapse: Fn(U, CtxIn) -> In + Send + Sync + 'static,
    Extract: Fn(&Out) -> CtxOut + Send + Sync + 'static,
{
    let collapsed =
        Flow::identity().map(move |(value, context)| collapse_context(value, context));
    let processed = collapsed.via_mat(self, |_, flow_mat| flow_mat);
    let contextual = processed.map(move |value| {
        let context = extract_context(&value);
        (value, context)
    });
    FlowWithContext::from_flow(contextual)
}
/// Builds a pure-transform flow from a stream function and a materializer
/// callback, using default hints.
pub(crate) fn from_parts<F, M>(transform: F, materialize: M) -> Self
where
    F: Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync + 'static,
    M: Fn() -> StreamResult<Mat> + Send + Sync + 'static,
{
    let hints = FlowHints::default();
    Self::from_parts_with_hints(transform, materialize, hints)
}
/// Builds a flow whose transform and materialized value come in pairs from
/// `factory`.
///
/// The flow's transform and its materializer callback are invoked
/// independently. Whichever runs first on a thread calls `factory`, keeps its
/// own half and parks the other half in a per-thread queue; the counterpart
/// call on the same thread then claims the parked half instead of calling
/// `factory` again.
///
/// `factory` is always invoked *without* the internal lock held (matching
/// `from_runtime_materialized_factory`), so a panicking factory cannot poison
/// the lock for later materializations and a factory that re-enters this flow
/// cannot deadlock on it.
///
/// # Panics
/// The returned closures panic if the internal mutex is poisoned.
pub(crate) fn from_materialized_factory<F>(factory: F) -> Self
where
    F: Fn() -> (PureTransform<In, Out>, Mat) + Send + Sync + 'static,
{
    /// One factory result of which one half has been taken and the other half
    /// is waiting for the counterpart call.
    struct PendingSlot<In, Out, Mat> {
        transform: Option<PureTransform<In, Out>>,
        mat: Option<Mat>,
        transform_taken: bool,
        mat_taken: bool,
    }
    let pending = Arc::new(Mutex::new(HashMap::<
        thread::ThreadId,
        Vec<PendingSlot<In, Out, Mat>>,
    >::new()));
    let factory = Arc::new(factory);
    let transform = {
        let pending = Arc::clone(&pending);
        let factory = Arc::clone(&factory);
        move |input| {
            let thread_id = thread::current().id();
            // Claim a transform parked by an earlier `materialize` call, if any.
            let parked = {
                let mut registry = pending.lock().expect("flow materialized factory poisoned");
                let slots = registry.entry(thread_id).or_default();
                let index = slots
                    .iter()
                    .position(|slot| !slot.transform_taken && slot.mat_taken);
                let parked = index.map(|index| {
                    slots
                        .remove(index)
                        .transform
                        .expect("pending flow transform present")
                });
                if slots.is_empty() {
                    registry.remove(&thread_id);
                }
                parked
            };
            let transform = match parked {
                Some(transform) => transform,
                None => {
                    // Lock released: user code must not run under the mutex.
                    let (transform, mat) = factory();
                    pending
                        .lock()
                        .expect("flow materialized factory poisoned")
                        .entry(thread_id)
                        .or_default()
                        .push(PendingSlot {
                            transform: None,
                            mat: Some(mat),
                            transform_taken: true,
                            mat_taken: false,
                        });
                    transform
                }
            };
            transform(input)
        }
    };
    let materialize = {
        let pending = Arc::clone(&pending);
        let factory = Arc::clone(&factory);
        move || {
            let thread_id = thread::current().id();
            // Claim a materialized value parked by an earlier transform call, if any.
            let parked = {
                let mut registry = pending.lock().expect("flow materialized factory poisoned");
                let slots = registry.entry(thread_id).or_default();
                let index = slots
                    .iter()
                    .position(|slot| !slot.mat_taken && slot.transform_taken);
                let parked = index.map(|index| {
                    slots
                        .remove(index)
                        .mat
                        .expect("pending flow materialized value present")
                });
                if slots.is_empty() {
                    registry.remove(&thread_id);
                }
                parked
            };
            let mat = match parked {
                Some(mat) => mat,
                None => {
                    // Lock released: user code must not run under the mutex.
                    let (transform, mat) = factory();
                    pending
                        .lock()
                        .expect("flow materialized factory poisoned")
                        .entry(thread_id)
                        .or_default()
                        .push(PendingSlot {
                            transform: Some(transform),
                            mat: None,
                            transform_taken: false,
                            mat_taken: true,
                        });
                    mat
                }
            };
            Ok(mat)
        }
    };
    Self::from_parts_with_hints(transform, materialize, FlowHints::default())
}
/// Builds a runtime-transform flow whose transform and materialized value come
/// in pairs from `factory`.
///
/// The flow's transform and its materializer callback are invoked
/// independently. Whichever runs first on a thread calls `factory`, keeps its
/// own half and parks the other half in a per-thread list; the counterpart call
/// on the same thread then claims the parked half instead of calling `factory`
/// again. `factory` is always called with the lock released.
///
/// # Panics
/// The returned closures panic if the internal mutex is poisoned.
pub(crate) fn from_runtime_materialized_factory<F>(factory: F) -> Self
where
F: Fn() -> (RuntimeTransform<In, Out>, Mat) + Send + Sync + 'static,
{
// One factory result; the `*_taken` flags record which half has been handed out.
struct PendingSlot<In, Out, Mat> {
transform: Option<RuntimeTransform<In, Out>>,
mat: Option<Mat>,
transform_taken: bool,
mat_taken: bool,
}
// Parked halves, keyed by the thread that produced them.
let pending = Arc::new(Mutex::new(HashMap::<
thread::ThreadId,
Vec<PendingSlot<In, Out, Mat>>,
>::new()));
let factory = Arc::new(factory);
let transform = {
let pending = Arc::clone(&pending);
let factory = Arc::clone(&factory);
move |input, materializer: &Materializer| {
let thread_id = thread::current().id();
// Step 1 (under the lock): look for a transform parked by `materialize`.
let transform = {
let mut pending = pending.lock().expect("flow materialized factory poisoned");
let slots = pending.entry(thread_id).or_default();
if let Some(index) = slots
.iter()
.position(|slot| !slot.transform_taken && slot.mat_taken)
{
let slot = slots
.get_mut(index)
.expect("pending flow materialization slot exists");
slot.transform_taken = true;
let transform = slot
.transform
.take()
.expect("pending flow transform present");
// Both halves are claimed now, so the slot can go.
if slot.transform_taken && slot.mat_taken {
slots.remove(index);
}
if slots.is_empty() {
pending.remove(&thread_id);
}
Some(transform)
} else {
None
}
};
// Step 2 (lock released): nothing parked, so run the factory and park the
// materialized value for the upcoming `materialize` call.
let transform = match transform {
Some(transform) => transform,
None => {
let (transform, mat) = factory();
let mut pending =
pending.lock().expect("flow materialized factory poisoned");
let slots = pending.entry(thread_id).or_default();
slots.push(PendingSlot {
transform: None,
mat: Some(mat),
transform_taken: true,
mat_taken: false,
});
transform
}
};
transform(input, materializer)
}
};
let materialize = {
let pending = Arc::clone(&pending);
let factory = Arc::clone(&factory);
move || {
let thread_id = thread::current().id();
// Step 1 (under the lock): look for a materialized value parked by the transform.
let mat = {
let mut pending = pending.lock().expect("flow materialized factory poisoned");
let slots = pending.entry(thread_id).or_default();
if let Some(index) = slots
.iter()
.position(|slot| !slot.mat_taken && slot.transform_taken)
{
let slot = slots
.get_mut(index)
.expect("pending flow materialization slot exists");
slot.mat_taken = true;
let mat = slot
.mat
.take()
.expect("pending flow materialized value present");
// Both halves are claimed now, so the slot can go.
if slot.transform_taken && slot.mat_taken {
slots.remove(index);
}
if slots.is_empty() {
pending.remove(&thread_id);
}
Some(mat)
} else {
None
}
};
// Step 2 (lock released): nothing parked, so run the factory and park the
// transform for the upcoming transform call.
match mat {
Some(mat) => Ok(mat),
None => {
let (transform, mat) = factory();
let mut pending =
pending.lock().expect("flow materialized factory poisoned");
let slots = pending.entry(thread_id).or_default();
slots.push(PendingSlot {
transform: Some(transform),
mat: None,
transform_taken: false,
mat_taken: true,
});
Ok(mat)
}
}
}
};
Flow {
transform: FlowTransform::Runtime(Arc::new(transform)),
materialize: Arc::new(materialize),
hints: FlowHints::default(),
attributes: Attributes::default(),
}
}
fn from_parts_with_hints<F, M>(transform: F, materialize: M, hints: FlowHints) -> Self
where
F: Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync + 'static,
M: Fn() -> StreamResult<Mat> + Send + Sync + 'static,
{
Self {
transform: FlowTransform::Pure(Arc::new(transform)),
materialize: Arc::new(materialize),
hints,
attributes: Attributes::default(),
}
}
#[must_use]
pub fn attributes(&self) -> &Attributes {
&self.attributes
}
#[must_use]
pub fn with_attributes(mut self, attributes: Attributes) -> Self {
self.attributes = attributes;
self
}
/// Returns this flow with `attributes` combined into the existing ones via
/// `Attributes::and`.
#[must_use]
pub fn add_attributes(mut self, attributes: Attributes) -> Self {
    let combined = self.attributes.and(attributes);
    self.attributes = combined;
    self
}
/// Adds a name attribute (`Attributes::named(name)`) to this flow.
#[must_use]
pub fn named(self, name: impl Into<String>) -> Self {
    let name_attribute = Attributes::named(name);
    self.add_attributes(name_attribute)
}
/// Appends an asynchronous boundary using the default
/// `AsyncBoundaryExecutionConfig`.
#[must_use]
pub fn async_boundary(self) -> Self {
    let config = crate::graph::AsyncBoundaryExecutionConfig::default();
    self.async_boundary_with_config(config)
}
#[must_use]
pub fn r#async(self) -> Self {
self.async_boundary()
}
#[must_use]
pub fn async_boundary_with_config(
self,
config: crate::graph::AsyncBoundaryExecutionConfig,
) -> Self {
self.via(Flow::from_runtime_transform(move |input, materializer| {
super::async_boundary::linear_async_boundary_stream(input, materializer, config)
}))
}
/// Appends an asynchronous boundary whose config is the default one with
/// `buffer_size` overridden.
#[must_use]
pub fn async_boundary_with_buffer(self, buffer_size: usize) -> Self {
    let config = crate::graph::AsyncBoundaryExecutionConfig {
        buffer_size,
        ..crate::graph::AsyncBoundaryExecutionConfig::default()
    };
    self.async_boundary_with_config(config)
}
#[must_use]
pub fn via<Next, NextMat>(self, next: Flow<Out, Next, NextMat>) -> Flow<In, Next, Mat>
where
Next: Send + 'static,
NextMat: Send + 'static,
{
self.via_mat(next, Keep::left)
}
#[must_use]
pub fn via_mat<Next, NextMat, Combined, F>(
self,
next: Flow<Out, Next, NextMat>,
combine: F,
) -> Flow<In, Next, Combined>
where
Next: Send + 'static,
NextMat: Send + 'static,
Combined: Send + 'static,
F: Fn(Mat, NextMat) -> Combined + Send + Sync + 'static,
{
let Flow {
transform: first,
materialize: materialize_first,
hints: first_hints,
attributes: first_attributes,
} = self;
let Flow {
transform: second,
materialize: materialize_second,
hints: second_hints,
attributes: second_attributes,
} = next;
let combine = Arc::new(combine);
match (first, second) {
(FlowTransform::Pure(first), FlowTransform::Pure(second)) => {
let hints = first_hints.then(second_hints);
Flow::from_parts_with_hints(
move |input| second(first(input)),
move || {
let left = materialize_first()?;
let right = materialize_second()?;
Ok(combine(left, right))
},
hints,
)
.with_attributes(first_attributes.and(second_attributes))
}
(first, second) => {
let hints = first_hints.then(second_hints);
Flow {
transform: FlowTransform::Runtime(Arc::new(move |input, materializer| {
let stream = match &first {
FlowTransform::Pure(first) => first(input),
FlowTransform::Runtime(first) => first(input, materializer)?,
};
match &second {
FlowTransform::Pure(second) => Ok(second(stream)),
FlowTransform::Runtime(second) => second(stream, materializer),
}
})),
materialize: Arc::new(move || {
let left = materialize_first()?;
let right = materialize_second()?;
Ok(combine(left, right))
}),
hints,
attributes: first_attributes.and(second_attributes),
}
}
}
}
#[must_use]
pub fn via_mat_with<Next, NextMat, Combined, F>(
self,
next: Flow<Out, Next, NextMat>,
combine: F,
) -> Flow<In, Next, Combined>
where
Next: Send + 'static,
NextMat: Send + 'static,
Combined: Send + 'static,
F: Fn(Mat, NextMat) -> Combined + Send + Sync + 'static,
{
self.via_mat(next, combine)
}
/// Applies `f` to every successful element; error items pass through.
///
/// For a pure transform the mapping is fused into the existing transform and
/// the flow's hints are kept as they are. For a runtime transform a separate
/// mapping stage is appended with `via`.
#[must_use]
pub fn map<Next, F>(self, f: F) -> Flow<In, Next, Mat>
where
    Next: Send + 'static,
    F: Fn(Out) -> Next + Send + Sync + 'static,
{
    let stage = Arc::new(f);
    let Flow {
        transform,
        materialize,
        hints,
        attributes,
    } = self;
    match transform {
        FlowTransform::Pure(upstream) => Flow::from_parts_with_hints(
            move |input| {
                let stage = Arc::clone(&stage);
                Box::new(upstream(input).map(move |item| item.map(|value| stage(value))))
            },
            move || materialize(),
            hints,
        )
        .with_attributes(attributes),
        FlowTransform::Runtime(upstream) => {
            // Put the flow back together unchanged and append the mapping stage.
            let runtime_flow = Flow {
                transform: FlowTransform::Runtime(upstream),
                materialize,
                hints,
                attributes,
            };
            runtime_flow.via(Flow::from_transform(move |input| {
                let stage = Arc::clone(&stage);
                Box::new(input.map(move |item| item.map(|value| stage(value))))
            }))
        }
    }
}
/// Applies the fallible `f` to every successful element; an error returned by
/// `f` becomes an error item, and upstream error items pass through.
#[must_use]
pub fn map_result<Next, F>(self, f: F) -> Flow<In, Next, Mat>
where
    Next: Send + 'static,
    F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
{
    let stage = Arc::new(f);
    self.via(Flow::from_transform(move |input| {
        let stage = Arc::clone(&stage);
        Box::new(input.map(move |item| match item {
            Ok(value) => stage(value),
            Err(error) => Err(error),
        }))
    }))
}
/// Like `map_result`, but failures of `f` are routed through `decider`.
///
/// `f` runs inside `call_supervised`, so a panic in `f` is handled like an
/// error. On `Stop` the error is emitted; on `Resume` or `Restart` the element
/// is dropped. Upstream error items always pass through.
#[must_use]
pub fn map_result_with_supervision<Next, F>(
    self,
    f: F,
    decider: SupervisionDecider,
) -> Flow<In, Next, Mat>
where
    Next: Send + 'static,
    F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
{
    let stage = Arc::new(f);
    self.via(Flow::from_transform(move |input| {
        let stage = Arc::clone(&stage);
        let decider = Arc::clone(&decider);
        Box::new(input.filter_map(move |item| {
            let value = match item {
                Ok(value) => value,
                Err(upstream_error) => return Some(Err(upstream_error)),
            };
            match call_supervised("map_result callback", || stage(value)) {
                Ok(mapped) => Some(Ok(mapped)),
                Err(error) => match decide_supervision(&decider, &error) {
                    SupervisionDirective::Stop => Some(Err(error)),
                    SupervisionDirective::Resume | SupervisionDirective::Restart => None,
                },
            }
        }))
    }))
}
/// Keeps only the successful elements for which `predicate` returns `true`;
/// error items are always kept.
#[must_use]
pub fn filter<F>(self, predicate: F) -> Flow<In, Out, Mat>
where
    F: Fn(&Out) -> bool + Send + Sync + 'static,
{
    let predicate = Arc::new(predicate);
    self.via(Flow::from_preserving_transform(move |input| {
        let predicate = Arc::clone(&predicate);
        Box::new(input.filter(move |item| match item {
            Ok(value) => predicate(value),
            Err(_) => true,
        }))
    }))
}
/// Filters with a fallible predicate: `Ok(true)` keeps the element,
/// `Ok(false)` drops it, and an error from the predicate is emitted as an
/// error item. Upstream error items pass through.
#[must_use]
pub fn filter_result<F>(self, predicate: F) -> Flow<In, Out, Mat>
where
    F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
{
    let predicate = Arc::new(predicate);
    self.via(Flow::from_transform(move |input| {
        let predicate = Arc::clone(&predicate);
        Box::new(input.filter_map(move |item| {
            item.and_then(|value| predicate(&value).map(|keep| keep.then_some(value)))
                .transpose()
        }))
    }))
}
/// Like `filter_result`, but failures of `predicate` are routed through
/// `decider`.
///
/// `predicate` runs inside `call_supervised`, so a panic is handled like an
/// error. On `Stop` the error is emitted; on `Resume` or `Restart` the element
/// is dropped. Upstream error items always pass through.
#[must_use]
pub fn filter_result_with_supervision<F>(
    self,
    predicate: F,
    decider: SupervisionDecider,
) -> Flow<In, Out, Mat>
where
    F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
{
    let predicate = Arc::new(predicate);
    self.via(Flow::from_transform(move |input| {
        let predicate = Arc::clone(&predicate);
        let decider = Arc::clone(&decider);
        Box::new(input.filter_map(move |item| {
            let value = match item {
                Ok(value) => value,
                Err(upstream_error) => return Some(Err(upstream_error)),
            };
            match call_supervised("filter_result callback", || predicate(&value)) {
                Ok(true) => Some(Ok(value)),
                Ok(false) => None,
                Err(error) => match decide_supervision(&decider, &error) {
                    SupervisionDirective::Stop => Some(Err(error)),
                    SupervisionDirective::Resume | SupervisionDirective::Restart => None,
                },
            }
        }))
    }))
}
#[must_use]
pub fn filter_not<F>(self, predicate: F) -> Flow<In, Out, Mat>
where
F: Fn(&Out) -> bool + Send + Sync + 'static,
{
let predicate = Arc::new(predicate);
self.filter(move |item| !predicate(item))
}
/// Maps every successful element with `f`, emitting `Some` results and
/// dropping `None` results; error items pass through.
#[must_use]
pub fn filter_map<Next, F>(self, f: F) -> Flow<In, Next, Mat>
where
    Next: Send + 'static,
    F: Fn(Out) -> Option<Next> + Send + Sync + 'static,
{
    let stage = Arc::new(f);
    self.via(Flow::from_transform(move |input| {
        let stage = Arc::clone(&stage);
        Box::new(input.filter_map(move |item| item.map(|value| stage(value)).transpose()))
    }))
}
/// Fallible `filter_map`: `Ok(Some(v))` emits `v`, `Ok(None)` drops the
/// element, and an error from `f` is emitted as an error item. Upstream error
/// items pass through.
#[must_use]
pub fn filter_map_result<Next, F>(self, f: F) -> Flow<In, Next, Mat>
where
    Next: Send + 'static,
    F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
{
    let stage = Arc::new(f);
    self.via(Flow::from_transform(move |input| {
        let stage = Arc::clone(&stage);
        Box::new(
            input.filter_map(move |item| item.and_then(|value| stage(value)).transpose()),
        )
    }))
}
/// Like `filter_map_result`, but failures of `f` are routed through `decider`.
///
/// `f` runs inside `call_supervised`, so a panic is handled like an error. On
/// `Stop` the error is emitted; on `Resume` or `Restart` the element is
/// dropped. Upstream error items always pass through.
#[must_use]
pub fn filter_map_result_with_supervision<Next, F>(
    self,
    f: F,
    decider: SupervisionDecider,
) -> Flow<In, Next, Mat>
where
    Next: Send + 'static,
    F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
{
    let stage = Arc::new(f);
    self.via(Flow::from_transform(move |input| {
        let stage = Arc::clone(&stage);
        let decider = Arc::clone(&decider);
        Box::new(input.filter_map(move |item| {
            let value = match item {
                Ok(value) => value,
                Err(upstream_error) => return Some(Err(upstream_error)),
            };
            match call_supervised("filter_map_result callback", || stage(value)) {
                Ok(Some(mapped)) => Some(Ok(mapped)),
                Ok(None) => None,
                Err(error) => match decide_supervision(&decider, &error) {
                    SupervisionDirective::Stop => Some(Err(error)),
                    SupervisionDirective::Resume | SupervisionDirective::Restart => None,
                },
            }
        }))
    }))
}
/// Expands every successful element into the items of the collection returned
/// by `f`, emitted in order.
///
/// An upstream error item is emitted and then the stream ends.
#[must_use]
pub fn map_concat<Next, F, I>(self, f: F) -> Flow<In, Next, Mat>
where
    Next: Send + 'static,
    F: Fn(Out) -> I + Send + Sync + 'static,
    I: IntoIterator<Item = Next>,
    I::IntoIter: Send + 'static,
{
    let stage = Arc::new(f);
    self.via(Flow::from_transform(move |mut upstream| {
        let stage = Arc::clone(&stage);
        // Expansion of the element currently being emitted, if any.
        let mut expansion: Option<I::IntoIter> = None;
        let mut failed = false;
        Box::new(std::iter::from_fn(move || {
            loop {
                if let Some(item) = expansion.as_mut().and_then(|iter| iter.next()) {
                    return Some(Ok(item));
                }
                expansion = None;
                if failed {
                    return None;
                }
                match upstream.next()? {
                    Ok(element) => expansion = Some(stage(element).into_iter()),
                    Err(error) => {
                        failed = true;
                        return Some(Err(error));
                    }
                }
            }
        }))
    }))
}
/// Fallible `map_concat`: every successful element is expanded into the items
/// of the collection returned by `f`.
///
/// An error from `f` or an upstream error item is emitted and then the stream
/// ends.
#[must_use]
pub fn map_concat_result<Next, F, I>(self, f: F) -> Flow<In, Next, Mat>
where
    Next: Send + 'static,
    F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
    I: IntoIterator<Item = Next>,
    I::IntoIter: Send + 'static,
{
    let stage = Arc::new(f);
    self.via(Flow::from_transform(move |mut upstream| {
        let stage = Arc::clone(&stage);
        // Expansion of the element currently being emitted, if any.
        let mut expansion: Option<I::IntoIter> = None;
        let mut failed = false;
        Box::new(std::iter::from_fn(move || {
            loop {
                if let Some(item) = expansion.as_mut().and_then(|iter| iter.next()) {
                    return Some(Ok(item));
                }
                expansion = None;
                if failed {
                    return None;
                }
                // Upstream errors and stage errors terminate the stream alike.
                match upstream.next()?.and_then(|element| stage(element)) {
                    Ok(items) => expansion = Some(items.into_iter()),
                    Err(error) => {
                        failed = true;
                        return Some(Err(error));
                    }
                }
            }
        }))
    }))
}
/// Like `map_concat_result`, but failures of `f` are routed through `decider`.
///
/// `f` runs inside `call_supervised`, so a panic is handled like an error. On
/// `Stop` the error is emitted and the stream ends; on `Resume` or `Restart`
/// the element is skipped. An upstream error item is emitted and then the
/// stream ends.
#[must_use]
pub fn map_concat_result_with_supervision<Next, F, I>(
    self,
    f: F,
    decider: SupervisionDecider,
) -> Flow<In, Next, Mat>
where
    Next: Send + 'static,
    F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
    I: IntoIterator<Item = Next>,
    I::IntoIter: Send + 'static,
{
    let stage = Arc::new(f);
    self.via(Flow::from_transform(move |mut upstream| {
        let stage = Arc::clone(&stage);
        let decider = Arc::clone(&decider);
        // Expansion of the element currently being emitted, if any.
        let mut expansion: Option<I::IntoIter> = None;
        let mut failed = false;
        Box::new(std::iter::from_fn(move || {
            loop {
                if let Some(item) = expansion.as_mut().and_then(|iter| iter.next()) {
                    return Some(Ok(item));
                }
                expansion = None;
                if failed {
                    return None;
                }
                let element = match upstream.next()? {
                    Ok(element) => element,
                    Err(error) => {
                        failed = true;
                        return Some(Err(error));
                    }
                };
                match call_supervised("map_concat_result callback", || stage(element)) {
                    Ok(items) => expansion = Some(items.into_iter()),
                    Err(error) => match decide_supervision(&decider, &error) {
                        SupervisionDirective::Stop => {
                            failed = true;
                            return Some(Err(error));
                        }
                        // Skip the failed element and pull the next one.
                        SupervisionDirective::Resume | SupervisionDirective::Restart => {}
                    },
                }
            }
        }))
    }))
}
/// Maps every successful element with `f`, which also receives mutable state.
///
/// The state starts as a clone of `seed` for each use of the transform; error
/// items pass through without touching it.
#[must_use]
pub fn stateful_map<State, Next, F>(self, seed: State, f: F) -> Flow<In, Next, Mat>
where
    State: Clone + Send + Sync + 'static,
    Next: Send + 'static,
    F: Fn(&mut State, Out) -> Next + Send + Sync + 'static,
{
    let stage = Arc::new(f);
    self.via(Flow::from_transform(move |input| {
        let stage = Arc::clone(&stage);
        let mut state = seed.clone();
        Box::new(input.map(move |item| match item {
            Ok(value) => Ok(stage(&mut state, value)),
            Err(error) => Err(error),
        }))
    }))
}
/// Fallible `stateful_map`: an error returned by `f` becomes an error item.
///
/// The state starts as a clone of `seed` for each use of the transform;
/// upstream error items pass through without touching it.
#[must_use]
pub fn stateful_map_result<State, Next, F>(self, seed: State, f: F) -> Flow<In, Next, Mat>
where
    State: Clone + Send + Sync + 'static,
    Next: Send + 'static,
    F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
{
    let stage = Arc::new(f);
    self.via(Flow::from_transform(move |input| {
        let stage = Arc::clone(&stage);
        let mut state = seed.clone();
        Box::new(input.map(move |item| item.and_then(|value| stage(&mut state, value))))
    }))
}
/// Like `stateful_map_result`, but failures of `f` are routed through
/// `decider`.
///
/// `f` runs inside `call_supervised`, so a panic is handled like an error. On
/// `Stop` the error is emitted; on `Resume` the element is dropped and the
/// state kept; on `Restart` the element is dropped and the state is reset to a
/// clone of `seed`. Upstream error items always pass through.
#[must_use]
pub fn stateful_map_result_with_supervision<State, Next, F>(
    self,
    seed: State,
    f: F,
    decider: SupervisionDecider,
) -> Flow<In, Next, Mat>
where
    State: Clone + Send + Sync + 'static,
    Next: Send + 'static,
    F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
{
    let stage = Arc::new(f);
    self.via(Flow::from_transform(move |input| {
        let stage = Arc::clone(&stage);
        let decider = Arc::clone(&decider);
        let seed = seed.clone();
        let mut state = seed.clone();
        Box::new(input.filter_map(move |item| {
            let value = match item {
                Ok(value) => value,
                Err(upstream_error) => return Some(Err(upstream_error)),
            };
            let outcome =
                call_supervised("stateful_map_result callback", || stage(&mut state, value));
            match outcome {
                Ok(mapped) => Some(Ok(mapped)),
                Err(error) => match decide_supervision(&decider, &error) {
                    SupervisionDirective::Stop => Some(Err(error)),
                    SupervisionDirective::Resume => None,
                    SupervisionDirective::Restart => {
                        state = seed.clone();
                        None
                    }
                },
            }
        }))
    }))
}
/// Stateful `map_concat`: every successful element is expanded into the items
/// of the collection returned by `f`, which also receives mutable state.
///
/// The state starts as a clone of `seed` for each use of the transform. An
/// upstream error item is emitted and then the stream ends.
#[must_use]
pub fn stateful_map_concat<State, Next, F, I>(self, seed: State, f: F) -> Flow<In, Next, Mat>
where
    State: Clone + Send + Sync + 'static,
    Next: Send + 'static,
    F: Fn(&mut State, Out) -> I + Send + Sync + 'static,
    I: IntoIterator<Item = Next>,
    I::IntoIter: Send + 'static,
{
    let stage = Arc::new(f);
    self.via(Flow::from_transform(move |mut upstream| {
        let stage = Arc::clone(&stage);
        let mut state = seed.clone();
        // Expansion of the element currently being emitted, if any.
        let mut expansion: Option<I::IntoIter> = None;
        let mut failed = false;
        Box::new(std::iter::from_fn(move || {
            loop {
                if let Some(item) = expansion.as_mut().and_then(|iter| iter.next()) {
                    return Some(Ok(item));
                }
                expansion = None;
                if failed {
                    return None;
                }
                match upstream.next()? {
                    Ok(element) => expansion = Some(stage(&mut state, element).into_iter()),
                    Err(error) => {
                        failed = true;
                        return Some(Err(error));
                    }
                }
            }
        }))
    }))
}
/// Fallible `stateful_map_concat`.
///
/// The state starts as a clone of `seed` for each use of the transform. An
/// error from `f` or an upstream error item is emitted and then the stream
/// ends.
#[must_use]
pub fn stateful_map_concat_result<State, Next, F, I>(
    self,
    seed: State,
    f: F,
) -> Flow<In, Next, Mat>
where
    State: Clone + Send + Sync + 'static,
    Next: Send + 'static,
    F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
    I: IntoIterator<Item = Next>,
    I::IntoIter: Send + 'static,
{
    let stage = Arc::new(f);
    self.via(Flow::from_transform(move |mut upstream| {
        let stage = Arc::clone(&stage);
        let mut state = seed.clone();
        // Expansion of the element currently being emitted, if any.
        let mut expansion: Option<I::IntoIter> = None;
        let mut failed = false;
        Box::new(std::iter::from_fn(move || {
            loop {
                if let Some(item) = expansion.as_mut().and_then(|iter| iter.next()) {
                    return Some(Ok(item));
                }
                expansion = None;
                if failed {
                    return None;
                }
                // Upstream errors and stage errors terminate the stream alike.
                match upstream
                    .next()?
                    .and_then(|element| stage(&mut state, element))
                {
                    Ok(items) => expansion = Some(items.into_iter()),
                    Err(error) => {
                        failed = true;
                        return Some(Err(error));
                    }
                }
            }
        }))
    }))
}
/// Like `stateful_map_concat_result`, but failures of `f` are routed through
/// `decider`.
///
/// `f` runs inside `call_supervised`, so a panic is handled like an error. On
/// `Stop` the error is emitted and the stream ends; on `Resume` the element is
/// skipped and the state kept; on `Restart` the element is skipped and the
/// state is reset to a clone of `seed`. An upstream error item is emitted and
/// then the stream ends.
#[must_use]
pub fn stateful_map_concat_result_with_supervision<State, Next, F, I>(
    self,
    seed: State,
    f: F,
    decider: SupervisionDecider,
) -> Flow<In, Next, Mat>
where
    State: Clone + Send + Sync + 'static,
    Next: Send + 'static,
    F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
    I: IntoIterator<Item = Next>,
    I::IntoIter: Send + 'static,
{
    let stage = Arc::new(f);
    self.via(Flow::from_transform(move |mut upstream| {
        let stage = Arc::clone(&stage);
        let decider = Arc::clone(&decider);
        let seed = seed.clone();
        let mut state = seed.clone();
        // Expansion of the element currently being emitted, if any.
        let mut expansion: Option<I::IntoIter> = None;
        let mut failed = false;
        Box::new(std::iter::from_fn(move || {
            loop {
                if let Some(item) = expansion.as_mut().and_then(|iter| iter.next()) {
                    return Some(Ok(item));
                }
                expansion = None;
                if failed {
                    return None;
                }
                let element = match upstream.next()? {
                    Ok(element) => element,
                    Err(error) => {
                        failed = true;
                        return Some(Err(error));
                    }
                };
                let outcome = call_supervised("stateful_map_concat_result callback", || {
                    stage(&mut state, element)
                });
                match outcome {
                    Ok(items) => expansion = Some(items.into_iter()),
                    Err(error) => match decide_supervision(&decider, &error) {
                        SupervisionDirective::Stop => {
                            failed = true;
                            return Some(Err(error));
                        }
                        SupervisionDirective::Resume => {}
                        SupervisionDirective::Restart => state = seed.clone(),
                    },
                }
            }
        }))
    }))
}
/// Maps every element through the future-returning `f`, handing the actual
/// scheduling to `map_async_ordered` with the given `parallelism`.
///
/// The stage is registered with the `PRESERVES_TERMINAL_CONSUMER_BATCH` hint.
///
/// # Panics
///
/// Panics if `parallelism` is zero.
#[must_use]
pub fn map_async<Next, F, Fut>(self, parallelism: usize, f: F) -> Flow<In, Next, Mat>
where
    Next: Send + 'static,
    F: Fn(Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
    assert!(
        parallelism > 0,
        "map_async parallelism must be greater than zero"
    );
    let mapper = Arc::new(f);
    self.via(Flow::from_runtime_transform_with_hints(
        move |upstream, _materializer| {
            Ok(map_async_ordered(
                upstream,
                parallelism,
                Arc::clone(&mapper),
            ))
        },
        FlowHints::PRESERVES_TERMINAL_CONSUMER_BATCH,
    ))
}
/// Supervised variant of `map_async`: elements are mapped through the
/// future-returning `f` by `map_async_ordered_supervised`, which also receives
/// `decider`.
///
/// NOTE(review): how failures are routed through `decider` is decided inside
/// `map_async_ordered_supervised`, which is not visible here.
///
/// # Panics
///
/// Panics if `parallelism` is zero.
#[must_use]
pub fn map_async_with_supervision<Next, F, Fut>(
    self,
    parallelism: usize,
    f: F,
    decider: SupervisionDecider,
) -> Flow<In, Next, Mat>
where
    Next: Send + 'static,
    F: Fn(Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
    assert!(
        parallelism > 0,
        "map_async parallelism must be greater than zero"
    );
    let mapper = Arc::new(f);
    self.via(Flow::from_runtime_transform(move |upstream, _materializer| {
        Ok(map_async_ordered_supervised(
            upstream,
            parallelism,
            Arc::clone(&mapper),
            Arc::clone(&decider),
        ))
    }))
}
/// Maps every element through the future-returning `f`, handing the actual
/// scheduling to `map_async_unordered` with the given `parallelism`.
///
/// The stage is registered with the `PRESERVES_TERMINAL_CONSUMER_BATCH` hint.
///
/// # Panics
///
/// Panics if `parallelism` is zero.
#[must_use]
pub fn map_async_unordered<Next, F, Fut>(self, parallelism: usize, f: F) -> Flow<In, Next, Mat>
where
    Next: Send + 'static,
    F: Fn(Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
    assert!(
        parallelism > 0,
        "map_async_unordered parallelism must be greater than zero"
    );
    let mapper = Arc::new(f);
    self.via(Flow::from_runtime_transform_with_hints(
        move |upstream, _materializer| {
            Ok(map_async_unordered(
                upstream,
                parallelism,
                Arc::clone(&mapper),
            ))
        },
        FlowHints::PRESERVES_TERMINAL_CONSUMER_BATCH,
    ))
}
/// Supervised variant of `map_async_unordered`: elements are mapped through the
/// future-returning `f` by `map_async_unordered_supervised`, which also receives
/// `decider`.
///
/// NOTE(review): how failures are routed through `decider` is decided inside
/// `map_async_unordered_supervised`, which is not visible here.
///
/// # Panics
///
/// Panics if `parallelism` is zero.
#[must_use]
pub fn map_async_unordered_with_supervision<Next, F, Fut>(
    self,
    parallelism: usize,
    f: F,
    decider: SupervisionDecider,
) -> Flow<In, Next, Mat>
where
    Next: Send + 'static,
    F: Fn(Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
    assert!(
        parallelism > 0,
        "map_async_unordered parallelism must be greater than zero"
    );
    let mapper = Arc::new(f);
    self.via(Flow::from_runtime_transform(move |upstream, _materializer| {
        Ok(map_async_unordered_supervised(
            upstream,
            parallelism,
            Arc::clone(&mapper),
            Arc::clone(&decider),
        ))
    }))
}
/// Maps every element through the future-returning `f`, with `partition` deriving
/// a key for each element. The work is delegated to the `map_async_partitioned`
/// helper, which receives both limits (`parallelism`, `per_partition`).
///
/// NOTE(review): presumably `parallelism` bounds the total in-flight futures and
/// `per_partition` the in-flight futures per key — confirm against the helper.
///
/// # Panics
///
/// Panics if `parallelism` or `per_partition` is zero.
#[must_use]
pub fn map_async_partitioned<Key, Next, Partition, F, Fut>(
    self,
    parallelism: usize,
    per_partition: usize,
    partition: Partition,
    f: F,
) -> Flow<In, Next, Mat>
where
    Key: Clone + Eq + Hash + Send + 'static,
    Next: Send + 'static,
    Partition: Fn(&Out) -> Key + Send + Sync + 'static,
    F: Fn(Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
    assert!(
        parallelism > 0,
        "map_async_partitioned parallelism must be greater than zero"
    );
    assert!(
        per_partition > 0,
        "map_async_partitioned per_partition must be greater than zero"
    );
    let key_of = Arc::new(partition);
    let mapper = Arc::new(f);
    self.via(Flow::from_runtime_transform_with_hints(
        move |upstream, _materializer| {
            Ok(map_async_partitioned(
                upstream,
                parallelism,
                per_partition,
                Arc::clone(&key_of),
                Arc::clone(&mapper),
            ))
        },
        FlowHints::PRESERVES_TERMINAL_CONSUMER_BATCH,
    ))
}
/// Passes through at most the first `n` stream items and then completes.
///
/// The cut-off is applied with `Iterator::take` on the raw result stream, so an
/// upstream `Err` item counts toward `n` just like a successful element.
#[must_use]
pub fn take(self, n: usize) -> Flow<In, Out, Mat> {
    self.via(Flow::from_transform(move |upstream| {
        Box::new(upstream.take(n))
    }))
}
/// Discards the first `n` successful elements and forwards everything after them.
///
/// Errors are always forwarded and never count toward `n`.
#[must_use]
pub fn drop(self, n: usize) -> Flow<In, Out, Mat> {
    self.via(Flow::from_transform(move |upstream| {
        let mut to_skip = n;
        Box::new(upstream.filter(move |element| {
            if element.is_ok() && to_skip > 0 {
                to_skip -= 1;
                false
            } else {
                true
            }
        }))
    }))
}
/// Forwards elements for as long as `predicate` holds.
///
/// The first element that fails `predicate` is discarded and completes the stream;
/// upstream is not polled again afterwards. Upstream errors are forwarded and do
/// not close the stage.
#[must_use]
pub fn take_while<F>(self, predicate: F) -> Flow<In, Out, Mat>
where
    F: Fn(&Out) -> bool + Send + Sync + 'static,
{
    let keep = Arc::new(predicate);
    self.via(Flow::from_transform(move |mut upstream| {
        let keep = Arc::clone(&keep);
        let mut closed = false;
        Box::new(std::iter::from_fn(move || {
            if closed {
                return None;
            }
            let element = match upstream.next() {
                Some(Ok(element)) => element,
                Some(Err(error)) => return Some(Err(error)),
                None => {
                    closed = true;
                    return None;
                }
            };
            if keep(&element) {
                Some(Ok(element))
            } else {
                closed = true;
                None
            }
        }))
    }))
}
/// Discards leading elements while `predicate` holds, then forwards the rest.
///
/// `predicate` is no longer evaluated once the first element has been let through.
/// Upstream errors are forwarded at any time and do not end the dropping phase.
#[must_use]
pub fn drop_while<F>(self, predicate: F) -> Flow<In, Out, Mat>
where
    F: Fn(&Out) -> bool + Send + Sync + 'static,
{
    let should_drop = Arc::new(predicate);
    self.via(Flow::from_transform(move |mut upstream| {
        let should_drop = Arc::clone(&should_drop);
        let mut skipping = true;
        Box::new(std::iter::from_fn(move || {
            loop {
                match upstream.next()? {
                    Err(error) => return Some(Err(error)),
                    Ok(element) => {
                        if skipping && should_drop(&element) {
                            continue;
                        }
                        skipping = false;
                        return Some(Ok(element));
                    }
                }
            }
        }))
    }))
}
/// Lets the first `max` successful elements through and turns every successful
/// element after that into `StreamError::LimitExceeded { max }`.
///
/// Upstream errors are forwarded unchanged and are not counted.
#[must_use]
pub fn limit(self, max: u64) -> Flow<In, Out, Mat> {
    self.via(Flow::from_transform(move |upstream| {
        let mut passed = 0_u64;
        Box::new(upstream.map(move |element| {
            element.and_then(|value| {
                if passed < max {
                    passed += 1;
                    Ok(value)
                } else {
                    Err(StreamError::LimitExceeded { max })
                }
            })
        }))
    }))
}
/// Collects elements into `Vec`s of exactly `size` elements; the final group may be
/// shorter when upstream completes mid-group.
///
/// An upstream error is emitted immediately and the elements gathered for the
/// group in progress are discarded.
///
/// Once upstream has reported completion it is never polled again: a trailing
/// partial group is emitted and every later poll returns `None`. `Iterator::next`
/// gives no guarantee about calls made after `None`, so re-polling an exhausted
/// upstream could otherwise resume, block, or panic.
///
/// # Panics
///
/// Panics if `size` is zero.
#[must_use]
pub fn grouped(self, size: usize) -> Flow<In, Vec<Out>, Mat> {
    assert!(size > 0, "grouped size must be greater than zero");
    self.via(Flow::from_transform(move |mut input| {
        // Latched when upstream returns `None`, so it is not polled again.
        let mut exhausted = false;
        Box::new(std::iter::from_fn(move || {
            if exhausted {
                return None;
            }
            let mut group = Vec::with_capacity(size);
            while group.len() < size {
                match input.next() {
                    Some(Ok(item)) => group.push(item),
                    Some(Err(error)) => return Some(Err(error)),
                    None => {
                        exhausted = true;
                        break;
                    }
                }
            }
            if group.is_empty() {
                None
            } else {
                Some(Ok(group))
            }
        }))
    }))
}
/// Running fold: emits a clone of `seed` first, then the result of applying `f` to
/// the previous state and each upstream element.
///
/// Upstream errors are forwarded and leave the state untouched.
#[must_use]
pub fn scan<State, F>(self, seed: State, f: F) -> Flow<In, State, Mat>
where
    State: Clone + Send + Sync + 'static,
    F: Fn(State, Out) -> State + Send + Sync + 'static,
{
    let step = Arc::new(f);
    self.via(Flow::from_transform(move |mut upstream| {
        let step = Arc::clone(&step);
        let mut acc = Some(seed.clone());
        let mut seed_pending = true;
        Box::new(std::iter::from_fn(move || {
            if seed_pending {
                seed_pending = false;
                let initial = acc.as_ref().expect("scan state present");
                return Some(Ok(initial.clone()));
            }
            let element = match upstream.next()? {
                Ok(element) => element,
                Err(error) => return Some(Err(error)),
            };
            // The state is moved into `f` and replaced with a clone of the result.
            let previous = acc.take().expect("scan state present");
            let updated = step(previous, element);
            acc = Some(updated.clone());
            Some(Ok(updated))
        }))
    }))
}
/// Running fold with a future-returning step function.
///
/// Emits a clone of `seed` first. For each upstream element, `f` is called through
/// `catch_unwind_failed` and the resulting future is resolved by
/// `run_future_inline_or_spawn`; the resolved state is stored and emitted.
///
/// Any error — from upstream, from the `f` call, or from the future — is emitted
/// once and completes the stream.
#[must_use]
pub fn scan_async<State, F, Fut>(self, seed: State, f: F) -> Flow<In, State, Mat>
where
    State: Clone + Send + Sync + 'static,
    F: Fn(State, Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<State>> + Send + 'static,
{
    let step = Arc::new(f);
    self.via(Flow::from_transform(move |mut upstream| {
        let step = Arc::clone(&step);
        let mut acc = Some(seed.clone());
        let mut seed_pending = true;
        let mut finished = false;
        Box::new(std::iter::from_fn(move || {
            if finished {
                return None;
            }
            if seed_pending {
                seed_pending = false;
                let initial = acc.as_ref().expect("scan_async state present");
                return Some(Ok(initial.clone()));
            }
            let element = match upstream.next()? {
                Ok(element) => element,
                Err(error) => {
                    finished = true;
                    return Some(Err(error));
                }
            };
            let previous = acc.take().expect("scan_async state present");
            let outcome = catch_unwind_failed("scan_async factory", || step(previous, element))
                .and_then(run_future_inline_or_spawn);
            match outcome {
                Ok(updated) => {
                    acc = Some(updated.clone());
                    Some(Ok(updated))
                }
                Err(error) => {
                    // The previous state was consumed by the failed step.
                    finished = true;
                    Some(Err(error))
                }
            }
        }))
    }))
}
/// Running fold with a fallible step function.
///
/// Emits a clone of `seed` first, then the result of applying `f` to the previous
/// state and each upstream element.
///
/// When `f` returns `Err`, the error is emitted and the stream completes: the
/// previous state was moved into the failed call and cannot be restored, so
/// polling on would otherwise panic on the missing state. This mirrors the
/// termination behaviour of `scan_async`.
///
/// Upstream errors are forwarded and leave the state untouched.
#[must_use]
pub fn scan_result<State, F>(self, seed: State, f: F) -> Flow<In, State, Mat>
where
    State: Clone + Send + Sync + 'static,
    F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
{
    let stage = Arc::new(f);
    self.via(Flow::from_transform(move |mut input| {
        let stage = Arc::clone(&stage);
        let mut state = Some(seed.clone());
        let mut emit_seed = true;
        // Latched when `f` fails; from then on `state` is `None` for good.
        let mut terminated = false;
        Box::new(std::iter::from_fn(move || {
            if terminated {
                return None;
            }
            if emit_seed {
                emit_seed = false;
                return Some(Ok(state.as_ref().expect("scan state present").clone()));
            }
            match input.next()? {
                Ok(item) => {
                    let prev = state.take().expect("scan state present");
                    match stage(prev, item) {
                        Ok(next) => {
                            state = Some(next.clone());
                            Some(Ok(next))
                        }
                        Err(error) => {
                            terminated = true;
                            Some(Err(error))
                        }
                    }
                }
                Err(error) => Some(Err(error)),
            }
        }))
    }))
}
#[must_use]
pub fn scan_result_with_supervision<State, F>(
self,
seed: State,
f: F,
decider: SupervisionDecider,
) -> Flow<In, State, Mat>
where
State: Clone + Send + Sync + 'static,
F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
{
let stage = Arc::new(f);
self.via(Flow::from_transform(move |mut input| {
let stage = Arc::clone(&stage);
let decider = Arc::clone(&decider);
let seed = seed.clone();
let mut state = Some(seed.clone());
let mut emit_seed = true;
Box::new(std::iter::from_fn(move || {
loop {
if emit_seed {
emit_seed = false;
return Some(Ok(state.as_ref().expect("scan state present").clone()));
}
match input.next()? {
Ok(item) => {
let prev = state.take().expect("scan state present");
match call_supervised("scan_result callback", || {
stage(prev.clone(), item)
}) {
Ok(next) => {
state = Some(next.clone());
return Some(Ok(next));
}
Err(error) => match decide_supervision(&decider, &error) {
SupervisionDirective::Stop => return Some(Err(error)),
SupervisionDirective::Resume => {
state = Some(prev);
}
SupervisionDirective::Restart => {
state = Some(seed.clone());
emit_seed = true;
}
},
}
}
Err(error) => return Some(Err(error)),
}
}
}))
}))
}
/// Emits windows of `size` elements, advancing the window start by `step` elements
/// between emissions.
///
/// When upstream completes while a window is only partially filled, that partial
/// window is emitted as the last element. An upstream error is emitted and
/// completes the stream.
///
/// # Panics
///
/// Panics if `size` or `step` is zero.
#[must_use]
pub fn sliding(self, size: usize, step: usize) -> Flow<In, Vec<Out>, Mat>
where
Out: Clone,
{
assert!(size > 0, "sliding size must be greater than zero");
assert!(step > 0, "sliding step must be greater than zero");
self.via(Flow::from_transform(move |mut input| {
let mut buffer = VecDeque::with_capacity(size.max(step));
let mut terminated = false;
Box::new(std::iter::from_fn(move || {
if terminated {
return None;
}
loop {
match input.next() {
Some(Ok(item)) => {
buffer.push_back(item);
if buffer.len() < size {
// Window not full yet.
continue;
} else if buffer.len() == size {
// Window just became full: emit a copy and keep the buffer.
return Some(Ok(buffer.iter().cloned().collect()));
} else if step <= size {
// One element past a full window: slide by dropping `step` elements
// from the front; with `step == 1` the window is full again at once.
for _ in 0..step {
buffer.pop_front();
}
if buffer.len() == size {
return Some(Ok(buffer.iter().cloned().collect()));
}
} else if buffer.len() == step {
// `step > size`: the elements between two windows are buffered too and
// thrown away together once `step` elements have been collected.
buffer.clear();
}
}
Some(Err(error)) => {
terminated = true;
return Some(Err(error));
}
None => {
terminated = true;
// Only a partially filled window is flushed here; a buffer holding
// `size` or more elements is not emitted again.
if !buffer.is_empty() && buffer.len() < size {
return Some(Ok(buffer.iter().cloned().collect()));
}
return None;
}
}
}
}))
}))
}
/// Folds the whole upstream into one value, starting from a clone of `zero`, and
/// emits that value as the only element.
///
/// Upstream is drained inside the transform itself, i.e. while the stage is being
/// built rather than when its output is first polled. The first upstream error
/// stops the fold and is emitted as the only element instead.
#[must_use]
pub fn fold<Acc, F>(self, zero: Acc, f: F) -> Flow<In, Acc, Mat>
where
    Acc: Clone + Send + Sync + 'static,
    F: Fn(Acc, Out) -> Acc + Send + Sync + 'static,
{
    let combine = Arc::new(f);
    self.via(Flow::from_transform(move |mut upstream| {
        // `try_fold` stops at the first `Err`, exactly like an early return would.
        let outcome = upstream.try_fold(zero.clone(), |acc, element| {
            element.map(|value| combine(acc, value))
        });
        Box::new(std::iter::once(outcome))
    }))
}
/// Folds the whole upstream into one value with a future-returning step function
/// and emits that value as the only element.
///
/// The fold runs on the first poll of the output. Each step calls `f` through
/// `catch_unwind_failed` and resolves the returned future with
/// `run_future_inline_or_spawn`. The first error — from upstream, the `f` call, or
/// the future — is emitted instead of a result. Either way exactly one element is
/// produced.
#[must_use]
pub fn fold_async<Acc, F, Fut>(self, zero: Acc, f: F) -> Flow<In, Acc, Mat>
where
    Acc: Clone + Send + Sync + 'static,
    F: Fn(Acc, Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Acc>> + Send + 'static,
{
    let combine = Arc::new(f);
    self.via(Flow::from_transform(move |mut upstream| {
        let combine = Arc::clone(&combine);
        let mut acc = Some(zero.clone());
        let mut emitted = false;
        Box::new(std::iter::from_fn(move || {
            if emitted {
                return None;
            }
            // Latch before folding so an early error return also ends the stream.
            emitted = true;
            for element in upstream.by_ref() {
                let value = match element {
                    Ok(value) => value,
                    Err(error) => return Some(Err(error)),
                };
                let current = acc.take().expect("fold_async accumulator present");
                let outcome =
                    catch_unwind_failed("fold_async factory", || combine(current, value))
                        .and_then(run_future_inline_or_spawn);
                match outcome {
                    Ok(next) => acc = Some(next),
                    Err(error) => return Some(Err(error)),
                }
            }
            Some(Ok(acc.take().expect("fold_async accumulator present")))
        }))
    }))
}
/// Maps elements with access to a resource managed by the stage.
///
/// The three callbacks are packed into a `MapWithResourceStream`, which starts out
/// with no resource and drives the lifecycle: `create` builds the resource, `f`
/// maps an element using it, and `close` consumes it and may yield one final
/// element.
///
/// NOTE(review): when exactly `create` and `close` run is decided by
/// `MapWithResourceStream`, which is not visible here.
#[must_use]
pub fn map_with_resource<Resource, Next, Create, F, Close>(
    self,
    create: Create,
    f: F,
    close: Close,
) -> Flow<In, Next, Mat>
where
    Resource: Send + 'static,
    Next: Send + 'static,
    Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
    F: Fn(&mut Resource, Out) -> StreamResult<Next> + Send + Sync + 'static,
    Close: Fn(Resource) -> StreamResult<Option<Next>> + Send + Sync + 'static,
{
    let open_resource = Arc::new(create);
    let mapper = Arc::new(f);
    let close_resource = Arc::new(close);
    self.via(Flow::from_transform(move |upstream| {
        let stream = MapWithResourceStream {
            input: upstream,
            create: Arc::clone(&open_resource),
            stage: Arc::clone(&mapper),
            close: Arc::clone(&close_resource),
            resource: None,
            created: false,
            pending_terminal: None,
            terminated: false,
            _marker: PhantomData,
        };
        Box::new(stream) as BoxStream<Next>
    }))
}
/// Folds the whole upstream into one value with a fallible step function and emits
/// that value as the only element.
///
/// Upstream is drained inside the transform itself, i.e. while the stage is being
/// built. The first error — from upstream or from `f` — stops the fold and is
/// emitted as the only element instead.
#[must_use]
pub fn fold_result<Acc, F>(self, zero: Acc, f: F) -> Flow<In, Acc, Mat>
where
    Acc: Clone + Send + Sync + 'static,
    F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
{
    let combine = Arc::new(f);
    self.via(Flow::from_transform(move |mut upstream| {
        // `try_fold` short-circuits on the first upstream or step failure.
        let outcome = upstream.try_fold(zero.clone(), |acc, element| {
            element.and_then(|value| combine(acc, value))
        });
        Box::new(std::iter::once(outcome))
    }))
}
/// Supervised variant of `fold_result`.
///
/// Upstream is drained inside the transform itself. When `f` fails (returns `Err`,
/// or panics and is converted to an error by `call_supervised`) the error is passed
/// to `decider`:
///
/// * `Stop` emits the error as the only element.
/// * `Resume` skips the element and keeps the accumulator from before the call.
/// * `Restart` skips the element and resets the accumulator to a clone of `zero`.
///
/// An upstream error is emitted as the only element without consulting `decider`.
#[must_use]
pub fn fold_result_with_supervision<Acc, F>(
    self,
    zero: Acc,
    f: F,
    decider: SupervisionDecider,
) -> Flow<In, Acc, Mat>
where
    Acc: Clone + Send + Sync + 'static,
    F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
{
    let combine = Arc::new(f);
    self.via(Flow::from_transform(move |upstream| {
        let combine = Arc::clone(&combine);
        let decider = Arc::clone(&decider);
        let mut acc = zero.clone();
        for element in upstream {
            let value = match element {
                Ok(value) => value,
                Err(error) => return Box::new(std::iter::once(Err(error))),
            };
            // `f` gets a clone so the checkpoint is still available for `Resume`.
            let checkpoint = acc;
            acc = match call_supervised("fold_result callback", || {
                combine(checkpoint.clone(), value)
            }) {
                Ok(next) => next,
                Err(error) => match decide_supervision(&decider, &error) {
                    SupervisionDirective::Stop => {
                        return Box::new(std::iter::once(Err(error)));
                    }
                    SupervisionDirective::Resume => checkpoint,
                    SupervisionDirective::Restart => zero.clone(),
                },
            };
        }
        Box::new(std::iter::once(Ok(acc)))
    }))
}
/// Reduces the whole upstream to one value, using the first element as the initial
/// accumulator, and emits that value as the only element.
///
/// Upstream is drained inside the transform itself. An empty upstream yields
/// `StreamError::EmptyStream`; the first upstream error stops the reduction and is
/// emitted instead of a result.
#[must_use]
pub fn reduce<F>(self, f: F) -> Flow<In, Out, Mat>
where
    F: Fn(Out, Out) -> Out + Send + Sync + 'static,
{
    let combine = Arc::new(f);
    self.via(Flow::from_transform(move |mut upstream| {
        let first = match upstream.next() {
            Some(Ok(first)) => first,
            Some(Err(error)) => return Box::new(std::iter::once(Err(error))),
            None => return Box::new(std::iter::once(Err(StreamError::EmptyStream))),
        };
        let outcome = upstream.try_fold(first, |acc, element| {
            element.map(|value| combine(acc, value))
        });
        Box::new(std::iter::once(outcome))
    }))
}
/// Reduces the whole upstream to one value with a fallible step function, using
/// the first element as the initial accumulator.
///
/// Upstream is drained inside the transform itself. An empty upstream yields
/// `StreamError::EmptyStream`; the first error — from upstream or from `f` — stops
/// the reduction and is emitted instead of a result.
#[must_use]
pub fn reduce_result<F>(self, f: F) -> Flow<In, Out, Mat>
where
    Out: Clone,
    F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
{
    let combine = Arc::new(f);
    self.via(Flow::from_transform(move |mut upstream| {
        let first = match upstream.next() {
            Some(Ok(first)) => first,
            Some(Err(error)) => return Box::new(std::iter::once(Err(error))),
            None => return Box::new(std::iter::once(Err(StreamError::EmptyStream))),
        };
        let outcome = upstream.try_fold(first, |acc, element| {
            element.and_then(|value| combine(acc, value))
        });
        Box::new(std::iter::once(outcome))
    }))
}
/// Supervised variant of `reduce_result`.
///
/// Upstream is drained inside the transform itself. The first element seeds the
/// accumulator. When `f` fails (returns `Err`, or panics and is converted to an
/// error by `call_supervised`) the error is passed to `decider`:
///
/// * `Stop` emits the error as the only element.
/// * `Resume` skips the element and keeps the accumulator from before the call.
/// * `Restart` skips the element and clears the accumulator, so the next element
///   seeds it afresh.
///
/// If the accumulator is empty when upstream completes — no elements at all, or a
/// `Restart` on the last one — `StreamError::EmptyStream` is emitted. An upstream
/// error is emitted as the only element without consulting `decider`.
#[must_use]
pub fn reduce_result_with_supervision<F>(
    self,
    f: F,
    decider: SupervisionDecider,
) -> Flow<In, Out, Mat>
where
    Out: Clone,
    F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
{
    let combine = Arc::new(f);
    self.via(Flow::from_transform(move |upstream| {
        let combine = Arc::clone(&combine);
        let decider = Arc::clone(&decider);
        let mut acc: Option<Out> = None;
        for element in upstream {
            let value = match element {
                Ok(value) => value,
                Err(error) => return Box::new(std::iter::once(Err(error))),
            };
            let Some(checkpoint) = acc.take() else {
                acc = Some(value);
                continue;
            };
            // `f` gets a clone so the checkpoint is still available for `Resume`.
            acc = match call_supervised("reduce_result callback", || {
                combine(checkpoint.clone(), value)
            }) {
                Ok(next) => Some(next),
                Err(error) => match decide_supervision(&decider, &error) {
                    SupervisionDirective::Stop => {
                        return Box::new(std::iter::once(Err(error)));
                    }
                    SupervisionDirective::Resume => Some(checkpoint),
                    SupervisionDirective::Restart => None,
                },
            };
        }
        Box::new(std::iter::once(acc.ok_or(StreamError::EmptyStream)))
    }))
}
/// Rewrites every error travelling through the stream with `f`; successful
/// elements pass through untouched.
#[must_use]
pub fn map_error<F>(self, f: F) -> Flow<In, Out, Mat>
where
    F: Fn(StreamError) -> StreamError + Send + Sync + 'static,
{
    let translate = Arc::new(f);
    self.via(Flow::from_transform(move |upstream| {
        let translate = Arc::clone(&translate);
        Box::new(upstream.map(move |element| match element {
            Ok(value) => Ok(value),
            Err(error) => Err(translate(error)),
        }))
    }))
}
/// Gives `f` the chance to replace the first upstream error with a final element.
///
/// On the first error `f` receives a clone of it. `Some(item)` emits `item`;
/// `None` emits the original error. In both cases the stream completes afterwards.
#[must_use]
pub fn recover<F>(self, f: F) -> Flow<In, Out, Mat>
where
    F: Fn(StreamError) -> Option<Out> + Send + Sync + 'static,
{
    let handler = Arc::new(f);
    self.via(Flow::from_transform(move |mut upstream| {
        let handler = Arc::clone(&handler);
        let mut finished = false;
        Box::new(std::iter::from_fn(move || {
            if finished {
                return None;
            }
            let error = match upstream.next()? {
                Ok(value) => return Some(Ok(value)),
                Err(error) => error,
            };
            finished = true;
            // The handler gets a clone; the original is emitted if it declines.
            Some(handler(error.clone()).ok_or(error))
        }))
    }))
}
/// Switches to the `Source` returned by `f` whenever the stream fails, with no
/// limit on how many times this may happen.
///
/// Delegates to `recover_with_attempts` with an attempt budget of `None`.
#[must_use]
pub fn recover_with<F>(self, f: F) -> Flow<In, Out, Mat>
where
    F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
{
    let unlimited: Option<usize> = None;
    self.recover_with_attempts(unlimited, f)
}
/// Switches to the `Source` returned by `f` when the stream fails, at most
/// `retries` times.
///
/// Delegates to `recover_with_attempts` with an attempt budget of `Some(retries)`.
#[must_use]
pub fn recover_with_retries<F>(self, retries: usize, f: F) -> Flow<In, Out, Mat>
where
    F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
{
    let budget = Some(retries);
    self.recover_with_attempts(budget, f)
}
/// Shared implementation of `recover_with` and `recover_with_retries`.
///
/// `attempts` is the replacement budget: `None` means unlimited, `Some(n)` allows
/// `n` more recoveries. While budget remains, an error from the current stream is
/// passed (as a clone) to `f`:
///
/// * `Some(source)` — the source is created and the stage continues with its
///   stream; the source's materialized value is discarded. If creation fails, that
///   error is emitted and the stream completes.
/// * `None` — the original error is emitted and the stream completes.
///
/// Once the budget is exhausted, the next error is emitted and completes the stream.
fn recover_with_attempts<F>(self, attempts: Option<usize>, f: F) -> Flow<In, Out, Mat>
where
F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
{
let stage = Arc::new(f);
self.via(Flow::from_runtime_transform(move |input, materializer| {
// Materializer used for the replacement sources; it is derived from the
// stage's own materializer with the same name prefix.
let replacement_materializer =
materializer.with_name_prefix(materializer.name_prefix().to_owned());
let stage = Arc::clone(&stage);
let mut remaining_retries = attempts;
let mut current = input;
let mut terminated = false;
Ok(Box::new(std::iter::from_fn(move || {
if terminated {
return None;
}
loop {
match current.next() {
Some(Ok(item)) => return Some(Ok(item)),
// Budget available (`None` = unlimited, or a positive count).
Some(Err(error)) if remaining_retries != Some(0) => {
// The attempt is charged before `f` is consulted.
if let Some(remaining) = remaining_retries.as_mut() {
*remaining -= 1;
}
match stage(error.clone()) {
Some(source) => match Arc::clone(&source.factory)
.create(&replacement_materializer)
{
// Continue the loop on the replacement stream.
Ok((stream, _)) => current = stream,
Err(error) => {
terminated = true;
return Some(Err(error));
}
},
None => {
terminated = true;
return Some(Err(error));
}
}
}
// Budget exhausted: the error is final.
Some(Err(error)) => {
terminated = true;
return Some(Err(error));
}
None => {
terminated = true;
return None;
}
}
}
})) as BoxStream<Out>)
}))
}
/// Turns the first upstream error into a normal completion: the error is dropped
/// and the stream simply ends.
#[must_use]
pub fn on_error_complete(self) -> Flow<In, Out, Mat> {
    self.via(Flow::from_transform(move |mut upstream| {
        let mut completed = false;
        Box::new(std::iter::from_fn(move || {
            if completed {
                return None;
            }
            let element = upstream.next()?;
            if element.is_err() {
                completed = true;
                return None;
            }
            Some(element)
        }))
    }))
}
/// Splits the stream into a prefix of up to `n` elements and a `Source` for the
/// remainder, emitted together as a `(Vec<Out>, Source<Out>)` pair.
///
/// The work is done by `prefix_and_tail_stream`; the materializer is not used.
#[must_use]
pub fn prefix_and_tail(self, n: usize) -> Flow<In, (Vec<Out>, Source<Out>), Mat> {
    self.via(Flow::from_runtime_transform(move |upstream, _materializer| {
        let split = prefix_and_tail_stream(upstream, n);
        Ok(split)
    }))
}
/// Reads up to `n` elements, lets `f` build a flow from them, and runs the rest of
/// the stream through that flow.
///
/// The prefix is collected while the stage is being built. If upstream fails
/// during collection, the error is emitted as the only element and `f` is never
/// called. If upstream ends early, `f` receives the shorter prefix. The inner
/// flow's materialization callback is invoked and its value discarded; a failure
/// there fails the transform.
#[must_use]
pub fn flat_map_prefix<Next, NextMat, F>(self, n: usize, f: F) -> Flow<In, Next, Mat>
where
    Next: Send + 'static,
    NextMat: Send + 'static,
    F: Fn(Vec<Out>) -> Flow<Out, Next, NextMat> + Send + Sync + 'static,
    Out: Clone,
{
    let build = Arc::new(f);
    self.via(Flow::from_runtime_transform(
        move |mut upstream, materializer| {
            let mut head = Vec::with_capacity(n);
            while head.len() < n {
                let Some(element) = upstream.next() else {
                    break;
                };
                match element {
                    Ok(value) => head.push(value),
                    Err(error) => {
                        return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Next>);
                    }
                }
            }
            let inner = build(head);
            let transform = inner.transform;
            let _ = (inner.materialize)()?;
            // Hand the remaining upstream to whichever kind of transform `f` built.
            match transform {
                FlowTransform::Pure(apply) => Ok(apply(upstream)),
                FlowTransform::Runtime(apply) => apply(upstream, materializer),
            }
        },
    ))
}
/// Demultiplexes the stream into one `Source` per key computed by `f`.
///
/// Delegates to `group_by_with_batching` with `GroupByBatchMode::Immediate`.
///
/// # Panics
///
/// Panics if `max_substreams` is zero (checked by `group_by_with_batching`).
#[must_use]
pub fn group_by<Key, F>(
    self,
    max_substreams: usize,
    f: F,
    allow_closed_substream_recreation: bool,
) -> Flow<In, Source<Out>, Mat>
where
    Key: Clone + Eq + Hash + Send + 'static,
    F: Fn(&Out) -> Key + Send + Sync + 'static,
    Out: Clone,
{
    let batch_mode = GroupByBatchMode::Immediate;
    self.group_by_with_batching(
        max_substreams,
        f,
        allow_closed_substream_recreation,
        batch_mode,
    )
}
/// `group_by` with an explicit `GroupByBatchMode`.
///
/// All arguments are forwarded to `group_by_stream` together with the stage's
/// materializer.
///
/// # Panics
///
/// Panics if `max_substreams` is zero.
pub(super) fn group_by_with_batching<Key, F>(
    self,
    max_substreams: usize,
    f: F,
    allow_closed_substream_recreation: bool,
    batch_mode: GroupByBatchMode,
) -> Flow<In, Source<Out>, Mat>
where
    Key: Clone + Eq + Hash + Send + 'static,
    F: Fn(&Out) -> Key + Send + Sync + 'static,
    Out: Clone,
{
    assert!(
        max_substreams > 0,
        "group_by max_substreams must be greater than zero"
    );
    let key_of = Arc::new(f);
    self.via(Flow::from_runtime_transform(move |upstream, materializer| {
        let key_of = Arc::clone(&key_of);
        Ok(group_by_stream(
            upstream,
            max_substreams,
            allow_closed_substream_recreation,
            key_of,
            batch_mode,
            materializer,
        ))
    }))
}
/// Splits the stream into consecutive `Source`s, using `predicate` to find the
/// split points.
///
/// Delegates to `split_streams` in `SplitMode::When`.
#[must_use]
pub fn split_when<F>(self, predicate: F) -> Flow<In, Source<Out>, Mat>
where
    F: Fn(&Out) -> bool + Send + Sync + 'static,
    Out: Clone,
{
    let is_boundary = Arc::new(predicate);
    self.via(Flow::from_runtime_transform(move |upstream, materializer| {
        let is_boundary = Arc::clone(&is_boundary);
        Ok(split_streams(
            upstream,
            SplitMode::When,
            is_boundary,
            materializer,
        ))
    }))
}
/// Splits the stream into consecutive `Source`s, using `predicate` to find the
/// split points.
///
/// Delegates to `split_streams` in `SplitMode::After`.
#[must_use]
pub fn split_after<F>(self, predicate: F) -> Flow<In, Source<Out>, Mat>
where
    F: Fn(&Out) -> bool + Send + Sync + 'static,
    Out: Clone,
{
    let is_boundary = Arc::new(predicate);
    self.via(Flow::from_runtime_transform(move |upstream, materializer| {
        let is_boundary = Arc::clone(&is_boundary);
        Ok(split_streams(
            upstream,
            SplitMode::After,
            is_boundary,
            materializer,
        ))
    }))
}
/// Maps each element to a `Source` with `f` and flattens the results into one
/// stream.
///
/// Delegates to `flat_map_concat_stream`, passing the stage's materializer.
#[must_use]
pub fn flat_map_concat<Next, NextMat, F>(self, f: F) -> Flow<In, Next, Mat>
where
    Next: Send + 'static,
    NextMat: Send + 'static,
    F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
{
    let make_source = Arc::new(f);
    self.via(Flow::from_runtime_transform(move |upstream, materializer| {
        let make_source = Arc::clone(&make_source);
        Ok(flat_map_concat_stream(upstream, make_source, materializer))
    }))
}
/// Maps each element to a `Source` with `f` and merges the results into one stream.
///
/// Delegates to `flat_map_merge_stream` with the given `breadth`; the stage is
/// registered with the `PRESERVES_TERMINAL_CONSUMER_BATCH` hint.
///
/// # Panics
///
/// Panics if `breadth` is zero.
#[must_use]
pub fn flat_map_merge<Next, NextMat, F>(self, breadth: usize, f: F) -> Flow<In, Next, Mat>
where
    Next: Send + 'static,
    NextMat: Send + 'static,
    F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
{
    assert!(
        breadth > 0,
        "flat_map_merge breadth must be greater than zero"
    );
    let make_source = Arc::new(f);
    self.via(Flow::from_runtime_transform_with_hints(
        move |upstream, materializer| {
            let make_source = Arc::clone(&make_source);
            Ok(flat_map_merge_stream(
                upstream,
                breadth,
                make_source,
                materializer,
            ))
        },
        FlowHints::PRESERVES_TERMINAL_CONSUMER_BATCH,
    ))
}
/// Appends `that` after this flow's output.
///
/// Delegates to `concat_sources` with a single source, which creates `that` when
/// the stage is built.
#[must_use]
pub fn concat<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
where
    Mat2: Send + 'static,
{
    self.concat_sources(std::iter::once(that))
}
/// Appends `that` after this flow's output, handing only the source's factory to
/// `concat_streams_lazy`.
///
/// NOTE(review): presumably the factory is invoked only once the primary stream
/// has completed — confirm against `concat_streams_lazy`.
#[must_use]
pub fn concat_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
where
    Mat2: Send + 'static,
{
    let tail_factory = that.factory;
    self.via(Flow::from_runtime_transform(move |upstream, materializer| {
        let pending = vec![Arc::clone(&tail_factory)];
        Ok(concat_streams_lazy(upstream, pending, materializer))
    }))
}
/// Appends every source in `those`, in iteration order, after this flow's output,
/// handing only the sources' factories to `concat_streams_lazy`.
///
/// NOTE(review): presumably each factory is invoked only when its turn comes —
/// confirm against `concat_streams_lazy`.
#[must_use]
pub fn concat_all_lazy<Mat2, I>(self, those: I) -> Flow<In, Out, Mat>
where
    Mat2: Send + 'static,
    I: IntoIterator<Item = Source<Out, Mat2>>,
{
    let tail_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
    self.via(Flow::from_runtime_transform(move |upstream, materializer| {
        // The transform may run more than once, so each run gets its own copy.
        let pending = tail_factories.clone();
        Ok(concat_streams_lazy(upstream, pending, materializer))
    }))
}
/// Emits all of `that` before this flow's output.
///
/// Delegates to `prepend_sources` with a single source, which creates `that` when
/// the stage is built.
#[must_use]
pub fn prepend<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
where
    Mat2: Send + 'static,
{
    self.prepend_sources(std::iter::once(that))
}
/// Emits all of `that` before this flow's output.
///
/// NOTE(review): despite the name this is currently a plain alias for `prepend`,
/// which creates `that` when the stage is built; there is no deferred creation
/// here.
#[must_use]
pub fn prepend_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
where
    Mat2: Send + 'static,
{
    let eager = self.prepend(that);
    eager
}
#[must_use]
pub fn or_else<Mat2>(self, secondary: Source<Out, Mat2>) -> Flow<In, Out, Mat>
where
Mat2: Send + 'static,
{
let secondary_factory = secondary.factory;
self.via(Flow::from_runtime_transform(move |input, materializer| {
let secondary = match Arc::clone(&secondary_factory).create(materializer) {
Ok((stream, _)) => stream,
Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
};
Ok(or_else_stream(input, secondary))
}))
}
/// Interleaves this flow's output with `that` in segments of `segment_size`.
///
/// Delegates to `interleave_all` with a single source and `eager_close = false`.
#[must_use]
pub fn interleave<Mat2>(
    self,
    that: Source<Out, Mat2>,
    segment_size: usize,
) -> Flow<In, Out, Mat>
where
    Mat2: Send + 'static,
{
    let eager_close = false;
    self.interleave_all(std::iter::once(that), segment_size, eager_close)
}
/// Interleaves this flow's output with every source in `those`.
///
/// All sources are created when the stage is built, in iteration order, and their
/// materialized values are discarded. If any creation fails, the stage emits that
/// error as its only element. Otherwise the streams — this flow's output first —
/// go to `interleave_streams` with `segment_size` and `eager_close`.
#[must_use]
pub fn interleave_all<Mat2, I>(
    self,
    those: I,
    segment_size: usize,
    eager_close: bool,
) -> Flow<In, Out, Mat>
where
    Mat2: Send + 'static,
    I: IntoIterator<Item = Source<Out, Mat2>>,
{
    let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
    self.via(Flow::from_runtime_transform(move |upstream, materializer| {
        let mut lanes = Vec::with_capacity(other_factories.len() + 1);
        lanes.push(upstream);
        for factory in &other_factories {
            match Arc::clone(factory).create(materializer) {
                Ok((stream, _)) => lanes.push(stream),
                Err(error) => {
                    return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>);
                }
            }
        }
        Ok(interleave_streams(lanes, segment_size, eager_close))
    }))
}
#[must_use]
pub fn merge_sorted<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
where
Out: Ord,
Mat2: Send + 'static,
{
let source_factory = that.factory;
self.via(Flow::from_runtime_transform(move |input, materializer| {
let other = match Arc::clone(&source_factory).create(materializer) {
Ok((stream, _)) => stream,
Err(error) => return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>),
};
Ok(merge_sorted_stream(input, other))
}))
}
#[must_use]
pub fn merge_latest<Mat2>(
self,
that: Source<Out, Mat2>,
eager_complete: bool,
) -> Flow<In, Vec<Out>, Mat>
where
Out: Clone,
Mat2: Send + 'static,
{
let source_factory = that.factory;
self.via(Flow::from_runtime_transform(move |input, materializer| {
let other = match Arc::clone(&source_factory).create(materializer) {
Ok((stream, _)) => stream,
Err(error) => {
return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Vec<Out>>);
}
};
Ok(merge_latest_streams(vec![input, other], eager_complete))
}))
}
/// Merges this flow's output with every source in `those`.
///
/// All sources are created when the stage is built, in iteration order, and their
/// materialized values are discarded. If any creation fails, the stage emits that
/// error as its only element. Otherwise the streams — this flow's output first —
/// go to `merge_streams` with `eager_complete`.
#[must_use]
pub fn merge_all<Mat2, I>(self, those: I, eager_complete: bool) -> Flow<In, Out, Mat>
where
    Mat2: Send + 'static,
    I: IntoIterator<Item = Source<Out, Mat2>>,
{
    let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
    self.via(Flow::from_runtime_transform(move |upstream, materializer| {
        let mut inputs = Vec::with_capacity(other_factories.len() + 1);
        inputs.push(upstream);
        for factory in &other_factories {
            match Arc::clone(factory).create(materializer) {
                Ok((stream, _)) => inputs.push(stream),
                Err(error) => {
                    return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>);
                }
            }
        }
        Ok(merge_streams(inputs, eager_complete))
    }))
}
/// Pairs this flow's output with `that` via `zip_streams` and maps every pair
/// through `combine`. Errors coming out of `zip_streams` are forwarded unchanged.
///
/// `that` is created when the stage is built and its materialized value is
/// discarded; a creation failure is emitted as the stage's only element.
#[must_use]
pub fn zip_with<Mat2, Out2, Next, F>(
    self,
    that: Source<Out2, Mat2>,
    combine: F,
) -> Flow<In, Next, Mat>
where
    Out2: Send + 'static,
    Next: Send + 'static,
    Mat2: Send + 'static,
    F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
{
    let other_factory = that.factory;
    let combine = Arc::new(combine);
    self.via(Flow::from_runtime_transform(move |upstream, materializer| {
        let other = match Arc::clone(&other_factory).create(materializer) {
            Err(error) => return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Next>),
            Ok((stream, _)) => stream,
        };
        let combine = Arc::clone(&combine);
        let zipped = zip_streams(upstream, other)
            .map(move |pair| pair.map(|(left, right)| combine(left, right)));
        Ok(Box::new(zipped) as BoxStream<Next>)
    }))
}
/// Combines this flow's output with `that` into `(Out, Out2)` tuples.
///
/// Delegates to `zip_latest_with` with `eager_complete = true` and a combiner
/// that builds the tuple.
#[must_use]
pub fn zip_latest<Mat2, Out2>(self, that: Source<Out2, Mat2>) -> Flow<In, (Out, Out2), Mat>
where
    Out: Clone,
    Out2: Clone + Send + 'static,
    Mat2: Send + 'static,
{
    let eager_complete = true;
    self.zip_latest_with(that, eager_complete, |left, right| (left, right))
}
#[must_use]
pub fn zip_latest_with<Mat2, Out2, Next, F>(
self,
that: Source<Out2, Mat2>,
eager_complete: bool,
combine: F,
) -> Flow<In, Next, Mat>
where
Out: Clone,
Out2: Clone + Send + 'static,
Next: Send + 'static,
Mat2: Send + 'static,
F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
{
let source_factory = that.factory;
let combine = Arc::new(combine);
self.via(Flow::from_runtime_transform(move |input, materializer| {
let other = match Arc::clone(&source_factory).create(materializer) {
Ok((stream, _)) => stream,
Err(error) => return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Next>),
};
Ok(zip_latest_with_stream(
input,
other,
eager_complete,
Arc::clone(&combine),
))
}))
}
/// Pairs every successful element with its zero-based position in the stream.
///
/// Errors are forwarded unchanged and do not consume an index. The counter is a
/// `u64` that wraps around on overflow.
#[must_use]
pub fn zip_with_index(self) -> Flow<In, (Out, u64), Mat> {
    self.via(Flow::from_runtime_transform(
        move |upstream, _materializer| {
            let mut position = 0_u64;
            let indexed = upstream.map(move |element| {
                element.map(|value| {
                    let current = position;
                    position = position.wrapping_add(1);
                    (value, current)
                })
            });
            Ok(Box::new(indexed) as BoxStream<(Out, u64)>)
        },
    ))
}
#[must_use]
pub fn zip_all<Mat2, Out2>(
self,
that: Source<Out2, Mat2>,
this_elem: Out,
that_elem: Out2,
) -> Flow<In, (Out, Out2), Mat>
where
Out: Clone + Sync,
Out2: Clone + Send + Sync + 'static,
Mat2: Send + 'static,
{
let source_factory = that.factory;
self.via(Flow::from_runtime_transform(move |input, materializer| {
let other = match Arc::clone(&source_factory).create(materializer) {
Ok((stream, _)) => stream,
Err(error) => {
return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<(Out, Out2)>);
}
};
Ok(zip_all_stream(
input,
other,
this_elem.clone(),
that_elem.clone(),
))
}))
}
/// Sends a clone of every element to `sink` in addition to emitting it downstream.
///
/// The side sink is set up through `materialize_side_sink` when the stage is
/// built; a failure there fails the transform. While running:
///
/// * A successful element is cloned and sent to the side sink with a blocking
///   `send`. If the send fails, the sender is dropped and this stage completes
///   (returns `None`) without emitting the element.
/// * An upstream error is offered to the side sink (a failed send is ignored), the
///   sender is dropped, and the error is emitted downstream.
/// * Upstream completion drops the sender.
///
/// NOTE(review): the third argument of `materialize_side_sink` (`0`) is presumably
/// the side channel's capacity — confirm against the helper.
#[must_use]
pub fn also_to<SideMat>(self, sink: Sink<Out, SideMat>) -> Flow<In, Out, Mat>
where
Out: Clone,
SideMat: Send + 'static,
{
self.via(Flow::from_runtime_transform(
move |mut input: BoxStream<Out>, materializer| {
let (side_sender, side_mat) = materialize_side_sink(&sink, materializer, 0)?;
let mut sender = Some(side_sender);
let side_mat = side_mat;
Ok(Box::new(std::iter::from_fn(move || match input.next() {
Some(Ok(item)) => {
// Referencing `side_mat` makes the `move` closure capture it, so it is
// not dropped before the stream itself is.
let _ = &side_mat;
if sender
.as_ref()
.is_some_and(|sender| sender.send(Ok(item.clone())).is_err())
{
// The side channel is closed: complete the main stream as well.
sender = None;
return None;
}
Some(Ok(item))
}
Some(Err(error)) => {
let _ = &side_mat;
// Best effort: the side sink may already be gone.
let _ = sender
.as_ref()
.and_then(|sender| sender.send(Err(error.clone())).ok());
sender = None;
Some(Err(error))
}
None => {
let _ = &side_mat;
sender = None;
None
}
})) as BoxStream<Out>)
},
))
}
/// Sends a clone of every element to each sink in `sinks` in addition to emitting
/// it downstream.
///
/// With no sinks the flow is returned unchanged. Otherwise every sink is set up
/// through `materialize_side_sink` when the stage is built; the first failure
/// fails the transform. While running:
///
/// * A successful element is cloned and sent to the sinks in order with a blocking
///   `send`. If any send fails, all side channels are dropped and this stage
///   completes (returns `None`) without emitting the element.
/// * An upstream error is offered to every sink (failed sends are ignored), all
///   side channels are dropped, and the error is emitted downstream.
/// * Upstream completion drops all side channels.
///
/// NOTE(review): the third argument of `materialize_side_sink` (`0`) is presumably
/// the side channel's capacity — confirm against the helper.
#[must_use]
pub fn also_to_all<SideMat, I>(self, sinks: I) -> Flow<In, Out, Mat>
where
Out: Clone,
SideMat: Send + 'static,
I: IntoIterator<Item = Sink<Out, SideMat>>,
{
let sinks: Vec<_> = sinks.into_iter().collect();
if sinks.is_empty() {
return self;
}
self.via(Flow::from_runtime_transform(
move |mut input: BoxStream<Out>, materializer| {
// Each entry pairs a sender with the value returned next to it by
// `materialize_side_sink`; both live until `sides` is cleared.
let mut sides = sinks
.iter()
.map(|sink| materialize_side_sink(sink, materializer, 0))
.collect::<StreamResult<Vec<_>>>()?;
Ok(Box::new(std::iter::from_fn(move || match input.next() {
Some(Ok(item)) => {
for (sender, _) in &sides {
if sender.send(Ok(item.clone())).is_err() {
// One side channel is closed: drop them all and complete.
sides.clear();
return None;
}
}
Some(Ok(item))
}
Some(Err(error)) => {
for (sender, _) in &sides {
// Best effort: a sink may already be gone.
let _ = sender.send(Err(error.clone())).ok();
}
sides.clear();
Some(Err(error))
}
None => {
sides.clear();
None
}
})) as BoxStream<Out>)
},
))
}
/// Routes every element matching `predicate` to `sink` instead of downstream;
/// all other elements are emitted as usual.
///
/// The side sink is set up through `materialize_side_sink` when the stage is
/// built; a failure there fails the transform. While running:
///
/// * A matching element is moved into the side sink with a blocking `send` and
///   the stage goes on to the next element. If the send fails, the sender is
///   dropped and this stage completes (returns `None`).
/// * An upstream error is offered to the side sink (a failed send is ignored), the
///   sender is dropped, and the error is emitted downstream.
/// * Upstream completion drops the sender.
///
/// NOTE(review): the third argument of `materialize_side_sink` (`0`) is presumably
/// the side channel's capacity — confirm against the helper.
#[must_use]
pub fn divert_to<SideMat, F>(self, sink: Sink<Out, SideMat>, predicate: F) -> Flow<In, Out, Mat>
where
SideMat: Send + 'static,
F: Fn(&Out) -> bool + Send + Sync + 'static,
{
let predicate = Arc::new(predicate);
self.via(Flow::from_runtime_transform(
move |mut input: BoxStream<Out>, materializer| {
let predicate = Arc::clone(&predicate);
let (side_sender, side_mat) = materialize_side_sink(&sink, materializer, 0)?;
let mut sender = Some(side_sender);
let side_mat = side_mat;
Ok(Box::new(std::iter::from_fn(move || {
loop {
// Referencing `side_mat` makes the `move` closure capture it, so it is
// not dropped before the stream itself is.
let _ = &side_mat;
match input.next() {
Some(Ok(item)) if predicate(&item) => {
if sender
.as_ref()
.is_some_and(|sender| sender.send(Ok(item)).is_err())
{
// The side channel is closed: complete the main stream as well.
sender = None;
return None;
}
// Diverted: loop on to the next upstream element.
}
Some(Ok(item)) => return Some(Ok(item)),
Some(Err(error)) => {
// Best effort: the side sink may already be gone.
let _ = sender
.as_ref()
.and_then(|sender| sender.send(Err(error.clone())).ok());
sender = None;
return Some(Err(error));
}
None => {
sender = None;
return None;
}
}
}
})) as BoxStream<Out>)
},
))
}
/// Offers a clone of every element to `sink` without ever holding up the main
/// stream.
///
/// The side sink is set up through `materialize_side_sink` when the stage is
/// built; a failure there fails the transform. While running:
///
/// * A successful element is cloned and offered with a non-blocking `try_send`.
///   If the side channel is full, the clone is dropped. If it is disconnected,
///   the sender is dropped and later elements are no longer offered. The element
///   is emitted downstream in every case.
/// * An upstream error is offered the same way (any outcome is ignored), the
///   sender is dropped, and the error is emitted downstream.
/// * Upstream completion drops the sender.
///
/// NOTE(review): the third argument of `materialize_side_sink` (`1`) is presumably
/// the side channel's capacity — confirm against the helper.
#[must_use]
pub fn wire_tap<SideMat>(self, sink: Sink<Out, SideMat>) -> Flow<In, Out, Mat>
where
Out: Clone,
SideMat: Send + 'static,
{
self.via(Flow::from_runtime_transform(
move |mut input: BoxStream<Out>, materializer| {
let (side_sender, side_mat) = materialize_side_sink(&sink, materializer, 1)?;
let mut sender = Some(side_sender);
let side_mat = side_mat;
Ok(Box::new(std::iter::from_fn(move || match input.next() {
Some(Ok(item)) => {
// Referencing `side_mat` makes the `move` closure capture it, so it is
// not dropped before the stream itself is.
let _ = &side_mat;
if let Some(side) = sender.as_ref() {
match side.try_send(Ok(item.clone())) {
// A full channel just means this element is not tapped.
Ok(()) | Err(std::sync::mpsc::TrySendError::Full(_)) => {}
Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
sender = None
}
}
}
Some(Ok(item))
}
Some(Err(error)) => {
let _ = &side_mat;
if let Some(side) = sender.as_ref() {
// Best effort: every outcome of the send is ignored.
match side.try_send(Err(error.clone())) {
Ok(())
| Err(std::sync::mpsc::TrySendError::Full(_))
| Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {}
}
}
sender = None;
Some(Err(error))
}
None => {
let _ = &side_mat;
sender = None;
None
}
})) as BoxStream<Out>)
},
))
}
/// Appends the elements of `those` sources, in order, after this flow's output.
///
/// All sources are created up front when the flow is materialized; if any
/// of them fails to be created the resulting stream consists of that single
/// error. Materialized values of the appended sources are discarded.
fn concat_sources<Mat2, I>(self, those: I) -> Flow<In, Out, Mat>
where
    Mat2: Send + 'static,
    I: IntoIterator<Item = Source<Out, Mat2>>,
{
    let factories = those
        .into_iter()
        .map(|source| source.factory)
        .collect::<Vec<_>>();
    self.via(Flow::from_runtime_transform(move |input, materializer| {
        let mut ordered = Vec::with_capacity(1 + factories.len());
        ordered.push(input);
        for factory in factories.iter() {
            match Arc::clone(factory).create(materializer) {
                Ok((stream, _)) => ordered.push(stream),
                Err(error) => {
                    return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>);
                }
            }
        }
        Ok(concat_streams(ordered))
    }))
}
/// Emits the elements of `those` sources, in order, before this flow's output.
///
/// All sources are created up front when the flow is materialized; if any
/// of them fails to be created the resulting stream consists of that single
/// error. Materialized values of the prepended sources are discarded.
fn prepend_sources<Mat2, I>(self, those: I) -> Flow<In, Out, Mat>
where
    Mat2: Send + 'static,
    I: IntoIterator<Item = Source<Out, Mat2>>,
{
    let factories = those
        .into_iter()
        .map(|source| source.factory)
        .collect::<Vec<_>>();
    self.via(Flow::from_runtime_transform(move |input, materializer| {
        let mut ordered = Vec::with_capacity(factories.len() + 1);
        for factory in factories.iter() {
            match Arc::clone(factory).create(materializer) {
                Ok((stream, _)) => ordered.push(stream),
                Err(error) => {
                    return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>);
                }
            }
        }
        // The flow's own output comes last.
        ordered.push(input);
        Ok(concat_streams(ordered))
    }))
}
/// Emits a clone of `inject` between consecutive elements of the stream.
///
/// For input `a, b, c` the output is `a, inject, b, inject, c`: no separator
/// is emitted before the first element or after the last one. Errors are
/// passed through unchanged, and the element following an error is treated
/// as a "first" element again (no separator is emitted before it).
#[must_use]
pub fn intersperse(self, inject: Out) -> Flow<In, Out, Mat>
where
    Out: Clone + Sync,
{
    let inject = Arc::new(inject);
    self.via(Flow::from_transform(move |mut input| {
        let inject = Arc::clone(&inject);
        let mut first = true;
        // Element already pulled from upstream whose separator was just
        // emitted; it must be delivered before upstream is pulled again.
        let mut pending: Option<Out> = None;
        Box::new(std::iter::from_fn(move || {
            if let Some(item) = pending.take() {
                return Some(Ok(item));
            }
            match input.next()? {
                Ok(item) => {
                    if first {
                        first = false;
                        Some(Ok(item))
                    } else {
                        pending = Some(item);
                        Some(Ok((*inject).clone()))
                    }
                }
                Err(error) => {
                    first = true;
                    Some(Err(error))
                }
            }
        }))
    }))
}
/// Converts every element into an `Option<Inner>` and keeps only the
/// `Some` payloads, delegating the filtering to `filter_map`.
#[must_use]
pub fn flatten_optional<Inner>(self) -> Flow<In, Inner, Mat>
where
    Out: Into<Option<Inner>>,
    Inner: Send + 'static,
{
    self.filter_map(|candidate| Into::<Option<Inner>>::into(candidate))
}
/// Chunks the stream into `Vec`s by cumulative weight.
///
/// Elements are collected into the current group until the sum of their
/// `cost_fn` weights reaches or exceeds `max_weight`; the element that
/// crosses the threshold is included in the group. A final, lighter group
/// is emitted when upstream completes. An upstream error is emitted
/// immediately and the partially collected group is discarded.
///
/// The running weight saturates at `usize::MAX` instead of overflowing, so
/// very large costs close the group rather than panicking or wrapping.
#[must_use]
pub fn grouped_weighted<F>(self, max_weight: usize, cost_fn: F) -> Flow<In, Vec<Out>, Mat>
where
    F: Fn(&Out) -> usize + Send + Sync + 'static,
{
    let cost_fn = Arc::new(cost_fn);
    self.via(Flow::from_transform(move |mut input| {
        let cost_fn = Arc::clone(&cost_fn);
        Box::new(std::iter::from_fn(move || {
            let mut group = Vec::new();
            let mut weight = 0usize;
            while weight < max_weight {
                match input.next() {
                    Some(Ok(item)) => {
                        weight = weight.saturating_add(cost_fn(&item));
                        group.push(item);
                    }
                    Some(Err(error)) => return Some(Err(error)),
                    None => break,
                }
            }
            if group.is_empty() {
                None
            } else {
                Some(Ok(group))
            }
        }))
    }))
}
/// Passes elements through while their cumulative `cost_fn` weight stays
/// within `max_weight`.
///
/// An element whose weight would push the running total above `max_weight`
/// is replaced by `StreamError::LimitExceeded`; the running total is not
/// advanced in that case. The addition is overflow-checked, so a cost that
/// would overflow `usize` is reported as exceeding the limit instead of
/// wrapping around and slipping past the check.
#[must_use]
pub fn limit_weighted<F>(self, max_weight: usize, cost_fn: F) -> Flow<In, Out, Mat>
where
    F: Fn(&Out) -> usize + Send + Sync + 'static,
{
    let cost_fn = Arc::new(cost_fn);
    self.via(Flow::from_transform(move |mut input| {
        let cost_fn = Arc::clone(&cost_fn);
        let mut weight = 0usize;
        Box::new(std::iter::from_fn(move || match input.next()? {
            Ok(item) => match weight.checked_add(cost_fn(&item)) {
                Some(total) if total <= max_weight => {
                    weight = total;
                    Some(Ok(item))
                }
                // Either the limit is exceeded or the sum does not fit in `usize`.
                _ => Some(Err(StreamError::LimitExceeded {
                    max: max_weight as u64,
                })),
            },
            Err(error) => Some(Err(error)),
        }))
    }))
}
/// Adapts the input side of this flow: each incoming `NewIn` element is
/// converted with `f` before it reaches this flow. Errors pass through
/// untouched and this flow's materialized value is kept.
#[must_use]
pub fn contramap<NewIn, F>(self, f: F) -> Flow<NewIn, Out, Mat>
where
    NewIn: Send + 'static,
    F: Fn(NewIn) -> In + Send + Sync + 'static,
{
    let convert = Arc::new(f);
    let adapter = Flow::from_transform(move |upstream| {
        let convert = Arc::clone(&convert);
        Box::new(upstream.map(move |element| match element {
            Ok(value) => Ok(convert(value)),
            Err(error) => Err(error),
        }))
    });
    adapter.via_mat(self, Keep::right)
}
/// Calls `f` with a reference to every successful element as it passes
/// through; elements and errors are emitted unchanged.
#[must_use]
pub fn monitor<F>(self, f: F) -> Flow<In, Out, Mat>
where
    Out: Clone,
    F: Fn(&Out) + Send + Sync + 'static,
{
    let observer = Arc::new(f);
    self.via(Flow::from_transform(move |input| {
        let observer = Arc::clone(&observer);
        Box::new(input.inspect(move |entry| {
            if let Ok(value) = entry {
                observer(value);
            }
        }))
    }))
}
/// Invokes `materialize_callback` with a clone of the materialized value
/// each time the flow is materialized.
///
/// The callback's return value is discarded; the flow keeps materializing
/// the original `Mat`.
#[must_use]
pub fn watch_termination<CallbackMat, F>(self, materialize_callback: F) -> Flow<In, Out, Mat>
where
    Mat: Clone,
    CallbackMat: Send + 'static,
    F: Fn(Mat) -> CallbackMat + Send + Sync + 'static,
{
    let callback = Arc::new(materialize_callback);
    self.map_materialized_value(move |mat| {
        drop(callback(mat.clone()));
        mat
    })
}
#[must_use]
pub fn to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Sink<In, Mat>
where
SinkMat: Send + 'static,
{
self.to_mat(sink, Keep::left)
}
#[must_use]
pub fn to_mat<SinkMat, Combined, F>(
self,
sink: Sink<Out, SinkMat>,
combine: F,
) -> Sink<In, Combined>
where
SinkMat: Send + 'static,
Combined: Send + 'static,
F: Fn(Mat, SinkMat) -> Combined + Send + Sync + 'static,
{
let transform = self.transform;
let materialize = self.materialize;
let combine = Arc::new(combine);
Sink::from_runner(move |input, materializer| {
let flow_mat = materialize()?;
let input = match &transform {
FlowTransform::Pure(transform) => transform(input),
FlowTransform::Runtime(transform) => transform(input, materializer)?,
};
let sink_mat = sink.run(input, materializer)?;
Ok(combine(flow_mat, sink_mat))
})
}
/// Returns a flow with the same transform, hints and attributes whose
/// materialized value is passed through `f`.
///
/// A failure while producing the original materialized value is returned
/// as-is and `f` is not called.
#[must_use]
pub fn map_materialized_value<NextMat, F>(self, f: F) -> Flow<In, Out, NextMat>
where
    NextMat: Send + 'static,
    F: Fn(Mat) -> NextMat + Send + Sync + 'static,
{
    let Flow {
        transform,
        materialize,
        hints,
        attributes,
    } = self;
    let f = Arc::new(f);
    // Shared by both transform kinds: materialize, then convert the value.
    let mapped_materialize = move || materialize().map(|mat| f(mat));
    match transform {
        FlowTransform::Runtime(runtime) => Flow {
            transform: FlowTransform::Runtime(runtime),
            materialize: Arc::new(mapped_materialize),
            hints,
            attributes,
        },
        FlowTransform::Pure(pure) => {
            Flow::from_parts_with_hints(move |input| pure(input), mapped_materialize, hints)
                .with_attributes(attributes)
        }
    }
}
}
/// Iterator state for a stage that maps elements with the help of a
/// resource that is created once, used for every element, and closed when
/// the stream finishes, fails, or is dropped.
struct MapWithResourceStream<In, Out, Resource, Create, F, Close>
where
Create: Fn() -> StreamResult<Resource>,
F: Fn(&mut Resource, In) -> StreamResult<Out>,
Close: Fn(Resource) -> StreamResult<Option<Out>>,
{
/// Upstream elements to be mapped.
input: BoxStream<In>,
/// Factory that opens the resource.
create: Arc<Create>,
/// Per-element mapping function; receives the open resource.
stage: Arc<F>,
/// Closes the resource; may produce one final element.
close: Arc<Close>,
/// The open resource, if any. Taken out when it is closed.
resource: Option<Resource>,
/// Set once creation has been attempted, whether or not it succeeded.
created: bool,
/// Error to emit on the next pull, after a final element from `close`.
pending_terminal: Option<StreamError>,
/// Once set, the iterator only returns `None`.
terminated: bool,
/// Ties the `Out` type parameter to the struct without storing a value.
_marker: PhantomData<fn() -> Out>,
}
/// Resource lifecycle helpers shared by the iterator and `Drop` impls.
impl<In, Out, Resource, Create, F, Close> MapWithResourceStream<In, Out, Resource, Create, F, Close>
where
    Create: Fn() -> StreamResult<Resource>,
    F: Fn(&mut Resource, In) -> StreamResult<Out>,
    Close: Fn(Resource) -> StreamResult<Option<Out>>,
{
    /// Opens the resource on first use. Creation is attempted only once,
    /// even if that attempt fails.
    fn ensure_created(&mut self) -> StreamResult<()> {
        if !self.created {
            self.created = true;
            let opened = catch_unwind_failed("map_with_resource create", || (self.create)())
                .and_then(|result| result)?;
            self.resource = Some(opened);
        }
        Ok(())
    }

    /// Closes the resource if one is open, returning the optional final
    /// element produced by the close callback.
    fn close_resource(&mut self) -> StreamResult<Option<Out>> {
        let Some(resource) = self.resource.take() else {
            return Ok(None);
        };
        catch_unwind_failed("map_with_resource close", || (self.close)(resource))
            .and_then(|result| result)
    }

    /// Closes the resource and decides what to emit for this pull.
    ///
    /// `terminal` is the error that ended the stream, or `None` on normal
    /// completion. A final element from `close` is emitted first and the
    /// terminal error is deferred to the next pull. If closing fails, the
    /// terminal error wins over the close error when both exist.
    fn close_with_terminal(&mut self, terminal: Option<StreamError>) -> Option<StreamResult<Out>> {
        self.terminated = terminal.is_none();
        let closed = self.close_resource();
        match closed {
            Err(close_error) => {
                self.terminated = true;
                Some(Err(terminal.unwrap_or(close_error)))
            }
            Ok(None) => {
                self.terminated = true;
                terminal.map(Err)
            }
            Ok(Some(last)) => {
                self.pending_terminal = terminal;
                Some(Ok(last))
            }
        }
    }
}
/// Drives the resource-backed mapping one element at a time.
impl<In, Out, Resource, Create, F, Close> Iterator
    for MapWithResourceStream<In, Out, Resource, Create, F, Close>
where
    Create: Fn() -> StreamResult<Resource>,
    F: Fn(&mut Resource, In) -> StreamResult<Out>,
    Close: Fn(Resource) -> StreamResult<Option<Out>>,
{
    type Item = StreamResult<Out>;

    fn next(&mut self) -> Option<Self::Item> {
        // A deferred terminal error takes priority over everything else.
        if let Some(error) = self.pending_terminal.take() {
            self.terminated = true;
            return Some(Err(error));
        }
        if self.terminated {
            return None;
        }
        if let Err(error) = self.ensure_created() {
            self.terminated = true;
            return Some(Err(error));
        }
        let item = match self.input.next() {
            Some(Ok(item)) => item,
            Some(Err(error)) => return self.close_with_terminal(Some(error)),
            None => return self.close_with_terminal(None),
        };
        let resource = self
            .resource
            .as_mut()
            .expect("map_with_resource resource is open");
        let mapped = catch_unwind_failed("map_with_resource function", || {
            (self.stage)(resource, item)
        })
        .and_then(|result| result);
        match mapped {
            Ok(value) => Some(Ok(value)),
            // A failing stage closes the resource and ends the stream.
            Err(error) => self.close_with_terminal(Some(error)),
        }
    }
}
/// Construction of a bidirectional flow from two independent flows.
impl<I1: Send + 'static, O1: Send + 'static, I2: Send + 'static, O2: Send + 'static>
    BidiFlow<I1, O1, I2, O2>
{
    /// Pairs `top` and `bottom` into a `BidiFlow` with default attributes.
    /// The materialized values of both flows are replaced by `NotUsed`.
    #[must_use]
    pub fn from_flows<Mat1, Mat2>(top: Flow<I1, O1, Mat1>, bottom: Flow<I2, O2, Mat2>) -> Self
    where
        Mat1: Send + 'static,
        Mat2: Send + 'static,
    {
        let top = top.map_materialized_value(|_| NotUsed);
        let bottom = bottom.map_materialized_value(|_| NotUsed);
        Self {
            top,
            bottom,
            attributes: Attributes::default(),
        }
    }
}
/// Attribute handling and composition for bidirectional flows.
impl<I1: Send + 'static, O1: Send + 'static, I2: Send + 'static, O2: Send + 'static>
    BidiFlow<I1, O1, I2, O2>
{
    /// Attributes attached to this bidirectional flow.
    #[must_use]
    pub fn attributes(&self) -> &Attributes {
        &self.attributes
    }

    /// Replaces the attributes with `attributes`.
    #[must_use]
    pub fn with_attributes(self, attributes: Attributes) -> Self {
        let Self { top, bottom, .. } = self;
        Self {
            top,
            bottom,
            attributes,
        }
    }

    /// Combines the current attributes with `attributes` via `Attributes::and`.
    #[must_use]
    pub fn add_attributes(self, attributes: Attributes) -> Self {
        let Self {
            top,
            bottom,
            attributes: current,
        } = self;
        Self {
            top,
            bottom,
            attributes: current.and(attributes),
        }
    }

    /// Adds a name attribute built with `Attributes::named`.
    #[must_use]
    pub fn named(self, name: impl Into<String>) -> Self {
        let label = Attributes::named(name);
        self.add_attributes(label)
    }

    /// Closes the bidirectional flow with `flow`: top, then `flow`, then
    /// bottom. The result carries this flow's attributes and `NotUsed` as
    /// its materialized value.
    #[must_use]
    pub fn join<Mat2>(self, flow: Flow<O1, I2, Mat2>) -> Flow<I1, O2, NotUsed>
    where
        Mat2: Send + 'static,
    {
        let Self {
            top,
            bottom,
            attributes,
        } = self;
        top.via(flow)
            .via(bottom)
            .map_materialized_value(|_| NotUsed)
            .with_attributes(attributes)
    }

    /// Stacks this flow on top of `bidi`: the top directions are chained
    /// self-then-`bidi`, the bottom directions `bidi`-then-self, and the
    /// attributes of both are combined.
    #[must_use]
    pub fn atop<OO1: Send + 'static, II2: Send + 'static>(
        self,
        bidi: BidiFlow<O1, OO1, II2, I2>,
    ) -> BidiFlow<I1, OO1, II2, O2> {
        let Self {
            top: outer_top,
            bottom: outer_bottom,
            attributes: outer_attributes,
        } = self;
        let BidiFlow {
            top: inner_top,
            bottom: inner_bottom,
            attributes: inner_attributes,
        } = bidi;
        BidiFlow {
            top: outer_top
                .via(inner_top)
                .map_materialized_value(|_| NotUsed),
            bottom: inner_bottom
                .via(outer_bottom)
                .map_materialized_value(|_| NotUsed),
            attributes: outer_attributes.and(inner_attributes),
        }
    }

    /// Swaps the two directions, keeping the attributes.
    #[must_use]
    pub fn reversed(self) -> BidiFlow<I2, O2, I1, O1> {
        let Self {
            top,
            bottom,
            attributes,
        } = self;
        BidiFlow {
            top: bottom,
            bottom: top.map_materialized_value(|_| NotUsed),
            attributes,
        }
    }
}
/// Makes sure the resource is closed even when the stream is abandoned
/// before it terminates on its own.
impl<In, Out, Resource, Create, F, Close> Drop
    for MapWithResourceStream<In, Out, Resource, Create, F, Close>
where
    Create: Fn() -> StreamResult<Resource>,
    F: Fn(&mut Resource, In) -> StreamResult<Out>,
    Close: Fn(Resource) -> StreamResult<Option<Out>>,
{
    fn drop(&mut self) {
        // Nobody is left to observe a close failure or a final element.
        drop(self.close_resource());
    }
}
fn materialize_inner_flow<In, Out, InnerMat>(
flow: Flow<In, Out, InnerMat>,
input: BoxStream<In>,
materializer: &Materializer,
) -> StreamResult<(BoxStream<Out>, InnerMat)>
where
In: Send + 'static,
Out: Send + 'static,
InnerMat: Send + 'static,
{
let mat = (flow.materialize)()?;
let stream = match flow.transform {
FlowTransform::Pure(transform) => transform(input),
FlowTransform::Runtime(transform) => transform(input, materializer)?,
};
Ok((stream, mat))
}
/// Iterator state for a flow that is obtained from a future on first pull.
struct FutureFlowStream<In, Out, InnerMat, F, Fut> {
/// Factory producing the future that resolves to the inner flow.
future: Arc<F>,
/// Materializer used to materialize the inner flow.
materializer: Materializer,
/// Upstream input; taken when the inner flow is materialized.
input: Option<BoxStream<In>>,
/// Output of the inner flow once it has been materialized.
current: Option<BoxStream<Out>>,
/// Channel on which the inner flow's materialized value (or the failure
/// to obtain it) is reported; consumed on first use.
mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
/// Set once initialization has been attempted.
initialized: bool,
/// Once set, the iterator only returns `None`.
terminated: bool,
/// Ties the `Fut` type parameter to the struct without storing a value.
_marker: PhantomData<fn() -> Fut>,
}
/// Initialization helpers for the future-backed flow stream.
impl<In, Out, InnerMat, F, Fut> FutureFlowStream<In, Out, InnerMat, F, Fut>
where
    In: Send + 'static,
    Out: Send + 'static,
    InnerMat: Send + 'static,
    F: Fn() -> Fut,
    Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
{
    /// Reports the materialization outcome at most once; later calls are no-ops.
    fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
        let Some(sender) = self.mat_sender.take() else {
            return;
        };
        let _ = sender.send(result);
    }

    /// Resolves the flow future and materializes the resulting flow over the
    /// input. Runs only once; any failure is reported both to the caller and
    /// through the materialized-value channel.
    fn initialize(&mut self) -> StreamResult<()> {
        if self.initialized {
            return Ok(());
        }
        self.initialized = true;
        let prepared = catch_unwind_failed("future_flow factory", || (self.future)())
            .and_then(run_future_inline_or_spawn)
            .and_then(|flow| {
                let input = self.input.take().expect("future_flow input available");
                materialize_inner_flow(flow, input, &self.materializer)
            });
        match prepared {
            Ok((stream, mat)) => {
                self.current = Some(stream);
                self.complete_mat(Ok(mat));
                Ok(())
            }
            Err(error) => {
                self.complete_mat(Err(error.clone()));
                Err(error)
            }
        }
    }
}
/// Pulls from the inner flow, initializing it on the first call.
impl<In, Out, InnerMat, F, Fut> Iterator for FutureFlowStream<In, Out, InnerMat, F, Fut>
where
    In: Send + 'static,
    Out: Send + 'static,
    InnerMat: Send + 'static,
    F: Fn() -> Fut,
    Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
{
    type Item = StreamResult<Out>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.terminated {
            return None;
        }
        if let Err(error) = self.initialize() {
            self.terminated = true;
            return Some(Err(error));
        }
        let inner = self
            .current
            .as_mut()
            .expect("future_flow current stream initialized");
        let pulled = inner.next();
        // Both an error and the end of the inner stream are terminal.
        if !matches!(pulled, Some(Ok(_))) {
            self.terminated = true;
        }
        pulled
    }
}
/// Reports a failure on the materialized-value channel when the stream is
/// dropped without ever having been pulled.
impl<In, Out, InnerMat, F, Fut> Drop for FutureFlowStream<In, Out, InnerMat, F, Fut> {
    fn drop(&mut self) {
        if self.initialized {
            return;
        }
        if let Some(sender) = self.mat_sender.take() {
            let _ = sender.send(Err(StreamError::Failed(
                "future flow was never materialized".into(),
            )));
        }
    }
}
/// Iterator state for a flow that is only created once the first upstream
/// element has arrived.
struct LazyFutureFlowStream<In, Out, InnerMat, F, Fut> {
/// Factory producing the future that resolves to the inner flow.
create: Arc<F>,
/// Materializer used to materialize the inner flow.
materializer: Materializer,
/// Upstream input; taken when the inner flow is materialized.
input: Option<BoxStream<In>>,
/// Output of the inner flow once it has been materialized.
current: Option<BoxStream<Out>>,
/// Channel on which the inner flow's materialized value (or the failure
/// to obtain it) is reported; consumed on first use.
mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
/// Set once initialization has been attempted.
initialized: bool,
/// Once set, the iterator only returns `None`.
terminated: bool,
/// Ties the `Fut` type parameter to the struct without storing a value.
_marker: PhantomData<fn() -> Fut>,
}
/// Initialization helpers for the lazily created flow stream.
impl<In, Out, InnerMat, F, Fut> LazyFutureFlowStream<In, Out, InnerMat, F, Fut>
where
    In: Send + 'static,
    Out: Send + 'static,
    InnerMat: Send + 'static,
    F: Fn() -> Fut,
    Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
{
    /// Reports the materialization outcome at most once; later calls are no-ops.
    fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
        let Some(sender) = self.mat_sender.take() else {
            return;
        };
        let _ = sender.send(result);
    }

    /// Waits for the first upstream element, then creates and materializes
    /// the inner flow over that element followed by the rest of the input.
    ///
    /// Returns `Ok(true)` when the inner flow is ready, `Ok(false)` when
    /// upstream completed without any element (the stream is then marked
    /// terminated), and `Err` on any failure. Every outcome is also
    /// reported through the materialized-value channel.
    fn initialize(&mut self) -> Result<bool, StreamError> {
        if self.initialized {
            return Ok(true);
        }
        self.initialized = true;
        let upstream = self
            .input
            .as_mut()
            .expect("lazy_future_flow input available");
        let first = match upstream.next() {
            Some(Ok(item)) => item,
            Some(Err(error)) => {
                self.complete_mat(Err(error.clone()));
                return Err(error);
            }
            None => {
                self.complete_mat(Err(StreamError::Failed(
                    "lazy flow was never materialized".into(),
                )));
                self.terminated = true;
                return Ok(false);
            }
        };
        let prepared = catch_unwind_failed("lazy_future_flow factory", || (self.create)())
            .and_then(run_future_inline_or_spawn)
            .and_then(|flow| {
                let rest = self
                    .input
                    .take()
                    .expect("lazy_future_flow input available after first element");
                let input = prepend_first_stream(first, rest);
                materialize_inner_flow(flow, input, &self.materializer)
            });
        match prepared {
            Ok((stream, mat)) => {
                self.current = Some(stream);
                self.complete_mat(Ok(mat));
                Ok(true)
            }
            Err(error) => {
                self.complete_mat(Err(error.clone()));
                Err(error)
            }
        }
    }
}
/// Pulls from the inner flow, creating it on the first call.
impl<In, Out, InnerMat, F, Fut> Iterator for LazyFutureFlowStream<In, Out, InnerMat, F, Fut>
where
    In: Send + 'static,
    Out: Send + 'static,
    InnerMat: Send + 'static,
    F: Fn() -> Fut,
    Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
{
    type Item = StreamResult<Out>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.terminated {
            return None;
        }
        match self.initialize() {
            Err(error) => {
                self.terminated = true;
                return Some(Err(error));
            }
            // Upstream was empty: there is no inner flow to pull from.
            Ok(false) => return None,
            Ok(true) => {}
        }
        let inner = self
            .current
            .as_mut()
            .expect("lazy_future_flow current stream initialized");
        let pulled = inner.next();
        // Both an error and the end of the inner stream are terminal.
        if !matches!(pulled, Some(Ok(_))) {
            self.terminated = true;
        }
        pulled
    }
}
/// Reports a failure on the materialized-value channel when the stream is
/// dropped without ever having been pulled.
impl<In, Out, InnerMat, F, Fut> Drop for LazyFutureFlowStream<In, Out, InnerMat, F, Fut> {
    fn drop(&mut self) {
        if self.initialized {
            return;
        }
        if let Some(sender) = self.mat_sender.take() {
            let _ = sender.send(Err(StreamError::Failed(
                "lazy flow was never materialized".into(),
            )));
        }
    }
}
/// Builds a stream that yields `first` as a successful element and then
/// everything produced by `rest`.
fn prepend_first_stream<In>(first: In, mut rest: BoxStream<In>) -> BoxStream<In>
where
    In: Send + 'static,
{
    let mut head = Some(first);
    Box::new(std::iter::from_fn(move || {
        head.take().map(Ok).or_else(|| rest.next())
    }))
}
/// Polls `future` exactly once with a no-op waker, inside the stream
/// Tokio runtime context.
///
/// Returns `Ok(output)` when the future completed on that poll, or
/// `Err(pinned_future)` when it is still pending so the caller can drive
/// it elsewhere. A panic during the poll is reported as
/// `Ok(Err(StreamError::Failed(..)))`.
pub(super) fn poll_once_or_pending<Fut, T>(future: Fut) -> Result<StreamResult<T>, Pin<Box<Fut>>>
where
    Fut: Future<Output = StreamResult<T>>,
{
    let mut pinned = Box::pin(future);
    let _guard = stream_tokio_runtime().enter();
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let polled = catch_unwind(AssertUnwindSafe(|| pinned.as_mut().poll(&mut cx)));
    match polled {
        Err(_) => Ok(Err(StreamError::Failed("future task panicked".into()))),
        Ok(Poll::Pending) => Err(pinned),
        Ok(Poll::Ready(output)) => Ok(output),
    }
}
/// Spawns a Tokio task that drives `future` to completion and sends
/// `(task_id, map(result))` on `sender`.
///
/// A panic inside the future is converted into
/// `StreamError::Failed("future task panicked")` before `map` is applied.
/// A closed receiver is ignored. The returned handle is an
/// `AbortOnDropHandle` for the spawned task.
pub(super) fn spawn_completion_task<Fut, T, Msg, Map>(
    task_id: usize,
    future: Pin<Box<Fut>>,
    sender: std::sync::mpsc::Sender<(usize, Msg)>,
    map: Map,
) -> AbortOnDropHandle<()>
where
    Fut: Future<Output = StreamResult<T>> + Send + 'static,
    T: Send + 'static,
    Msg: Send + 'static,
    Map: FnOnce(StreamResult<T>) -> Msg + Send + 'static,
{
    spawn_tokio_task(async move {
        let output = AssertUnwindSafe(future)
            .catch_unwind()
            .await
            .unwrap_or_else(|_| Err(StreamError::Failed("future task panicked".into())));
        let _ = sender.send((task_id, map(output)));
    })
}
/// Blocks until a completion message arrives on `receiver`.
///
/// Polls with `try_recv` while spinning for a bounded budget
/// (`STREAM_READY_SPINS`, in steps of `STREAM_SPIN_BACKOFF`), then parks in
/// `recv_timeout(STREAM_MAX_PARK)` and starts over on timeout. Returns
/// `None` once every sender has been dropped.
pub(super) fn recv_completion<Msg>(
    receiver: &std::sync::mpsc::Receiver<(usize, Msg)>,
) -> Option<(usize, Msg)> {
    use std::sync::mpsc::{RecvTimeoutError, TryRecvError};

    let mut spins = 0;
    loop {
        match receiver.try_recv() {
            Ok(message) => return Some(message),
            Err(TryRecvError::Disconnected) => return None,
            Err(TryRecvError::Empty) => {}
        }
        if spins < STREAM_READY_SPINS {
            // Still within the spin budget: burn a few cycles and retry.
            spins += STREAM_SPIN_BACKOFF;
            (0..STREAM_SPIN_BACKOFF).for_each(|_| std::hint::spin_loop());
            continue;
        }
        spins = 0;
        match receiver.recv_timeout(STREAM_MAX_PARK) {
            Ok(message) => return Some(message),
            Err(RecvTimeoutError::Disconnected) => return None,
            Err(RecvTimeoutError::Timeout) => {}
        }
    }
}
/// Runs `future` to completion from synchronous code.
///
/// The future is first polled once inline; if it is still pending it is
/// spawned as a completion task and the calling thread blocks until the
/// task reports back. If the task goes away without reporting, the result
/// is `StreamError::Failed("future task dropped")`.
pub(super) fn run_future_inline_or_spawn<Fut, T>(future: Fut) -> StreamResult<T>
where
    Fut: Future<Output = StreamResult<T>> + Send + 'static,
    T: Send + 'static,
{
    let pending = match poll_once_or_pending(future) {
        Ok(ready) => return ready,
        Err(pending) => pending,
    };
    let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<T>)>();
    // Keep the handle alive while waiting; dropping it would abort the task.
    let _task = spawn_completion_task(0, pending, sender, |result| result);
    match recv_completion(&receiver) {
        Some((_, result)) => result,
        None => Err(StreamError::Failed("future task dropped".into())),
    }
}
/// Applies the asynchronous `stage` to every input element and emits the
/// results in input order.
///
/// At most `parallelism` elements are outstanding at a time, counting both
/// running tasks and finished results that are waiting for their turn.
/// Futures that complete on their first poll are handled inline; the rest
/// are spawned as completion tasks that report back over a channel. An
/// upstream error is queued at its position in the order and stops further
/// pulling from the input.
fn map_async_ordered<Out, Next, F, Fut>(
mut input: BoxStream<Out>,
parallelism: usize,
stage: Arc<F>,
) -> BoxStream<Next>
where
Out: Send + 'static,
Next: Send + 'static,
F: Fn(Out) -> Fut + Send + Sync + 'static,
Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
// Handles of spawned tasks, keyed by the index of the element they process.
let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
// Index assigned to the next element pulled from the input.
let mut next_index = 0_usize;
// Index of the result that must be emitted next to preserve order.
let mut next_to_emit = 0_usize;
// Results that finished out of order, waiting for their turn.
let mut completed = BTreeMap::new();
let mut input_done = false;
Box::new(std::iter::from_fn(move || {
loop {
// Emit an already finished result if it is next in order.
if let Some(result) = completed.remove(&next_to_emit) {
next_to_emit += 1;
return Some(result);
}
// Fill the window with new work while there is capacity and input.
while tasks.len() + completed.len() < parallelism && !input_done {
match input.next() {
Some(Ok(item)) => {
let index = next_index;
next_index += 1;
match poll_once_or_pending(stage(item)) {
Ok(result) => {
// Completed immediately: emit right away if it is in order.
if index == next_to_emit {
next_to_emit += 1;
return Some(result);
}
completed.insert(index, result);
}
Err(future) => {
// Still pending: let a spawned task drive it to completion.
tasks.insert(
index,
spawn_completion_task(
index,
future,
sender.clone(),
|result| result,
),
);
}
}
}
Some(Err(error)) => {
// The error takes the next slot so earlier results are emitted first.
completed.insert(next_index, Err(error));
next_index += 1;
input_done = true;
}
None => input_done = true,
}
}
if let Some(result) = completed.remove(&next_to_emit) {
next_to_emit += 1;
return Some(result);
}
// Nothing buffered in order and nothing running: the stream is finished.
if tasks.is_empty() {
return None;
}
// Block until some running task reports its result.
if let Some((index, result)) = recv_completion(&receiver) {
tasks.remove(&index);
if index == next_to_emit {
next_to_emit += 1;
return Some(result);
}
completed.insert(index, result);
}
}
}))
}
fn supervise_async_result<Next>(
result: StreamResult<Next>,
decider: &SupervisionDecider,
) -> Option<StreamResult<Next>> {
match result {
Ok(item) => Some(Ok(item)),
Err(error) => match decide_supervision(decider, &error) {
SupervisionDirective::Stop => Some(Err(error)),
SupervisionDirective::Resume | SupervisionDirective::Restart => None,
},
}
}
/// Ordered asynchronous map with supervision.
///
/// Works like the unsupervised ordered variant: at most `parallelism`
/// elements are outstanding and results are emitted in input order. In
/// addition, every stage failure is passed through `supervise_async_result`;
/// a failure that the decider does not turn into a stop is recorded as a
/// skipped slot (`None`) so that ordering still advances past it. A panic
/// while calling `stage` or during the first poll is converted into a
/// stream error and supervised the same way. Upstream errors are not
/// supervised: they are queued in order and stop further pulling.
fn map_async_ordered_supervised<Out, Next, F, Fut>(
mut input: BoxStream<Out>,
parallelism: usize,
stage: Arc<F>,
decider: SupervisionDecider,
) -> BoxStream<Next>
where
Out: Send + 'static,
Next: Send + 'static,
F: Fn(Out) -> Fut + Send + Sync + 'static,
Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
// Handles of spawned tasks, keyed by the index of the element they process.
let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
// Index assigned to the next element pulled from the input.
let mut next_index = 0_usize;
// Index of the slot that must be emitted (or skipped) next.
let mut next_to_emit = 0_usize;
// Finished slots waiting for their turn; `None` marks a skipped element.
let mut completed = BTreeMap::<usize, Option<StreamResult<Next>>>::new();
let mut input_done = false;
Box::new(std::iter::from_fn(move || {
loop {
// Drain in-order slots, stepping over skipped ones.
while let Some(result) = completed.remove(&next_to_emit) {
next_to_emit += 1;
if let Some(result) = result {
return Some(result);
}
}
// Fill the window with new work while there is capacity and input.
while tasks.len() + completed.len() < parallelism && !input_done {
match input.next() {
Some(Ok(item)) => {
let index = next_index;
next_index += 1;
match catch_unwind(AssertUnwindSafe(|| poll_once_or_pending(stage(item)))) {
Ok(Ok(result)) => {
// Completed on the first poll: supervise and emit or buffer.
let result = supervise_async_result(result, &decider);
if index == next_to_emit {
next_to_emit += 1;
if let Some(result) = result {
return Some(result);
}
} else {
completed.insert(index, result);
}
}
Ok(Err(future)) => {
// Still pending: let a spawned task drive it to completion.
tasks.insert(
index,
spawn_completion_task(
index,
future,
sender.clone(),
|result| result,
),
);
}
Err(_) => {
// The stage callback panicked: treat it as a supervised failure.
let error = panic_stream_error("map_async callback");
let result = supervise_async_result(Err(error), &decider);
if index == next_to_emit {
next_to_emit += 1;
if let Some(result) = result {
return Some(result);
}
} else {
completed.insert(index, result);
}
}
}
}
Some(Err(error)) => {
// Upstream errors bypass supervision and occupy the next slot.
completed.insert(next_index, Some(Err(error)));
next_index += 1;
input_done = true;
}
None => input_done = true,
}
}
while let Some(result) = completed.remove(&next_to_emit) {
next_to_emit += 1;
if let Some(result) = result {
return Some(result);
}
}
// Nothing buffered in order and nothing running: the stream is finished.
if tasks.is_empty() {
return None;
}
// Block until some running task reports its result.
if let Some((index, result)) = recv_completion(&receiver) {
tasks.remove(&index);
let result = supervise_async_result(result, &decider);
if index == next_to_emit {
next_to_emit += 1;
if let Some(result) = result {
return Some(result);
}
} else {
completed.insert(index, result);
}
}
}
}))
}
/// Chains `streams` one after another: each stream is drained completely
/// before the next one is started. Errors are passed through as ordinary
/// items and do not stop the concatenation.
fn concat_streams<Out>(streams: Vec<BoxStream<Out>>) -> BoxStream<Out>
where
    Out: Send + 'static,
{
    let mut queue: VecDeque<_> = streams.into();
    let mut active = queue.pop_front();
    Box::new(std::iter::from_fn(move || {
        while let Some(stream) = active.as_mut() {
            if let Some(item) = stream.next() {
                return Some(item);
            }
            // The active stream is exhausted; move on to the next one.
            active = queue.pop_front();
        }
        None
    }))
}
fn concat_streams_lazy<Out, Mat>(
initial: BoxStream<Out>,
factories: Vec<Arc<dyn SourceFactory<Out, Mat>>>,
materializer: &Materializer,
) -> BoxStream<Out>
where
Out: Send + 'static,
Mat: Send + 'static,
{
let mut current = Some(initial);
let mut remaining: VecDeque<_> = factories.into();
let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
Box::new(std::iter::from_fn(move || {
loop {
match current.as_mut() {
Some(stream) => match stream.next() {
Some(item) => return Some(item),
None => {
current = remaining.pop_front().map(|factory| {
match factory.create(&materializer) {
Ok((stream, _)) => stream,
Err(error) => {
Box::new(std::iter::once(Err(error))) as BoxStream<Out>
}
}
});
}
},
None => return None,
}
}
}))
}
/// Emits `primary`; switches to `secondary` only if `primary` ends without
/// having produced a single successful element.
///
/// Errors from `primary` are passed through and do not count as emitted
/// elements.
fn or_else_stream<Out>(mut primary: BoxStream<Out>, mut secondary: BoxStream<Out>) -> BoxStream<Out>
where
    Out: Send + 'static,
{
    let mut primary_emitted = false;
    let mut using_secondary = false;
    Box::new(std::iter::from_fn(move || {
        if !using_secondary {
            match primary.next() {
                Some(Ok(item)) => {
                    primary_emitted = true;
                    return Some(Ok(item));
                }
                Some(Err(error)) => return Some(Err(error)),
                None => {
                    if primary_emitted {
                        return None;
                    }
                    // The primary was empty: fall back for good.
                    using_secondary = true;
                }
            }
        }
        secondary.next()
    }))
}
/// Interleaves `streams` round-robin, taking up to `segment_size`
/// successful elements from one stream before moving to the next active one.
///
/// A `segment_size` of zero yields a single `GraphValidation` error. When a
/// stream is exhausted it is removed from the rotation; with `eager_close`
/// the first exhausted stream ends the output for that pull instead.
/// Errors are passed through without advancing the segment counter.
fn interleave_streams<Out>(
streams: Vec<BoxStream<Out>>,
segment_size: usize,
eager_close: bool,
) -> BoxStream<Out>
where
Out: Send + 'static,
{
if segment_size == 0 {
return Box::new(std::iter::once(Err(StreamError::GraphValidation(
"interleave segment size must be greater than zero".into(),
))));
}
// `None` marks a stream that has already been exhausted.
let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
// One look-ahead slot per stream. NOTE(review): nothing in this function
// ever stores into a slot, so `take()` below always yields `None`.
let mut pending: Vec<Option<StreamResult<Out>>> = (0..streams.len()).map(|_| None).collect();
// Index of the stream currently being read.
let mut current = 0usize;
// Successful elements emitted from `current` in the running segment.
let mut emitted = 0usize;
Box::new(std::iter::from_fn(move || {
loop {
if streams.iter().all(Option::is_none) {
return None;
}
// The cursor may rest on a finished stream; move to the next active one.
if streams[current].is_none() {
match next_active_stream(&streams, current) {
Some(next) => {
current = next;
emitted = 0;
}
None => return None,
}
}
let Some(stream) = streams[current].as_mut() else {
continue;
};
let next_item = pending[current].take().or_else(|| stream.next());
match next_item {
Some(Ok(item)) => {
emitted += 1;
// Segment complete: rotate to the next active stream for the next pull.
if emitted == segment_size {
emitted = 0;
if let Some(next) = next_active_stream(&streams, current) {
current = next;
}
}
return Some(Ok(item));
}
Some(Err(error)) => return Some(Err(error)),
None => {
// Current stream is exhausted: retire it and continue with another.
streams[current] = None;
emitted = 0;
if eager_close {
return None;
}
match next_active_stream(&streams, current) {
Some(next) => current = next,
None => return None,
}
}
}
}
}))
}
/// Finds the next slot after `current` (wrapping around) that still holds
/// a stream. `current` itself is considered last, so it is returned when
/// it is the only active slot. Returns `None` when no slot is active.
fn next_active_stream<Out>(streams: &[Option<BoxStream<Out>>], current: usize) -> Option<usize>
where
    Out: Send + 'static,
{
    let total = streams.len();
    // With no slots the range is empty, so the modulo is never evaluated.
    (1..=total)
        .map(|offset| (current + offset) % total)
        .find(|&index| streams[index].is_some())
}
/// Runs `sink` on a stream fed by a new bounded channel of capacity
/// `buffer`, returning the sending half together with the sink's
/// materialized value.
fn materialize_side_sink<Out, Mat>(
    sink: &Sink<Out, Mat>,
    materializer: &Materializer,
    buffer: usize,
) -> StreamResult<(std::sync::mpsc::SyncSender<StreamResult<Out>>, Mat)>
where
    Out: Send + 'static,
    Mat: Send + 'static,
{
    let (side_tx, side_rx) = std::sync::mpsc::sync_channel(buffer);
    let side_input = side_receiver_stream(side_rx);
    let mat = sink.run(side_input, materializer)?;
    Ok((side_tx, mat))
}
/// Exposes a channel receiver as a stream: yields every received message
/// and ends once all senders have been dropped.
fn side_receiver_stream<Out>(
    receiver: std::sync::mpsc::Receiver<StreamResult<Out>>,
) -> BoxStream<Out>
where
    Out: Send + 'static,
{
    Box::new(receiver.into_iter())
}
/// How a live substream ended.
#[derive(Clone)]
enum LiveSubstreamTerminal {
/// The producer finished normally.
Complete,
/// The producer failed (or the substream was cancelled) with this error.
Error(StreamError),
}
/// Default number of buffered elements a live substream may hold.
const LIVE_SUBSTREAM_CAPACITY: usize = 256;
/// Default upper bound on the size of one buffered batch in a live substream.
const LIVE_SUBSTREAM_BATCH: usize = 64;
// NOTE(review): not referenced in this part of the file; presumably a
// window size used by flat-map-merge substreams — confirm at the use site.
const FLAT_MAP_MERGE_SUBSTREAM_WINDOW: usize = 64;
/// State shared between the producer of a live substream and the stream
/// that consumes it.
struct LiveSubstreamShared<T> {
/// Buffered elements and terminal status, guarded by a mutex.
state: Mutex<LiveSubstreamState<T>>,
/// Signalled when elements are added or removed, on termination, and on
/// cancellation.
available: Condvar,
/// Set when the consuming stream is dropped.
cancelled: Arc<AtomicBool>,
/// Maximum number of buffered elements before producers wait.
capacity: usize,
/// Maximum number of elements per buffered batch (at least 1).
batch_size: usize,
}
/// Mutex-protected part of a live substream.
struct LiveSubstreamState<T> {
/// Total number of elements currently held in `batches`.
buffered: usize,
/// Buffered elements, grouped into batches in arrival order.
batches: VecDeque<VecDeque<T>>,
/// Set once the substream has completed or failed; never overwritten.
terminal: Option<LiveSubstreamTerminal>,
}
/// Constructors for the shared live-substream state.
impl<T> LiveSubstreamShared<T> {
    /// Creates shared state with the default capacity and batch size.
    fn new() -> Arc<Self> {
        Self::with_batching(LIVE_SUBSTREAM_CAPACITY, LIVE_SUBSTREAM_BATCH)
    }

    /// Creates shared state with a custom capacity and the default batch size.
    fn with_capacity(capacity: usize) -> Arc<Self> {
        Self::with_batching(capacity, LIVE_SUBSTREAM_BATCH)
    }

    /// Creates shared state with a custom capacity and batch size. A batch
    /// size of zero is raised to one.
    fn with_batching(capacity: usize, batch_size: usize) -> Arc<Self> {
        let empty_state = LiveSubstreamState {
            buffered: 0,
            batches: VecDeque::new(),
            terminal: None,
        };
        let shared = Self {
            state: Mutex::new(empty_state),
            available: Condvar::new(),
            cancelled: Arc::new(AtomicBool::new(false)),
            capacity,
            batch_size: batch_size.max(1),
        };
        Arc::new(shared)
    }
}
/// Consuming side of a live substream.
struct LiveSubstreamStream<T> {
/// State shared with the producer.
shared: Arc<LiveSubstreamShared<T>>,
/// Released when the stream is dropped. NOTE(review): its role is not
/// visible in this part of the file — confirm where it is set.
completion: Option<StreamCompletion<NotUsed>>,
/// Batch taken from the shared buffer; served without locking.
local_batch: VecDeque<T>,
}
impl<T> Iterator for LiveSubstreamStream<T> {
type Item = StreamResult<T>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(item) = self.local_batch.pop_front() {
return Some(Ok(item));
}
let mut state = self
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
loop {
if let Some(mut batch) = state.batches.pop_front() {
state.buffered -= batch.len();
drop(state);
self.shared.available.notify_all();
let item = batch.pop_front().expect("live substream batch has an item");
self.local_batch = batch;
return Some(Ok(item));
}
if let Some(terminal) = state.terminal.clone() {
return match terminal {
LiveSubstreamTerminal::Complete => None,
LiveSubstreamTerminal::Error(error) => Some(Err(error)),
};
}
state = self
.shared
.available
.wait(state)
.unwrap_or_else(|poison| poison.into_inner());
}
}
}
/// Dropping the consumer cancels the substream: the cancellation flag is
/// raised, waiters are woken, and the completion handle is released.
impl<T> Drop for LiveSubstreamStream<T> {
    fn drop(&mut self) {
        let shared = &self.shared;
        shared.cancelled.store(true, Ordering::SeqCst);
        shared.available.notify_all();
        drop(self.completion.take());
    }
}
/// Moves every element of `batch` into the shared substream buffer,
/// blocking whenever the buffer is at capacity.
///
/// Returns `Err(())` — after clearing whatever is left in `batch` — when
/// the substream has been cancelled or has already terminated. On success
/// `batch` is left empty.
fn push_live_substream_batch<T>(
shared: &Arc<LiveSubstreamShared<T>>,
batch: &mut VecDeque<T>,
) -> Result<(), ()> {
// Each pass transfers as many elements as currently fit.
while !batch.is_empty() {
let mut state = shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
// Wait for free capacity unless the substream terminates or is cancelled.
while state.buffered >= shared.capacity && state.terminal.is_none() {
if shared.cancelled.load(Ordering::SeqCst) {
batch.clear();
return Err(());
}
state = shared
.available
.wait(state)
.unwrap_or_else(|poison| poison.into_inner());
}
if shared.cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
batch.clear();
return Err(());
}
// The consumer is only notified when the buffer goes from empty to non-empty.
let was_empty = state.buffered == 0;
while state.buffered < shared.capacity && !batch.is_empty() {
let item = batch.pop_front().expect("batch non-empty");
// Top up the last buffered batch before starting a new one.
if let Some(back) = state.batches.back_mut()
&& back.len() < shared.batch_size
{
back.push_back(item);
} else {
let mut new_batch =
VecDeque::with_capacity(shared.batch_size.min(shared.capacity.max(1)));
new_batch.push_back(item);
state.batches.push_back(new_batch);
}
state.buffered += 1;
}
drop(state);
if was_empty {
shared.available.notify_all();
}
}
Ok(())
}
/// Appends `item` to the shared substream buffer, blocking while the
/// buffer is at capacity.
///
/// Returns the item back as `Err(item)` when the substream has been
/// cancelled or has already terminated.
fn push_live_substream<T>(shared: &Arc<LiveSubstreamShared<T>>, item: T) -> Result<(), T> {
    let is_cancelled = || shared.cancelled.load(Ordering::SeqCst);
    let mut guard = shared
        .state
        .lock()
        .unwrap_or_else(|poison| poison.into_inner());
    // Wait for free capacity unless the substream terminates or is cancelled.
    loop {
        let has_room = guard.buffered < shared.capacity;
        if has_room || guard.terminal.is_some() {
            break;
        }
        if is_cancelled() {
            return Err(item);
        }
        guard = shared
            .available
            .wait(guard)
            .unwrap_or_else(|poison| poison.into_inner());
    }
    if is_cancelled() || guard.terminal.is_some() {
        return Err(item);
    }
    // The consumer is only notified when the buffer goes from empty to non-empty.
    let notify_consumer = guard.buffered == 0;
    if let Some(tail) = guard.batches.back_mut()
        && tail.len() < shared.batch_size
    {
        tail.push_back(item);
    } else {
        let mut fresh = VecDeque::with_capacity(shared.batch_size.min(shared.capacity.max(1)));
        fresh.push_back(item);
        guard.batches.push_back(fresh);
    }
    guard.buffered += 1;
    drop(guard);
    if notify_consumer {
        shared.available.notify_all();
    }
    Ok(())
}
/// Marks the substream as completed unless it already has a terminal
/// state, then wakes every waiter.
fn complete_live_substream<T>(shared: &Arc<LiveSubstreamShared<T>>) {
    {
        let mut guard = shared
            .state
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        if guard.terminal.is_none() {
            guard.terminal = Some(LiveSubstreamTerminal::Complete);
        }
        // The lock is released here, before waiters are woken.
    }
    shared.available.notify_all();
}
fn fail_live_substream<T>(shared: &Arc<LiveSubstreamShared<T>>, error: StreamError) {
let mut state = shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.terminal.is_none() {
state.terminal = Some(LiveSubstreamTerminal::Error(error));
}
drop(state);
shared.available.notify_all();
}
/// Cancels the substream by delegating to `fail_live_substream` with
/// `StreamError::Cancelled` as the terminal error.
fn cancel_live_substream<T>(shared: &Arc<LiveSubstreamShared<T>>) {
fail_live_substream(shared, StreamError::Cancelled);
}
/// Wraps a live substream in a `Source` that may be materialized only once.
///
/// The first materialization yields a `LiveSubstreamStream` reading from
/// `shared`; every later attempt fails with `StreamError::Failed`.
fn source_from_live_substream<T>(shared: Arc<LiveSubstreamShared<T>>) -> Source<T>
where
    T: Send + 'static,
{
    let already_claimed = Arc::new(AtomicBool::new(false));
    Source::from_materialized_factory(move |_materializer| {
        // `swap` returns the previous value, so `true` means someone got here first.
        let previously_claimed = already_claimed.swap(true, Ordering::SeqCst);
        if previously_claimed {
            return Err(StreamError::Failed(
                "substream source cannot be materialized more than once".into(),
            ));
        }
        let stream: BoxStream<T> = Box::new(LiveSubstreamStream {
            shared: Arc::clone(&shared),
            completion: None,
            local_batch: VecDeque::new(),
        });
        Ok((stream, NotUsed))
    })
}
/// Wraps an already-built stream in a `Source` that may be materialized once.
///
/// The stream is parked in a shared slot; the first materialization takes it
/// out and every later attempt fails with `StreamError::Failed`.
fn source_from_once_stream<T>(stream: BoxStream<T>) -> Source<T>
where
    T: Send + 'static,
{
    let parked = Arc::new(Mutex::new(Some(stream)));
    Source::from_materialized_factory(move |_materializer| {
        let taken = parked
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take();
        match taken {
            Some(stream) => Ok((stream, NotUsed)),
            None => Err(StreamError::Failed(
                "substream source cannot be materialized more than once".into(),
            )),
        }
    })
}
/// Builds a single-element stream carrying the first `n` items of `input`
/// together with a one-shot `Source` for the remaining items.
///
/// * If `input` ends before `n` items were read, the short prefix is emitted
///   with `Source::empty()` as the tail.
/// * If `input` yields an error while the prefix is being read, that error is
///   emitted instead of a prefix/tail pair.
///
/// The returned stream produces exactly one element and then ends.
fn prefix_and_tail_stream<Out>(
    input: BoxStream<Out>,
    n: usize,
) -> BoxStream<(Vec<Out>, Source<Out>)>
where
    Out: Send + 'static,
{
    // `n` comes from the caller and may be arbitrarily large (for example
    // `usize::MAX` to mean "everything"). Preallocating it verbatim can panic
    // with a capacity overflow or reserve huge memory for a short stream, so
    // the eager reservation is capped and the vector grows on demand.
    const MAX_PREFIX_PREALLOCATION: usize = 1024;

    let mut input = Some(input);
    let mut emitted = false;
    Box::new(std::iter::from_fn(move || {
        if emitted {
            return None;
        }
        emitted = true;
        let mut prefix = Vec::with_capacity(n.min(MAX_PREFIX_PREALLOCATION));
        while prefix.len() < n {
            match input
                .as_mut()
                .expect("prefix_and_tail input available")
                .next()
            {
                Some(Ok(item)) => prefix.push(item),
                Some(Err(error)) => return Some(Err(error)),
                None => return Some(Ok((prefix, Source::empty()))),
            }
        }
        // The prefix is complete: hand the untouched remainder over as the tail.
        Some(Ok((
            prefix,
            source_from_once_stream(input.take().expect("tail input available")),
        )))
    }))
}
/// Bookkeeping for the `group_by` worker: the outer stream of substream
/// sources plus the per-key substreams. While `armed`, dropping the guard
/// fails the outer stream and every active substream with
/// `StreamError::AbruptTermination`.
struct GroupByWorkerGuard<Key, Out> {
/// Outer stream onto which newly created substream sources are pushed.
outer: Arc<LiveSubstreamShared<Source<Out>>>,
/// Substreams currently receiving items, indexed by grouping key.
active: HashMap<Key, Arc<LiveSubstreamShared<Out>>>,
/// Keys whose substream was closed and must not be recreated.
closed: HashSet<Key>,
/// When `true`, `Drop` fails everything with `AbruptTermination`.
armed: bool,
}
impl<Key, Out> GroupByWorkerGuard<Key, Out>
where
Key: Eq + Hash,
{
fn new(outer: Arc<LiveSubstreamShared<Source<Out>>>) -> Self {
Self {
outer,
active: HashMap::new(),
closed: HashSet::new(),
armed: true,
}
}
fn disarm(&mut self) {
self.armed = false;
}
fn fail_all(&self, error: StreamError)
where
Out: Send + 'static,
{
fail_live_substream(&self.outer, error.clone());
for substream in self.active.values() {
fail_live_substream(substream, error.clone());
}
}
fn complete_all(&self)
where
Out: Send + 'static,
{
complete_live_substream(&self.outer);
for substream in self.active.values() {
complete_live_substream(substream);
}
}
fn cancel_all(&self)
where
Out: Send + 'static,
{
for substream in self.active.values() {
cancel_live_substream(substream);
}
}
}
impl<Key, Out> Drop for GroupByWorkerGuard<Key, Out> {
    /// If the guard is still armed (the worker exited without disarming it),
    /// fails the outer stream and all active substreams with
    /// `StreamError::AbruptTermination`.
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        fail_live_substream(&self.outer, StreamError::AbruptTermination);
        self.active
            .values()
            .for_each(|substream| fail_live_substream(substream, StreamError::AbruptTermination));
    }
}
/// Flushes the worker's local write batch into its target substream.
///
/// Does nothing (and leaves `wb_key` / `wb_sub` untouched) when `wb_items` is
/// empty. Otherwise `wb_key` and `wb_sub` are both reset to `None`, and the
/// items are either pushed to the target substream or, if no target is set,
/// discarded. When the push is rejected the key is removed from
/// `guard.active` and, unless recreation is allowed, recorded in
/// `guard.closed`.
///
/// # Panics
/// Panics if `wb_items` is non-empty while `wb_key` is `None`.
fn group_by_flush_write_batch<Key, Out>(
    guard: &mut GroupByWorkerGuard<Key, Out>,
    wb_key: &mut Option<Key>,
    wb_sub: &mut Option<Arc<LiveSubstreamShared<Out>>>,
    wb_items: &mut VecDeque<Out>,
    allow_closed_substream_recreation: bool,
) where
    Key: Clone + Eq + Hash,
    Out: Send + 'static,
{
    if wb_items.is_empty() {
        return;
    }
    let key = wb_key.take().expect("wb_key set when wb_items non-empty");
    let Some(target) = wb_sub.take() else {
        // No substream to deliver to: the batched items are dropped.
        wb_items.clear();
        return;
    };
    let delivered = push_live_substream_batch(&target, wb_items).is_ok();
    if delivered {
        return;
    }
    // The substream rejected the batch, so it is retired.
    guard.active.remove(&key);
    if !allow_closed_substream_recreation {
        guard.closed.insert(key);
    }
}
/// Demultiplexes `input` into per-key substreams and returns the stream of
/// substream sources.
///
/// A worker spawned on `materializer` pulls items from `input`, computes each
/// item's key with `key_fn`, and routes the item to the substream for that
/// key, creating the substream (and publishing its `Source` on the outer
/// stream) when the key is seen for the first time.
///
/// * `max_substreams` — creating a substream once the number of active plus
///   closed keys has reached this limit fails every stream with
///   `StreamError::Failed`.
/// * `allow_closed_substream_recreation` — when `false`, a key whose substream
///   was cancelled or rejected a push is remembered in `closed` and later
///   items for it are dropped; when `true` a fresh substream is created.
/// * `batch_mode` — with `GroupByBatchMode::FiniteEagerNoRecreate`, items for
///   an existing key are collected in a local write batch that is flushed when
///   the key changes or `LIVE_SUBSTREAM_BATCH` items have accumulated;
///   otherwise each item is pushed immediately.
///
/// A panic inside `key_fn` fails all streams with
/// `StreamError::AbruptTermination`; an upstream error is forwarded to all
/// streams; upstream completion completes all streams.
fn group_by_stream<Out, Key, F>(
    mut input: BoxStream<Out>,
    max_substreams: usize,
    allow_closed_substream_recreation: bool,
    key_fn: Arc<F>,
    batch_mode: GroupByBatchMode,
    materializer: &Materializer,
) -> BoxStream<Source<Out>>
where
    Out: Clone + Send + 'static,
    Key: Clone + Eq + Hash + Send + 'static,
    F: Fn(&Out) -> Key + Send + Sync + 'static,
{
    let outer = LiveSubstreamShared::new();
    let worker_outer = Arc::clone(&outer);
    let batch_repeated_keys = batch_mode == GroupByBatchMode::FiniteEagerNoRecreate;
    let completion = materializer.spawn_stream(move |cancelled| {
        let mut guard = GroupByWorkerGuard::new(worker_outer);
        // Local write batch: consecutive items for the same key are collected
        // here and delivered to the substream in one locked operation.
        let mut wb_key: Option<Key> = None;
        let mut wb_sub: Option<Arc<LiveSubstreamShared<Out>>> = None;
        let mut wb_items: VecDeque<Out> = VecDeque::with_capacity(LIVE_SUBSTREAM_BATCH);
        while !cancelled.load(Ordering::SeqCst) {
            // The consumer of the outer stream went away: stop quietly.
            if guard.outer.cancelled.load(Ordering::SeqCst) {
                guard.disarm();
                return Ok(NotUsed);
            }
            match input.next() {
                Some(Ok(item)) => {
                    let key = match catch_unwind(AssertUnwindSafe(|| key_fn(&item))) {
                        Ok(key) => key,
                        Err(_panic) => {
                            wb_items.clear();
                            guard.fail_all(StreamError::AbruptTermination);
                            guard.disarm();
                            return Ok(NotUsed);
                        }
                    };
                    // Retire a substream whose consumer has cancelled it,
                    // discarding any batched items still destined for it.
                    if let Some(current) = guard.active.get(&key)
                        && current.cancelled.load(Ordering::SeqCst)
                    {
                        if wb_key.as_ref() == Some(&key) {
                            wb_items.clear();
                            wb_key = None;
                            wb_sub = None;
                        }
                        guard.active.remove(&key);
                        if !allow_closed_substream_recreation {
                            guard.closed.insert(key.clone());
                        }
                    }
                    let mut item = item;
                    if let Some(current) = guard.active.get(&key).cloned() {
                        if !batch_repeated_keys {
                            // Immediate mode: push now; on rejection the item
                            // is handed back so it can seed a new substream.
                            item = match push_live_substream(&current, item) {
                                Ok(()) => {
                                    continue;
                                }
                                Err(item) => item,
                            };
                            guard.active.remove(&key);
                            if !allow_closed_substream_recreation {
                                guard.closed.insert(key.clone());
                                continue;
                            }
                        } else {
                            // Batched mode: switch the write batch over to
                            // this key if it currently targets another one.
                            if wb_key.as_ref() != Some(&key) {
                                group_by_flush_write_batch(
                                    &mut guard,
                                    &mut wb_key,
                                    &mut wb_sub,
                                    &mut wb_items,
                                    allow_closed_substream_recreation,
                                );
                                wb_key = Some(key.clone());
                                wb_sub = Some(current);
                            }
                            wb_items.push_back(item);
                            if wb_items.len() >= LIVE_SUBSTREAM_BATCH {
                                group_by_flush_write_batch(
                                    &mut guard,
                                    &mut wb_key,
                                    &mut wb_sub,
                                    &mut wb_items,
                                    allow_closed_substream_recreation,
                                );
                            }
                            continue;
                        }
                    }
                    // From here on a new substream is needed for `key`; flush
                    // whatever is batched for other keys first.
                    if !wb_items.is_empty() {
                        group_by_flush_write_batch(
                            &mut guard,
                            &mut wb_key,
                            &mut wb_sub,
                            &mut wb_items,
                            allow_closed_substream_recreation,
                        );
                    }
                    if guard.closed.contains(&key) {
                        continue;
                    }
                    if guard.active.len() + guard.closed.len() == max_substreams {
                        let error = StreamError::Failed(format!(
                            "group_by reached max_substreams ({max_substreams})"
                        ));
                        guard.fail_all(error.clone());
                        guard.disarm();
                        return Err(error);
                    }
                    let substream = LiveSubstreamShared::with_capacity(LIVE_SUBSTREAM_CAPACITY);
                    push_live_substream(&substream, item)
                        .unwrap_or_else(|_| unreachable!("fresh group_by substream"));
                    guard.active.insert(key.clone(), Arc::clone(&substream));
                    // Publish the new substream; a rejection means the outer
                    // stream is gone, so every substream is cancelled.
                    if push_live_substream(
                        &guard.outer,
                        source_from_live_substream(Arc::clone(&substream)),
                    )
                    .is_err()
                    {
                        guard.cancel_all();
                        cancel_live_substream(&substream);
                        guard.disarm();
                        return Ok(NotUsed);
                    }
                }
                Some(Err(error)) => {
                    wb_items.clear();
                    guard.fail_all(error.clone());
                    guard.disarm();
                    return Err(error);
                }
                None => {
                    group_by_flush_write_batch(
                        &mut guard,
                        &mut wb_key,
                        &mut wb_sub,
                        &mut wb_items,
                        allow_closed_substream_recreation,
                    );
                    guard.complete_all();
                    guard.disarm();
                    return Ok(NotUsed);
                }
            }
        }
        // The worker itself was cancelled: deliver what is batched and complete.
        group_by_flush_write_batch(
            &mut guard,
            &mut wb_key,
            &mut wb_sub,
            &mut wb_items,
            allow_closed_substream_recreation,
        );
        guard.complete_all();
        guard.disarm();
        Ok(NotUsed)
    });
    Box::new(LiveSubstreamStream {
        shared: outer,
        completion: Some(completion),
        local_batch: VecDeque::new(),
    })
}
/// Selects where a segment boundary falls relative to the element for which
/// the split predicate returned `true`.
#[derive(Clone, Copy, Debug)]
enum SplitMode {
/// In `split_streams_legacy`, the current segment is closed before the
/// matching element is pushed, so that element starts the next segment.
When,
/// In `split_streams_legacy`, the matching element is pushed first and the
/// segment is closed afterwards, so that element ends its segment.
After,
}
/// Test-only state of the legacy split worker: the outer stream of segment
/// sources, the currently open segment and a local batch of pending items.
/// While `armed`, dropping it fails the open segment and the outer stream
/// with `StreamError::AbruptTermination`.
#[cfg(test)]
struct SplitWorkerGuard<Out> {
/// Outer stream onto which each new segment's source is pushed.
outer: Arc<LiveSubstreamShared<Source<Out>>>,
/// Segment currently receiving items, if one is open.
current: Option<Arc<LiveSubstreamShared<Out>>>,
/// When `true`, `Drop` signals `AbruptTermination`.
armed: bool,
/// Items accepted for the current segment but not yet flushed to it.
pending: VecDeque<Out>,
}
#[cfg(test)]
impl<Out> SplitWorkerGuard<Out> {
    /// Creates an armed guard with no open segment and an empty pending batch.
    fn new(outer: Arc<LiveSubstreamShared<Source<Out>>>) -> Self {
        Self {
            outer,
            current: None,
            armed: true,
            pending: VecDeque::with_capacity(LIVE_SUBSTREAM_BATCH),
        }
    }

    /// Turns off the abrupt-termination signalling performed by `Drop`.
    fn disarm(&mut self) {
        self.armed = false;
    }

    /// Opens a new segment and publishes its source on the outer stream.
    ///
    /// Returns `Err(())` when the outer stream rejects the source.
    fn open_segment(&mut self) -> Result<(), ()>
    where
        Out: Send + 'static,
    {
        let substream = LiveSubstreamShared::new();
        self.current = Some(Arc::clone(&substream));
        push_live_substream(&self.outer, source_from_live_substream(substream)).map_err(|_| ())
    }

    /// Delivers the pending batch to the current segment.
    ///
    /// Pending items are discarded when no segment is open.
    fn flush_pending(&mut self) -> Result<(), ()>
    where
        Out: Send + 'static,
    {
        if self.pending.is_empty() {
            return Ok(());
        }
        match self.current {
            Some(ref current) => push_live_substream_batch(current, &mut self.pending),
            None => {
                self.pending.clear();
                Ok(())
            }
        }
    }

    /// Queues `item` for the current segment, flushing once the pending batch
    /// reaches `LIVE_SUBSTREAM_BATCH`. The item is dropped when no segment is
    /// open.
    fn push_item(&mut self, item: Out) -> Result<(), ()>
    where
        Out: Send + 'static,
    {
        if self.current.is_none() {
            return Ok(());
        }
        self.pending.push_back(item);
        if self.pending.len() >= LIVE_SUBSTREAM_BATCH {
            self.flush_pending()
        } else {
            Ok(())
        }
    }

    /// Flushes pending items (ignoring a rejected flush) and completes the
    /// current segment, leaving no segment open.
    fn close_segment(&mut self)
    where
        Out: Send + 'static,
    {
        let _ = self.flush_pending();
        if let Some(current) = self.current.take() {
            complete_live_substream(&current);
        }
    }

    /// Discards pending items and fails the current segment with `error`.
    fn fail_current(&mut self, error: StreamError)
    where
        Out: Send + 'static,
    {
        self.pending.clear();
        if let Some(current) = self.current.take() {
            fail_live_substream(&current, error);
        }
    }

    /// Fails the current segment and then the outer stream with `error`.
    fn fail_all(&mut self, error: StreamError)
    where
        Out: Send + 'static,
    {
        self.fail_current(error.clone());
        fail_live_substream(&self.outer, error);
    }

    /// Closes the current segment and then completes the outer stream.
    fn complete_all(&mut self)
    where
        Out: Send + 'static,
    {
        self.close_segment();
        complete_live_substream(&self.outer);
    }
}
#[cfg(test)]
impl<Out> Drop for SplitWorkerGuard<Out> {
    /// If the guard is still armed (the worker exited without disarming it),
    /// discards pending items and fails the open segment and the outer stream
    /// with `StreamError::AbruptTermination`.
    fn drop(&mut self) {
        if self.armed {
            self.pending.clear();
            if let Some(current) = self.current.take() {
                fail_live_substream(&current, StreamError::AbruptTermination);
            }
            fail_live_substream(&self.outer, StreamError::AbruptTermination);
        }
    }
}
/// Splits `input` into a stream of segment sources according to `mode` and
/// `predicate`.
///
/// Dispatches to `split_streams_fast` with a fresh, unset `parent_cancelled`
/// flag. In test builds only, `split_streams_legacy` is used instead when the
/// current substream mode is `SubstreamExecutorMode::LegacyOnly`.
fn split_streams<Out, F>(
input: BoxStream<Out>,
mode: SplitMode,
predicate: Arc<F>,
materializer: &Materializer,
) -> BoxStream<Source<Out>>
where
Out: Clone + Send + 'static,
F: Fn(&Out) -> bool + Send + Sync + 'static,
{
// Test-only escape hatch that routes to the legacy implementation.
#[cfg(test)]
if current_substream_mode() == SubstreamExecutorMode::LegacyOnly {
return split_streams_legacy(input, mode, predicate, materializer);
}
let parent_cancelled = Arc::new(AtomicBool::new(false));
split_streams_fast(input, mode, predicate, parent_cancelled, materializer)
}
/// Test-only legacy implementation of stream splitting.
///
/// A worker spawned on `materializer` reads `input` and evaluates `predicate`
/// on every item. In `SplitMode::When` a matching item closes the open segment
/// and starts the next one; in `SplitMode::After` a matching item is delivered
/// and then closes its segment. A panicking predicate fails all streams with
/// `StreamError::AbruptTermination`, an upstream error is forwarded to all
/// streams, and a rejected push makes the worker stop quietly.
#[cfg(test)]
fn split_streams_legacy<Out, F>(
    mut input: BoxStream<Out>,
    mode: SplitMode,
    predicate: Arc<F>,
    materializer: &Materializer,
) -> BoxStream<Source<Out>>
where
    Out: Clone + Send + 'static,
    F: Fn(&Out) -> bool + Send + Sync + 'static,
{
    let outer = LiveSubstreamShared::new();
    let worker_outer = Arc::clone(&outer);
    let completion = materializer.spawn_stream(move |cancelled| {
        let mut guard = SplitWorkerGuard::new(Arc::clone(&worker_outer));
        loop {
            if cancelled.load(Ordering::SeqCst) {
                break;
            }
            // The consumer of the outer stream went away: stop quietly.
            if worker_outer.cancelled.load(Ordering::SeqCst) {
                guard.disarm();
                return Ok(NotUsed);
            }
            let item = match input.next() {
                Some(Ok(item)) => item,
                Some(Err(error)) => {
                    guard.fail_all(error.clone());
                    guard.disarm();
                    return Err(error);
                }
                // Upstream finished: fall through to the shared completion path.
                None => break,
            };
            let Ok(split) = catch_unwind(AssertUnwindSafe(|| predicate(&item))) else {
                guard.fail_all(StreamError::AbruptTermination);
                guard.disarm();
                return Ok(NotUsed);
            };
            // `When`: the matching item belongs to the next segment.
            let close_before = matches!(mode, SplitMode::When) && split && guard.current.is_some();
            if close_before {
                guard.close_segment();
            }
            // Make sure a segment is open, then hand the item over.
            let delivered = (guard.current.is_some() || guard.open_segment().is_ok())
                && guard.push_item(item).is_ok();
            if !delivered {
                guard.disarm();
                return Ok(NotUsed);
            }
            // `After`: the matching item is the last one of its segment.
            let close_after = matches!(mode, SplitMode::After) && split;
            if close_after {
                guard.close_segment();
            }
        }
        guard.complete_all();
        guard.disarm();
        Ok(NotUsed)
    });
    Box::new(LiveSubstreamStream {
        shared: outer,
        completion: Some(completion),
        local_batch: VecDeque::new(),
    })
}
/// A sink that receives the elements of one split segment directly and is
/// finished exactly once, via either `complete` or `fail`.
trait SplitConsumer<T: Send + 'static>: Send + 'static {
/// Feeds one element to the consumer; an `Err` reports that the consumer
/// could not accept it.
fn push_item(&mut self, item: T) -> StreamResult<()>;
/// Consumes the consumer, signalling that the segment ended normally.
fn complete(self: Box<Self>);
/// Consumes the consumer, signalling that the segment ended with `error`.
fn fail(self: Box<Self>, error: StreamError);
}
struct FoldConsumer<T, Acc> {
acc: Option<Acc>,
f: Arc<dyn Fn(Acc, T) -> Acc + Send + Sync + 'static>,
tx: futures::channel::oneshot::Sender<StreamResult<Acc>>,
}
impl<T: Send + 'static, Acc: Send + 'static> SplitConsumer<T> for FoldConsumer<T, Acc> {
fn push_item(&mut self, item: T) -> StreamResult<()> {
let acc = self.acc.take().expect("FoldConsumer: push after done");
self.acc = Some((self.f)(acc, item));
Ok(())
}
fn complete(mut self: Box<Self>) {
let acc = self
.acc
.take()
.expect("FoldConsumer: complete called twice");
let _ = self.tx.send(Ok(acc));
}
fn fail(mut self: Box<Self>, error: StreamError) {
self.acc = None;
let _ = self.tx.send(Err(error));
}
}
struct FoldResultConsumer<T, Acc> {
acc: Option<Acc>,
f: Arc<dyn Fn(Acc, T) -> StreamResult<Acc> + Send + Sync + 'static>,
tx: futures::channel::oneshot::Sender<StreamResult<Acc>>,
}
impl<T: Send + 'static, Acc: Send + 'static> SplitConsumer<T> for FoldResultConsumer<T, Acc> {
fn push_item(&mut self, item: T) -> StreamResult<()> {
let acc = self
.acc
.take()
.expect("FoldResultConsumer: push after done");
match (self.f)(acc, item) {
Ok(new_acc) => {
self.acc = Some(new_acc);
Ok(())
}
Err(e) => Err(e),
}
}
fn complete(mut self: Box<Self>) {
let acc = self
.acc
.take()
.expect("FoldResultConsumer: complete called twice");
let _ = self.tx.send(Ok(acc));
}
fn fail(mut self: Box<Self>, error: StreamError) {
self.acc = None;
let _ = self.tx.send(Err(error));
}
}
/// `SplitConsumer` that gathers the elements of a segment into a `Vec` and
/// reports it through a oneshot channel.
struct CollectConsumer<T> {
    /// Elements received so far, in arrival order.
    items: Vec<T>,
    /// Channel on which the collected elements (or the failure) are delivered.
    tx: futures::channel::oneshot::Sender<StreamResult<Vec<T>>>,
}

impl<T: Send + 'static> SplitConsumer<T> for CollectConsumer<T> {
    /// Appends `item`; never fails.
    fn push_item(&mut self, item: T) -> StreamResult<()> {
        self.items.push(item);
        Ok(())
    }

    /// Sends everything collected; a dropped receiver is ignored.
    fn complete(self: Box<Self>) {
        let CollectConsumer { items, tx } = *self;
        let _ = tx.send(Ok(items));
    }

    /// Sends `error` instead of the collected elements; a dropped receiver is
    /// ignored.
    fn fail(self: Box<Self>, error: StreamError) {
        let CollectConsumer { tx, .. } = *self;
        let _ = tx.send(Err(error));
    }
}
/// `SplitConsumer` that discards every element and only reports how the
/// segment ended.
struct IgnoreConsumer<T> {
    /// Channel on which completion (or the failure) is delivered.
    tx: futures::channel::oneshot::Sender<StreamResult<NotUsed>>,
    /// Ties the consumer to its element type without storing any element.
    _phantom: std::marker::PhantomData<fn(T)>,
}

impl<T: Send + 'static> SplitConsumer<T> for IgnoreConsumer<T> {
    /// Drops the element; never fails.
    fn push_item(&mut self, _item: T) -> StreamResult<()> {
        Ok(())
    }

    /// Sends `Ok(NotUsed)`; a dropped receiver is ignored.
    fn complete(self: Box<Self>) {
        let IgnoreConsumer { tx, .. } = *self;
        let _ = tx.send(Ok(NotUsed));
    }

    /// Sends `error`; a dropped receiver is ignored.
    fn fail(self: Box<Self>, error: StreamError) {
        let IgnoreConsumer { tx, .. } = *self;
        let _ = tx.send(Err(error));
    }
}
/// Scope guard that calls `cancel_terminal` on the wrapped hook when dropped,
/// unless it has been disarmed first.
struct TerminalDrainCancelGuard<T: 'static> {
    /// Hook to cancel on drop; `None` once the guard is disarmed.
    hook: Option<Arc<dyn TerminalSourceHookDyn<T>>>,
}

impl<T: 'static> TerminalDrainCancelGuard<T> {
    /// Creates an armed guard for `hook`.
    fn new(hook: Arc<dyn TerminalSourceHookDyn<T>>) -> Self {
        let hook = Some(hook);
        Self { hook }
    }

    /// Releases the hook so that dropping the guard no longer cancels it.
    fn disarm(&mut self) {
        drop(self.hook.take());
    }
}

impl<T: 'static> Drop for TerminalDrainCancelGuard<T> {
    /// Cancels the hook if the guard is still armed.
    fn drop(&mut self) {
        let Some(hook) = self.hook.take() else {
            return;
        };
        hook.cancel_terminal();
    }
}
/// Checks whether a terminal drain may continue.
///
/// Returns `Err(StreamError::AbruptTermination)` when the materializer has
/// shut down, `Err(StreamError::Cancelled)` when `cancelled` is set, and
/// `Ok(())` otherwise. In both error cases the hook is cancelled first. The
/// shutdown check takes precedence over the cancellation flag.
fn terminal_drain_status<T: 'static>(
    hook: &Arc<dyn TerminalSourceHookDyn<T>>,
    materializer: &Materializer,
    cancelled: &Arc<AtomicBool>,
) -> StreamResult<()> {
    let stop_reason = if materializer.is_shutdown() {
        Some(StreamError::AbruptTermination)
    } else if cancelled.load(Ordering::SeqCst) {
        Some(StreamError::Cancelled)
    } else {
        None
    };
    match stop_reason {
        Some(error) => {
            hook.cancel_terminal();
            Err(error)
        }
        None => Ok(()),
    }
}
/// Describes an infallible fold (initial value plus step function) so that it
/// can be attached to a source through the `FoldFastPathDyn` fast paths.
pub(super) struct FoldDescriptor<T, Acc> {
/// Initial accumulator; cloned for every registration.
pub(super) zero: Acc,
/// Folding step applied to every element.
pub(super) f: Arc<dyn Fn(Acc, T) -> Acc + Send + Sync + 'static>,
}
impl<T: Send + 'static, Acc: Clone + Send + Sync + 'static> FoldFastPathDyn<T>
for FoldDescriptor<T, Acc>
{
/// Tries to attach this fold directly to a split segment.
///
/// Returns `None` when `hook` is not a `SegmentConsumerSlot<T>`, and
/// `Some(Err(..))` when the slot was already claimed. Otherwise returns a
/// boxed `StreamCompletion` that resolves with the fold result.
fn try_register(
&self,
hook: Arc<dyn SplitSegmentHookDyn>,
) -> Option<StreamResult<Box<dyn Any + Send>>> {
let hook_any = hook.as_any_arc();
let slot = hook_any.downcast::<SegmentConsumerSlot<T>>().ok()?;
// `swap` returns the previous value: `true` means the slot is taken.
if slot.claimed.swap(true, Ordering::SeqCst) {
return Some(Err(StreamError::Failed(
"substream source cannot be materialized more than once".into(),
)));
}
let (tx, rx) = futures::channel::oneshot::channel::<StreamResult<Acc>>();
{
let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
// The segment already ended: fold the buffered items right here
// (outside the lock) and publish the result immediately.
if let Some(terminal) = state.terminal.take() {
let buffer = std::mem::take(&mut state.buffer);
state.consumer = SegmentConsumer::DirectTaken;
drop(state);
let mut acc = self.zero.clone();
for item in buffer {
acc = (self.f)(acc, item);
}
let result = match terminal {
LiveSubstreamTerminal::Complete => Ok(acc),
LiveSubstreamTerminal::Error(e) => Err(e),
};
let _ = tx.send(result);
let completion = StreamCompletion::from_receiver(rx, None);
return Some(Ok(Box::new(completion)));
}
// The segment is still live: install a direct consumer in the slot.
let consumer: Box<dyn SplitConsumer<T>> = Box::new(FoldConsumer {
acc: Some(self.zero.clone()),
f: Arc::clone(&self.f),
tx,
});
state.consumer = SegmentConsumer::Direct(consumer);
}
// Wake anything waiting on the slot now that a consumer is installed.
slot.available.notify_all();
let completion = StreamCompletion::from_receiver(rx, None);
Some(Ok(Box::new(completion)))
}
/// This descriptor always supports the terminal-drain path.
fn supports_terminal_drain(&self) -> bool {
true
}
/// Spawns a worker on `materializer` that drains `hook` batch by batch and
/// folds every element, returning the worker's completion handle.
fn try_register_terminal_drain(
&self,
hook: Arc<dyn TerminalSourceHookDyn<T>>,
materializer: &Materializer,
) -> Option<StreamResult<Box<dyn Any + Send>>> {
let zero = self.zero.clone();
let f = Arc::clone(&self.f);
let worker_materializer =
materializer.with_name_prefix(materializer.name_prefix().to_owned());
let completion = materializer.spawn_stream(move |cancelled| {
// Cancels the hook on every exit path except normal completion.
let mut guard = TerminalDrainCancelGuard::new(Arc::clone(&hook));
let mut acc = Some(zero);
let mut batch = Vec::new();
loop {
let status =
hook.drain_terminal_batch(&worker_materializer, &cancelled, &mut batch)?;
for (index, item) in batch.drain(..).enumerate() {
let previous = acc.take().expect("fold accumulator present");
acc = Some(f(previous, item));
// Re-check shutdown/cancellation after every 64 elements.
if (index + 1) % 64 == 0 {
terminal_drain_status(&hook, &worker_materializer, &cancelled)?;
}
}
if matches!(status, TerminalSourceStatus::Completed) {
guard.disarm();
return Ok(acc.expect("fold accumulator present"));
}
}
});
Some(Ok(Box::new(completion)))
}
}
/// Describes a fallible fold (initial value plus step function returning a
/// `StreamResult`) so that it can be attached to a source through the
/// `FoldFastPathDyn` fast paths.
pub(super) struct FoldResultDescriptor<T, Acc> {
/// Initial accumulator; cloned for every registration.
pub(super) zero: Acc,
/// Fallible folding step applied to every element.
pub(super) f: Arc<dyn Fn(Acc, T) -> StreamResult<Acc> + Send + Sync + 'static>,
}
impl<T: Send + 'static, Acc: Clone + Send + Sync + 'static> FoldFastPathDyn<T>
for FoldResultDescriptor<T, Acc>
{
/// Tries to attach this fold directly to a split segment.
///
/// Returns `None` when `hook` is not a `SegmentConsumerSlot<T>`, and
/// `Some(Err(..))` when the slot was already claimed. Otherwise returns a
/// boxed `StreamCompletion` that resolves with the fold result.
fn try_register(
&self,
hook: Arc<dyn SplitSegmentHookDyn>,
) -> Option<StreamResult<Box<dyn Any + Send>>> {
let hook_any = hook.as_any_arc();
let slot = hook_any.downcast::<SegmentConsumerSlot<T>>().ok()?;
// `swap` returns the previous value: `true` means the slot is taken.
if slot.claimed.swap(true, Ordering::SeqCst) {
return Some(Err(StreamError::Failed(
"substream source cannot be materialized more than once".into(),
)));
}
let (tx, rx) = futures::channel::oneshot::channel::<StreamResult<Acc>>();
{
let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
// The segment already ended: resolve the result right here, outside
// the lock. The buffer is folded only for a normal completion and
// the fold stops at the first failing step.
if let Some(terminal) = state.terminal.take() {
let buffer = std::mem::take(&mut state.buffer);
state.consumer = SegmentConsumer::DirectTaken;
drop(state);
let acc = self.zero.clone();
let result = match terminal {
LiveSubstreamTerminal::Complete => buffer
.into_iter()
.try_fold(acc, |a, item| (self.f)(a, item)),
LiveSubstreamTerminal::Error(e) => Err(e),
};
let _ = tx.send(result);
let completion = StreamCompletion::from_receiver(rx, None);
return Some(Ok(Box::new(completion)));
}
// The segment is still live: install a direct consumer in the slot.
let consumer: Box<dyn SplitConsumer<T>> = Box::new(FoldResultConsumer {
acc: Some(self.zero.clone()),
f: Arc::clone(&self.f),
tx,
});
state.consumer = SegmentConsumer::Direct(consumer);
}
// Wake anything waiting on the slot now that a consumer is installed.
slot.available.notify_all();
let completion = StreamCompletion::from_receiver(rx, None);
Some(Ok(Box::new(completion)))
}
/// This descriptor always supports the terminal-drain path.
fn supports_terminal_drain(&self) -> bool {
true
}
/// Spawns a worker on `materializer` that drains `hook` batch by batch and
/// folds every element, stopping with the step's error if a step fails.
fn try_register_terminal_drain(
&self,
hook: Arc<dyn TerminalSourceHookDyn<T>>,
materializer: &Materializer,
) -> Option<StreamResult<Box<dyn Any + Send>>> {
let zero = self.zero.clone();
let f = Arc::clone(&self.f);
let worker_materializer =
materializer.with_name_prefix(materializer.name_prefix().to_owned());
let completion = materializer.spawn_stream(move |cancelled| {
// Cancels the hook on every exit path except normal completion.
let mut guard = TerminalDrainCancelGuard::new(Arc::clone(&hook));
let mut acc = Some(zero);
let mut batch = Vec::new();
loop {
let status =
hook.drain_terminal_batch(&worker_materializer, &cancelled, &mut batch)?;
for (index, item) in batch.drain(..).enumerate() {
let previous = acc.take().expect("fold accumulator present");
match f(previous, item) {
Ok(next) => {
acc = Some(next);
}
// Returning with the guard still armed cancels the hook.
Err(error) => return Err(error),
}
// Re-check shutdown/cancellation after every 64 elements.
if (index + 1) % 64 == 0 {
terminal_drain_status(&hook, &worker_materializer, &cancelled)?;
}
}
if matches!(status, TerminalSourceStatus::Completed) {
guard.disarm();
return Ok(acc.expect("fold accumulator present"));
}
}
});
Some(Ok(Box::new(completion)))
}
}
/// Describes a "collect into `Vec`" terminal operation so that it can be
/// attached to a source through the `FoldFastPathDyn` fast paths.
pub(super) struct CollectDescriptor<T> {
/// Ties the descriptor to its element type without storing any element.
pub(super) _phantom: std::marker::PhantomData<fn(T)>,
}
impl<T: Send + 'static> FoldFastPathDyn<T> for CollectDescriptor<T> {
/// Tries to attach a collecting consumer directly to a split segment.
///
/// Returns `None` when `hook` is not a `SegmentConsumerSlot<T>`, and
/// `Some(Err(..))` when the slot was already claimed. Otherwise returns a
/// boxed `StreamCompletion` that resolves with the collected elements.
fn try_register(
&self,
hook: Arc<dyn SplitSegmentHookDyn>,
) -> Option<StreamResult<Box<dyn Any + Send>>> {
let hook_any = hook.as_any_arc();
let slot = hook_any.downcast::<SegmentConsumerSlot<T>>().ok()?;
// `swap` returns the previous value: `true` means the slot is taken.
if slot.claimed.swap(true, Ordering::SeqCst) {
return Some(Err(StreamError::Failed(
"substream source cannot be materialized more than once".into(),
)));
}
let (tx, rx) = futures::channel::oneshot::channel::<StreamResult<Vec<T>>>();
{
let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
// The segment already ended: turn the buffered items into the
// result right here, outside the lock.
if let Some(terminal) = state.terminal.take() {
let buffer = std::mem::take(&mut state.buffer);
state.consumer = SegmentConsumer::DirectTaken;
drop(state);
let items: Vec<T> = buffer.into_iter().collect();
let result = match terminal {
LiveSubstreamTerminal::Complete => Ok(items),
LiveSubstreamTerminal::Error(e) => Err(e),
};
let _ = tx.send(result);
let completion = StreamCompletion::from_receiver(rx, None);
return Some(Ok(Box::new(completion)));
}
// The segment is still live: install a direct consumer in the slot.
let consumer: Box<dyn SplitConsumer<T>> = Box::new(CollectConsumer {
items: Vec::new(),
tx,
});
state.consumer = SegmentConsumer::Direct(consumer);
}
// Wake anything waiting on the slot now that a consumer is installed.
slot.available.notify_all();
let completion = StreamCompletion::from_receiver(rx, None);
Some(Ok(Box::new(completion)))
}
/// This descriptor always supports the terminal-drain path.
fn supports_terminal_drain(&self) -> bool {
true
}
/// Spawns a worker on `materializer` that drains `hook` batch by batch and
/// appends every element to the result vector.
fn try_register_terminal_drain(
&self,
hook: Arc<dyn TerminalSourceHookDyn<T>>,
materializer: &Materializer,
) -> Option<StreamResult<Box<dyn Any + Send>>> {
let worker_materializer =
materializer.with_name_prefix(materializer.name_prefix().to_owned());
let completion = materializer.spawn_stream(move |cancelled| {
// Cancels the hook on every exit path except normal completion.
let mut guard = TerminalDrainCancelGuard::new(Arc::clone(&hook));
let mut items = Vec::new();
let mut batch = Vec::new();
loop {
let status =
hook.drain_terminal_batch(&worker_materializer, &cancelled, &mut batch)?;
for (index, item) in batch.drain(..).enumerate() {
items.push(item);
// Re-check shutdown/cancellation after every 64 elements.
if (index + 1) % 64 == 0 {
terminal_drain_status(&hook, &worker_materializer, &cancelled)?;
}
}
if matches!(status, TerminalSourceStatus::Completed) {
guard.disarm();
return Ok(items);
}
}
});
Some(Ok(Box::new(completion)))
}
}
/// Describes an "ignore all elements" terminal operation so that it can be
/// attached to a source through the `FoldFastPathDyn` fast paths.
pub(super) struct IgnoreDescriptor<T> {
/// Ties the descriptor to its element type without storing any element.
pub(super) _phantom: std::marker::PhantomData<fn(T)>,
}
impl<T: Send + 'static> FoldFastPathDyn<T> for IgnoreDescriptor<T> {
/// Tries to attach an ignoring consumer directly to a split segment.
///
/// Returns `None` when `hook` is not a `SegmentConsumerSlot<T>`, and
/// `Some(Err(..))` when the slot was already claimed. Otherwise returns a
/// boxed `StreamCompletion` that resolves when the segment ends.
fn try_register(
&self,
hook: Arc<dyn SplitSegmentHookDyn>,
) -> Option<StreamResult<Box<dyn Any + Send>>> {
let hook_any = hook.as_any_arc();
let slot = hook_any.downcast::<SegmentConsumerSlot<T>>().ok()?;
// `swap` returns the previous value: `true` means the slot is taken.
if slot.claimed.swap(true, Ordering::SeqCst) {
return Some(Err(StreamError::Failed(
"substream source cannot be materialized more than once".into(),
)));
}
let (tx, rx) = futures::channel::oneshot::channel::<StreamResult<NotUsed>>();
{
let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
// The segment already ended: report its outcome immediately. The
// buffered items are not taken out of the slot here.
if let Some(terminal) = state.terminal.take() {
state.consumer = SegmentConsumer::DirectTaken;
drop(state);
let result = match terminal {
LiveSubstreamTerminal::Complete => Ok(NotUsed),
LiveSubstreamTerminal::Error(e) => Err(e),
};
let _ = tx.send(result);
let completion = StreamCompletion::from_receiver(rx, None);
return Some(Ok(Box::new(completion)));
}
// The segment is still live: install a direct consumer in the slot.
let consumer: Box<dyn SplitConsumer<T>> = Box::new(IgnoreConsumer {
tx,
_phantom: std::marker::PhantomData,
});
state.consumer = SegmentConsumer::Direct(consumer);
}
// Wake anything waiting on the slot now that a consumer is installed.
slot.available.notify_all();
let completion = StreamCompletion::from_receiver(rx, None);
Some(Ok(Box::new(completion)))
}
/// This descriptor always supports the terminal-drain path.
fn supports_terminal_drain(&self) -> bool {
true
}
/// Spawns a worker on `materializer` that drains `hook` batch by batch,
/// discarding every batch, until the hook reports completion.
fn try_register_terminal_drain(
&self,
hook: Arc<dyn TerminalSourceHookDyn<T>>,
materializer: &Materializer,
) -> Option<StreamResult<Box<dyn Any + Send>>> {
let worker_materializer =
materializer.with_name_prefix(materializer.name_prefix().to_owned());
let completion = materializer.spawn_stream(move |cancelled| {
// Cancels the hook on every exit path except normal completion.
let mut guard = TerminalDrainCancelGuard::new(Arc::clone(&hook));
let mut batch = Vec::new();
loop {
let status =
hook.drain_terminal_batch(&worker_materializer, &cancelled, &mut batch)?;
batch.clear();
if matches!(status, TerminalSourceStatus::Completed) {
guard.disarm();
return Ok(NotUsed);
}
}
});
Some(Ok(Box::new(completion)))
}
}
/// How the elements of one split segment are being consumed.
enum SegmentConsumer<T: Send + 'static> {
/// Initial state of a new slot: nothing has attached yet.
Pending,
/// A direct consumer has been installed in the slot and not yet taken out.
Direct(Box<dyn SplitConsumer<T>>),
/// Placeholder left behind once the `Direct` consumer has been moved out
/// of the slot; also stored when a terminal signal is handled.
DirectTaken,
/// The segment is read through a `FallbackSegmentStream` iterator.
Fallback,
}
/// Mutex-protected state of a `SegmentConsumerSlot`.
struct SegmentSlotState<T: Send + 'static> {
/// Elements written by the split worker that nobody has consumed yet.
buffer: VecDeque<T>,
/// Current consumer attachment of the segment.
consumer: SegmentConsumer<T>,
/// Terminal signal of the segment, once it has ended.
terminal: Option<LiveSubstreamTerminal>,
}
/// Rendezvous point between the split worker and the consumer of one segment.
pub(super) struct SegmentConsumerSlot<T: Send + 'static> {
    /// Set by the first consumer to attach; later attempts are rejected.
    claimed: Arc<AtomicBool>,
    /// Buffer, consumer attachment and terminal signal of the segment.
    state: Mutex<SegmentSlotState<T>>,
    /// Signalled when `state` changes in a way a waiter may care about.
    available: Condvar,
    /// Flag shared with the parent split stream.
    parent_cancelled: Arc<AtomicBool>,
}

impl<T: Clone + Send + 'static> SegmentConsumerSlot<T> {
    /// Creates an unclaimed slot with an empty buffer, a `Pending` consumer
    /// and no terminal signal.
    fn new(parent_cancelled: Arc<AtomicBool>) -> Arc<Self> {
        let initial_state = SegmentSlotState {
            buffer: VecDeque::new(),
            consumer: SegmentConsumer::Pending,
            terminal: None,
        };
        let slot = Self {
            claimed: Arc::new(AtomicBool::new(false)),
            state: Mutex::new(initial_state),
            available: Condvar::new(),
            parent_cancelled,
        };
        Arc::new(slot)
    }
}
impl<T: Clone + Send + 'static> SplitSegmentHookDyn for SegmentConsumerSlot<T> {
/// Upcasts the slot to `Arc<dyn Any + Send + Sync>` so that callers can
/// downcast it back to the concrete `SegmentConsumerSlot<T>`.
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
self
}
}
impl<T: Clone + Send + 'static> SourceFactory<T, NotUsed> for SegmentConsumerSlot<T> {
    /// Materializes the segment as an ordinary pull-based stream.
    ///
    /// Claims the slot (failing with `StreamError::Failed` if it was already
    /// claimed), marks the consumer as `Fallback`, wakes waiters, and returns
    /// a `FallbackSegmentStream` reading from this slot.
    fn create(
        self: Arc<Self>,
        _materializer: &Materializer,
    ) -> StreamResult<(BoxStream<T>, NotUsed)> {
        let previously_claimed = self.claimed.swap(true, Ordering::SeqCst);
        if previously_claimed {
            return Err(StreamError::Failed(
                "substream source cannot be materialized more than once".into(),
            ));
        }
        // The lock is held only for this single assignment.
        self.state
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .consumer = SegmentConsumer::Fallback;
        self.available.notify_all();
        let stream: BoxStream<T> = Box::new(FallbackSegmentStream {
            slot: Arc::clone(&self),
        });
        Ok((stream, NotUsed))
    }
}
struct FallbackSegmentStream<T: Send + 'static> {
slot: Arc<SegmentConsumerSlot<T>>,
}
impl<T: Clone + Send + 'static> Iterator for FallbackSegmentStream<T> {
type Item = StreamResult<T>;
fn next(&mut self) -> Option<Self::Item> {
let mut state = self.slot.state.lock().unwrap_or_else(|p| p.into_inner());
loop {
if let Some(item) = state.buffer.pop_front() {
drop(state);
self.slot.available.notify_all();
return Some(Ok(item));
}
if let Some(terminal) = &state.terminal {
return match terminal {
LiveSubstreamTerminal::Complete => None,
LiveSubstreamTerminal::Error(e) => Some(Err(e.clone())),
};
}
if self.slot.parent_cancelled.load(Ordering::SeqCst) {
return Some(Err(StreamError::AbruptTermination));
}
state = self
.slot
.available
.wait(state)
.unwrap_or_else(|p| p.into_inner());
}
}
}
impl<T: Send + 'static> Drop for FallbackSegmentStream<T> {
fn drop(&mut self) {
let mut state = self.slot.state.lock().unwrap_or_else(|p| p.into_inner());
if state.terminal.is_none() {
state.terminal = Some(LiveSubstreamTerminal::Error(StreamError::Cancelled));
}
drop(state);
self.slot.available.notify_all();
}
}
/// State of the fast split worker: the outer stream of segment sources, the
/// slot and (once adopted) the direct consumer of the current segment, and a
/// local batch of items not yet handed over. While `armed`, dropping it
/// signals `StreamError::AbruptTermination` to the current segment and the
/// outer stream.
struct SplitFastWorkerGuard<Out: Send + 'static> {
/// Outer stream onto which each new segment's source is pushed.
outer: Arc<LiveSubstreamShared<Source<Out>>>,
/// Slot of the segment currently being written, if one is open.
current_slot: Option<Arc<SegmentConsumerSlot<Out>>>,
/// Direct consumer taken over from the slot; items bypass the slot buffer
/// while this is set.
current_consumer: Option<Box<dyn SplitConsumer<Out>>>,
/// When `true`, `Drop` signals `AbruptTermination`.
armed: bool,
/// Flag shared with every segment slot created by this worker.
parent_cancelled: Arc<AtomicBool>,
/// Items accepted for the current segment but not yet flushed.
local_pending: VecDeque<Out>,
}
impl<Out: Clone + Send + 'static> SplitFastWorkerGuard<Out> {
fn new(
outer: Arc<LiveSubstreamShared<Source<Out>>>,
parent_cancelled: Arc<AtomicBool>,
) -> Self {
Self {
outer,
current_slot: None,
current_consumer: None,
armed: true,
parent_cancelled,
local_pending: VecDeque::with_capacity(LIVE_SUBSTREAM_BATCH),
}
}
fn disarm(&mut self) {
self.armed = false;
}
fn open_segment(&mut self) -> Result<(), ()> {
let slot = SegmentConsumerSlot::new(Arc::clone(&self.parent_cancelled));
let factory: Arc<dyn SourceFactory<Out, NotUsed>> =
Arc::clone(&slot) as Arc<dyn SourceFactory<Out, NotUsed>>;
let hook: Arc<dyn SplitSegmentHookDyn> = Arc::clone(&slot) as Arc<dyn SplitSegmentHookDyn>;
let source = Source {
factory,
terminal_factory: None,
hints: SourceHints::default(),
attributes: Attributes::default(),
split_hook: Some(hook),
};
self.current_slot = Some(slot);
push_live_substream(&self.outer, source).map_err(|_| ())
}
fn push_item(&mut self, item: Out) -> Result<(), ()> {
if let Some(ref mut consumer) = self.current_consumer {
if let Err(e) = consumer.push_item(item) {
let c = self.current_consumer.take().unwrap();
c.fail(e);
return Err(());
}
return Ok(());
}
self.local_pending.push_back(item);
if self.local_pending.len() >= LIVE_SUBSTREAM_BATCH {
self.flush_pending()
} else {
Ok(())
}
}
fn flush_pending(&mut self) -> Result<(), ()> {
if self.local_pending.is_empty() {
return Ok(());
}
if let Some(ref mut consumer) = self.current_consumer {
for item in self.local_pending.drain(..) {
if let Err(e) = consumer.push_item(item) {
let c = self.current_consumer.take().unwrap();
c.fail(e);
return Err(());
}
}
return Ok(());
}
let slot = match &self.current_slot {
Some(s) => Arc::clone(s),
None => {
self.local_pending.clear();
return Ok(());
}
};
loop {
if self.parent_cancelled.load(Ordering::SeqCst) {
self.local_pending.clear();
return Err(());
}
let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
if matches!(state.consumer, SegmentConsumer::Direct(_)) {
let consumer =
match std::mem::replace(&mut state.consumer, SegmentConsumer::DirectTaken) {
SegmentConsumer::Direct(c) => c,
_ => unreachable!(),
};
let mut drain_buf = std::mem::take(&mut state.buffer);
drop(state);
let mut consumer_box = consumer;
for item in drain_buf.drain(..) {
if let Err(e) = consumer_box.push_item(item) {
consumer_box.fail(e);
self.local_pending.clear();
return Err(());
}
}
for item in self.local_pending.drain(..) {
if let Err(e) = consumer_box.push_item(item) {
consumer_box.fail(e);
return Err(());
}
}
self.current_consumer = Some(consumer_box);
return Ok(());
}
if matches!(
state.consumer,
SegmentConsumer::Pending | SegmentConsumer::Fallback
) {
let is_fallback = matches!(state.consumer, SegmentConsumer::Fallback);
let cap = LIVE_SUBSTREAM_CAPACITY.saturating_sub(state.buffer.len());
if cap > 0 {
let to_flush = cap.min(self.local_pending.len());
let items: Vec<Out> = self.local_pending.drain(..to_flush).collect();
state.buffer.extend(items);
let has_more = !self.local_pending.is_empty();
drop(state);
if is_fallback {
slot.available.notify_all();
}
if !has_more {
return Ok(());
}
continue;
} else {
state = slot
.available
.wait(state)
.unwrap_or_else(|p| p.into_inner());
continue;
}
}
return Ok(());
}
}
fn close_segment(&mut self) {
let _ = self.flush_pending();
if let Some(consumer) = self.current_consumer.take() {
consumer.complete();
self.current_slot = None;
return;
}
let slot = match self.current_slot.take() {
Some(s) => s,
None => return,
};
if self.parent_cancelled.load(Ordering::SeqCst) {
let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
if state.terminal.is_none() {
state.terminal = Some(LiveSubstreamTerminal::Error(StreamError::AbruptTermination));
}
drop(state);
slot.available.notify_all();
return;
}
let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
match std::mem::replace(&mut state.consumer, SegmentConsumer::DirectTaken) {
SegmentConsumer::Direct(mut consumer_box) => {
let mut drain_buf = std::mem::take(&mut state.buffer);
drop(state);
for item in drain_buf.drain(..) {
if let Err(e) = consumer_box.push_item(item) {
consumer_box.fail(e);
return;
}
}
consumer_box.complete();
}
SegmentConsumer::DirectTaken => {
}
SegmentConsumer::Fallback => {
state.consumer = SegmentConsumer::DirectTaken;
if state.terminal.is_none() {
state.terminal = Some(LiveSubstreamTerminal::Complete);
}
drop(state);
slot.available.notify_all();
}
SegmentConsumer::Pending => {
state.consumer = SegmentConsumer::Pending;
if state.terminal.is_none() {
state.terminal = Some(LiveSubstreamTerminal::Complete);
}
drop(state);
slot.available.notify_all();
}
}
}
fn fail_segment(&mut self, error: StreamError) {
self.local_pending.clear();
if let Some(consumer) = self.current_consumer.take() {
consumer.fail(error);
self.current_slot = None;
return;
}
let slot = match self.current_slot.take() {
Some(s) => s,
None => return,
};
let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
match std::mem::replace(&mut state.consumer, SegmentConsumer::DirectTaken) {
SegmentConsumer::Direct(c) => {
drop(state);
c.fail(error);
}
_ => {
if state.terminal.is_none() {
state.terminal = Some(LiveSubstreamTerminal::Error(error));
}
drop(state);
slot.available.notify_all();
}
}
}
/// Fails the open segment and then the outer stream of substreams with the
/// same error.
fn fail_all(&mut self, error: StreamError) {
    let segment_error = error.clone();
    self.fail_segment(segment_error);
    fail_live_substream(&self.outer, error);
}
fn complete_all(&mut self) {
self.close_segment();
complete_live_substream(&self.outer);
}
}
impl<Out: Send + 'static> Drop for SplitFastWorkerGuard<Out> {
fn drop(&mut self) {
if self.armed {
self.local_pending.clear(); if let Some(consumer) = self.current_consumer.take() {
consumer.fail(StreamError::AbruptTermination);
} else if let Some(slot) = self.current_slot.take() {
let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
match std::mem::replace(&mut state.consumer, SegmentConsumer::DirectTaken) {
SegmentConsumer::Direct(c) => {
drop(state);
c.fail(StreamError::AbruptTermination);
}
_ => {
if state.terminal.is_none() {
state.terminal =
Some(LiveSubstreamTerminal::Error(StreamError::AbruptTermination));
}
drop(state);
slot.available.notify_all();
}
}
}
fail_live_substream(&self.outer, StreamError::AbruptTermination);
}
}
}
/// Splits `input` into a stream of sub-sources on a worker spawned through
/// `materializer.spawn_stream`.
///
/// For every upstream item `predicate` is evaluated (panics are caught):
/// - `SplitMode::When`: a `true` result closes the open segment (if any)
///   before the item is pushed, so the item goes into a newly opened segment.
/// - `SplitMode::After`: the item is pushed into the current segment first and
///   a `true` result closes that segment afterwards.
///
/// The returned stream wraps the shared outer state together with the worker's
/// completion handle.
fn split_streams_fast<Out, F>(
mut input: BoxStream<Out>,
mode: SplitMode,
predicate: Arc<F>,
parent_cancelled: Arc<AtomicBool>,
materializer: &Materializer,
) -> BoxStream<Source<Out>>
where
Out: Clone + Send + 'static,
F: Fn(&Out) -> bool + Send + Sync + 'static,
{
let outer = LiveSubstreamShared::new();
let worker_outer = Arc::clone(&outer);
let completion = materializer.spawn_stream(move |cancelled| {
// The guard fails the open segment and the outer stream on drop unless it is
// disarmed first, so every regular exit path below calls `disarm()`.
let mut guard =
SplitFastWorkerGuard::new(Arc::clone(&worker_outer), Arc::clone(&parent_cancelled));
while !cancelled.load(Ordering::SeqCst) {
// The outer stream was cancelled: stop without signalling anything further.
if worker_outer.cancelled.load(Ordering::SeqCst) {
guard.disarm();
return Ok(NotUsed);
}
match input.next() {
Some(Ok(item)) => {
let split = match catch_unwind(AssertUnwindSafe(|| predicate(&item))) {
Ok(s) => s,
Err(_) => {
// A panicking predicate fails everything with `AbruptTermination`;
// the worker itself still returns `Ok`.
guard.fail_all(StreamError::AbruptTermination);
guard.disarm();
return Ok(NotUsed);
}
};
match mode {
SplitMode::When => {
// Close the running segment first so `item` starts the next one.
if split
&& (guard.current_slot.is_some()
|| guard.current_consumer.is_some())
{
guard.close_segment();
}
// Open a segment lazily when none is active; stop if that fails.
if guard.current_slot.is_none()
&& guard.current_consumer.is_none()
&& guard.open_segment().is_err()
{
guard.disarm();
return Ok(NotUsed);
}
if guard.push_item(item).is_err() {
guard.disarm();
return Ok(NotUsed);
}
}
SplitMode::After => {
// Open a segment lazily when none is active; stop if that fails.
if guard.current_slot.is_none()
&& guard.current_consumer.is_none()
&& guard.open_segment().is_err()
{
guard.disarm();
return Ok(NotUsed);
}
if guard.push_item(item).is_err() {
guard.disarm();
return Ok(NotUsed);
}
// `item` was the last element of its segment.
if split {
guard.close_segment();
}
}
}
}
Some(Err(error)) => {
// Upstream failure: fail the segment and the outer stream, then report it.
guard.fail_all(error.clone());
guard.disarm();
return Err(error);
}
None => {
guard.complete_all();
guard.disarm();
return Ok(NotUsed);
}
}
}
// The worker's own cancellation flag was set: complete whatever is open.
guard.complete_all();
guard.disarm();
Ok(NotUsed)
});
Box::new(LiveSubstreamStream {
shared: outer,
completion: Some(completion),
local_batch: VecDeque::new(),
})
}
/// State shared between the legacy (test-only) flat-map-merge coordinator, its
/// per-substream workers and the consuming `FlatMapMergeStream`.
#[cfg(test)]
struct FlatMapMergeShared<T> {
// Mutable merge state, guarded by a mutex.
state: Mutex<FlatMapMergeState<T>>,
// Condition variable used together with `state`; notified after state changes.
available: Condvar,
// Set when the consumer is dropped or a guard is dropped while still armed.
cancelled: Arc<AtomicBool>,
}
/// Mutex-protected part of `FlatMapMergeShared`.
#[cfg(test)]
struct FlatMapMergeState<T> {
// Items waiting for the consumer, tagged with the id of the substream that produced them.
queued: VecDeque<(usize, T)>,
// Per-substream count of items that are queued and not yet consumed.
window: HashMap<usize, usize>,
// Number of substreams that have been registered and not yet finished.
active_streams: usize,
// Set once the coordinator has finished (or abandoned) reading the outer input.
input_done: bool,
// First error recorded; later errors do not overwrite it.
terminal: Option<StreamError>,
}
#[cfg(test)]
impl<T> FlatMapMergeShared<T> {
    /// Creates empty shared state: nothing queued, no active substreams, input
    /// not done, no terminal error and the cancellation flag cleared.
    fn new() -> Arc<Self> {
        let initial = FlatMapMergeState {
            queued: VecDeque::new(),
            window: HashMap::new(),
            active_streams: 0,
            input_done: false,
            terminal: None,
        };
        let not_cancelled = Arc::new(AtomicBool::new(false));
        Arc::new(Self {
            state: Mutex::new(initial),
            available: Condvar::new(),
            cancelled: not_cancelled,
        })
    }
}
/// Consumer side of the legacy (test-only) flat-map-merge: an iterator over the
/// merged items.
#[cfg(test)]
struct FlatMapMergeStream<T> {
// State shared with the coordinator and the workers.
shared: Arc<FlatMapMergeShared<T>>,
// Completion handle of the coordinator; taken (and dropped) when the stream is dropped.
completion: Option<StreamCompletion<NotUsed>>,
}
#[cfg(test)]
impl<T> Iterator for FlatMapMergeStream<T> {
    type Item = StreamResult<T>;

    /// Blocks until an item, an error or the end of the merge is available.
    ///
    /// Queued items are delivered before a recorded terminal error is
    /// reported. The stream ends once the input is done and no substream is
    /// active any more.
    fn next(&mut self) -> Option<Self::Item> {
        let shared = &self.shared;
        let mut guard = shared
            .state
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        loop {
            if let Some((origin, element)) = guard.queued.pop_front() {
                // Give one unit of the per-substream window back and drop the
                // entry once nothing of that substream is outstanding.
                let drained = guard.window.get_mut(&origin).is_some_and(|outstanding| {
                    *outstanding -= 1;
                    *outstanding == 0
                });
                if drained {
                    guard.window.remove(&origin);
                }
                drop(guard);
                // Wake producers that may be waiting for window space.
                shared.available.notify_all();
                return Some(Ok(element));
            }
            if let Some(failure) = guard.terminal.clone() {
                return Some(Err(failure));
            }
            if guard.input_done && guard.active_streams == 0 {
                return None;
            }
            guard = shared
                .available
                .wait(guard)
                .unwrap_or_else(|poison| poison.into_inner());
        }
    }
}
#[cfg(test)]
impl<T> Drop for FlatMapMergeStream<T> {
    /// Signals cancellation to the coordinator and workers, wakes every waiter
    /// and releases the coordinator's completion handle.
    fn drop(&mut self) {
        let shared = &self.shared;
        shared.cancelled.store(true, Ordering::SeqCst);
        shared.available.notify_all();
        drop(self.completion.take());
    }
}
/// Counts one more active substream and wakes all waiters on the shared state.
#[cfg(test)]
fn flat_map_merge_register_stream<T>(shared: &Arc<FlatMapMergeShared<T>>) {
    {
        let mut guard = shared
            .state
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        guard.active_streams += 1;
    }
    shared.available.notify_all();
}
/// Marks substream `stream_id` as finished.
///
/// Decrements the active-substream count (saturating at zero), records
/// `terminal`'s error if no terminal error is set yet, makes sure a window
/// entry exists for the substream and wakes all waiters.
#[cfg(test)]
fn flat_map_merge_finish_stream<T>(
    shared: &Arc<FlatMapMergeShared<T>>,
    stream_id: usize,
    terminal: Result<(), StreamError>,
) {
    {
        let mut guard = shared
            .state
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        guard.active_streams = guard.active_streams.saturating_sub(1);
        match terminal {
            // Only the first error is kept.
            Err(failure) if guard.terminal.is_none() => guard.terminal = Some(failure),
            _ => {}
        }
        guard.window.entry(stream_id).or_default();
    }
    shared.available.notify_all();
}
/// Queues `item` for the consumer on behalf of substream `stream_id`.
///
/// Blocks while the substream already has `FLAT_MAP_MERGE_SUBSTREAM_WINDOW`
/// items outstanding and no terminal error is set. Returns the item back as
/// `Err` when the merge was cancelled or has failed.
#[cfg(test)]
fn flat_map_merge_push<T>(
    shared: &Arc<FlatMapMergeShared<T>>,
    stream_id: usize,
    item: T,
) -> Result<(), T> {
    let mut guard = shared
        .state
        .lock()
        .unwrap_or_else(|poison| poison.into_inner());
    loop {
        let outstanding = guard.window.get(&stream_id).copied().unwrap_or(0);
        if outstanding < FLAT_MAP_MERGE_SUBSTREAM_WINDOW || guard.terminal.is_some() {
            break;
        }
        if shared.cancelled.load(Ordering::SeqCst) {
            return Err(item);
        }
        guard = shared
            .available
            .wait(guard)
            .unwrap_or_else(|poison| poison.into_inner());
    }
    if shared.cancelled.load(Ordering::SeqCst) || guard.terminal.is_some() {
        return Err(item);
    }
    *guard.window.entry(stream_id).or_insert(0) += 1;
    // Only a consumer that found the queue empty can be waiting for an item.
    let wake_consumer = guard.queued.is_empty();
    guard.queued.push_back((stream_id, item));
    drop(guard);
    if wake_consumer {
        shared.available.notify_all();
    }
    Ok(())
}
/// Guard held by the legacy (test-only) coordinator; while armed, dropping it
/// marks the merge as abruptly terminated.
#[cfg(test)]
struct FlatMapMergeCoordinatorGuard<T> {
// State shared with the workers and the consumer.
shared: Arc<FlatMapMergeShared<T>>,
// True until `finish` has been called.
armed: bool,
}
#[cfg(test)]
impl<T> FlatMapMergeCoordinatorGuard<T> {
    /// Creates an armed guard over `shared`.
    fn new(shared: Arc<FlatMapMergeShared<T>>) -> Self {
        Self {
            armed: true,
            shared,
        }
    }

    /// Marks the outer input as done, wakes all waiters and disarms the guard.
    fn finish(&mut self) {
        {
            let mut guard = self
                .shared
                .state
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            guard.input_done = true;
        }
        self.shared.available.notify_all();
        self.armed = false;
    }
}
#[cfg(test)]
impl<T> Drop for FlatMapMergeCoordinatorGuard<T> {
    /// If still armed, records `AbruptTermination` (unless an error is already
    /// set), marks the input as done, flags cancellation and wakes all waiters.
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        {
            let mut guard = self
                .shared
                .state
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            if guard.terminal.is_none() {
                guard.terminal = Some(StreamError::AbruptTermination);
            }
            guard.input_done = true;
        }
        self.shared.cancelled.store(true, Ordering::SeqCst);
        self.shared.available.notify_all();
    }
}
/// Guard held by a legacy (test-only) substream worker; while armed, dropping
/// it finishes the substream with `AbruptTermination`.
#[cfg(test)]
struct FlatMapMergeWorkerGuard<T> {
// State shared with the coordinator and the consumer.
shared: Arc<FlatMapMergeShared<T>>,
// Id of the substream this worker drains.
stream_id: usize,
// True until `finish` has been called.
armed: bool,
}
#[cfg(test)]
impl<T> FlatMapMergeWorkerGuard<T> {
    /// Creates an armed guard for substream `stream_id`.
    fn new(shared: Arc<FlatMapMergeShared<T>>, stream_id: usize) -> Self {
        Self {
            armed: true,
            stream_id,
            shared,
        }
    }

    /// Reports the substream's outcome to the shared state and disarms the guard.
    fn finish(&mut self, terminal: Result<(), StreamError>) {
        let id = self.stream_id;
        flat_map_merge_finish_stream(&self.shared, id, terminal);
        self.armed = false;
    }
}
#[cfg(test)]
impl<T> Drop for FlatMapMergeWorkerGuard<T> {
    /// If still armed, finishes the substream with `AbruptTermination` and
    /// flags cancellation for the whole merge.
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        let abrupt = Err(StreamError::AbruptTermination);
        flat_map_merge_finish_stream(&self.shared, self.stream_id, abrupt);
        self.shared.cancelled.store(true, Ordering::SeqCst);
    }
}
/// Test-only switch for choosing a substream executor implementation.
///
/// Only `LegacyOnly` is consulted in the code visible here: it makes
/// `flat_map_merge_stream` use the legacy implementation.
#[cfg(test)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum SubstreamExecutorMode {
// Initial value of the thread-local mode.
Auto,
// Forces `flat_map_merge_stream_legacy`.
LegacyOnly,
// NOTE(review): not consulted in this part of the file; presumably forces the ready-ring path.
ReadyRingOnly,
// NOTE(review): not consulted in this part of the file; presumably forces a split-sink path.
SplitSinkOnly,
}
// Per-thread executor mode for tests; starts as `Auto`. It is changed through
// `with_substream_mode` and read through `current_substream_mode`.
#[cfg(test)]
thread_local! {
static SUBSTREAM_EXECUTOR_MODE: std::cell::Cell<SubstreamExecutorMode> =
const { std::cell::Cell::new(SubstreamExecutorMode::Auto) };
}
/// Runs `f` with the current thread's substream executor mode set to `mode`
/// and returns its result.
///
/// The previous mode is restored when `f` returns and also when `f` panics,
/// so a failing test cannot leave a stale mode behind on its thread.
#[cfg(test)]
pub(crate) fn with_substream_mode<R>(mode: SubstreamExecutorMode, f: impl FnOnce() -> R) -> R {
    /// Puts the saved mode back when dropped, including during unwinding.
    struct RestoreMode(SubstreamExecutorMode);

    impl Drop for RestoreMode {
        fn drop(&mut self) {
            SUBSTREAM_EXECUTOR_MODE.with(|cell| cell.set(self.0));
        }
    }

    // Install the new mode and remember the old one in a single step.
    let _restore = RestoreMode(SUBSTREAM_EXECUTOR_MODE.with(|cell| cell.replace(mode)));
    f()
}
/// Returns the substream executor mode currently set for this thread.
#[cfg(test)]
fn current_substream_mode() -> SubstreamExecutorMode {
    SUBSTREAM_EXECUTOR_MODE.with(std::cell::Cell::get)
}
/// Entry point for flat-map-merge: maps each input item to a source via
/// `stage` and merges the resulting substreams.
///
/// Always delegates to the ready-ring implementation, except in test builds
/// where the thread-local mode `LegacyOnly` selects the legacy implementation.
fn flat_map_merge_stream<Out, Next, NextMat, F>(
    input: BoxStream<Out>,
    breadth: usize,
    stage: Arc<F>,
    materializer: &Materializer,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Next: Send + 'static,
    NextMat: Send + 'static,
    F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
{
    #[cfg(test)]
    if matches!(current_substream_mode(), SubstreamExecutorMode::LegacyOnly) {
        return flat_map_merge_stream_legacy(input, breadth, stage, materializer);
    }
    flat_map_merge_stream_ready(input, breadth, stage, materializer)
}
/// Legacy (test-only) flat-map-merge implementation.
///
/// A coordinator spawned through `materializer.spawn_stream` reads `input`,
/// turns every item into a source via `stage`, and spawns one worker per
/// substream. It waits before pulling the next input item while `breadth`
/// substreams are active. Workers push their items into the shared queue via
/// `flat_map_merge_push`; the returned `FlatMapMergeStream` drains that queue.
#[cfg(test)]
fn flat_map_merge_stream_legacy<Out, Next, NextMat, F>(
mut input: BoxStream<Out>,
breadth: usize,
stage: Arc<F>,
materializer: &Materializer,
) -> BoxStream<Next>
where
Out: Send + 'static,
Next: Send + 'static,
NextMat: Send + 'static,
F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
{
let worker_materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
let shared = FlatMapMergeShared::new();
let worker_shared = Arc::clone(&shared);
let completion = materializer.spawn_stream(move |cancelled| {
let mut next_id = 0usize;
// While armed, dropping the guard marks the merge as abruptly terminated.
let mut guard = FlatMapMergeCoordinatorGuard::new(worker_shared);
let mut workers = HashMap::<usize, StreamCompletion<NotUsed>>::with_capacity(breadth);
while !cancelled.load(Ordering::SeqCst) && !guard.shared.cancelled.load(Ordering::SeqCst) {
// Forget workers whose completion is already available.
workers.retain(|_, completion| completion.try_wait().is_none());
{
let mut state = guard
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
// Wait for a free substream slot, an error or cancellation.
while state.active_streams >= breadth
&& state.terminal.is_none()
&& !cancelled.load(Ordering::SeqCst)
&& !guard.shared.cancelled.load(Ordering::SeqCst)
{
state = guard
.shared
.available
.wait(state)
.unwrap_or_else(|poison| poison.into_inner());
}
if state.terminal.is_some() {
let error = state.terminal.clone().expect("terminal checked above");
drop(state);
guard.finish();
return Err(error);
}
}
match input.next() {
Some(Ok(item)) => {
let source = stage(item);
// The materialized value of the inner source is discarded.
let (mut stream, _) =
match Arc::clone(&source.factory).create(&worker_materializer) {
Ok(parts) => parts,
Err(error) => {
// Creating the substream failed: record the error and stop.
let mut state = guard
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.terminal.is_none() {
state.terminal = Some(error.clone());
}
drop(state);
guard.shared.available.notify_all();
guard.finish();
return Err(error);
}
};
let stream_id = next_id;
next_id += 1;
// Count the substream as active before its worker starts.
flat_map_merge_register_stream(&guard.shared);
let worker_shared = Arc::clone(&guard.shared);
workers.insert(
stream_id,
worker_materializer.spawn_stream(move |inner_cancelled| {
let mut worker_guard =
FlatMapMergeWorkerGuard::new(Arc::clone(&worker_shared), stream_id);
while !inner_cancelled.load(Ordering::SeqCst)
&& !worker_shared.cancelled.load(Ordering::SeqCst)
{
match stream.next() {
Some(Ok(item)) => {
// A rejected push means the merge was cancelled or has failed.
if flat_map_merge_push(&worker_shared, stream_id, item)
.is_err()
{
worker_guard.finish(Ok(()));
return Ok(NotUsed);
}
}
Some(Err(error)) => {
worker_guard.finish(Err(error.clone()));
return Err(error);
}
None => {
worker_guard.finish(Ok(()));
return Ok(NotUsed);
}
}
}
worker_guard.finish(Ok(()));
Ok(NotUsed)
}),
);
}
Some(Err(error)) => {
// Upstream failure: record it (first error wins) and stop.
let mut state = guard
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.terminal.is_none() {
state.terminal = Some(error.clone());
}
drop(state);
guard.shared.available.notify_all();
guard.finish();
return Err(error);
}
None => {
guard.finish();
break;
}
}
}
// Reached via cancellation as well; make sure the input is marked done.
if guard.armed {
guard.finish();
}
// Wait until every substream has finished, unless cancelled first.
while !cancelled.load(Ordering::SeqCst) && !guard.shared.cancelled.load(Ordering::SeqCst) {
workers.retain(|_, completion| completion.try_wait().is_none());
let state = guard
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.active_streams == 0 {
return Ok(NotUsed);
}
drop(
guard
.shared
.available
.wait(state)
.unwrap_or_else(|poison| poison.into_inner()),
);
}
Ok(NotUsed)
});
Box::new(FlatMapMergeStream {
shared,
completion: Some(completion),
})
}
// Maximum number of items moved per batch between a lane and the consumer
// (and from a worker into its lane) in the ready-ring flat-map-merge.
const FLAT_MAP_MERGE_READY_BATCH: usize = 16;
// Largest `max_success_items` hint for which a substream is drained inline on
// the coordinator instead of on its own worker.
const FLAT_MAP_MERGE_INLINE_MICRO_MAX: usize = FLAT_MAP_MERGE_READY_BATCH;
/// State shared between the ready-ring flat-map-merge coordinator, its workers
/// and the consuming `FlatMapMergeReadyStream`.
struct FlatMapMergeReadyShared<T> {
// Coordinator-level state (lanes, ready ring, counters), guarded by a mutex.
coordinator: Mutex<FlatMapMergeReadyState<T>>,
// Condition variable used together with `coordinator`.
available: Condvar,
// Set on consumer drop, on errors and when a guard is dropped while armed.
cancelled: Arc<AtomicBool>,
}
/// Mutex-protected coordinator state of the ready-ring flat-map-merge.
struct FlatMapMergeReadyState<T> {
// Lanes by substream id; a lane is removed once it is closed and drained.
lanes: HashMap<usize, Arc<FlatMapMergeLane<T>>>,
// Ids of lanes that have buffered items for the consumer, in FIFO order.
ready: std::collections::VecDeque<usize>,
// Number of items published to lanes and not yet taken by the consumer.
queued_items: usize,
// Number of substreams that have been started and not yet finished.
active_streams: usize,
// Set once the coordinator has finished (or abandoned) reading the outer input.
input_done: bool,
// First error recorded; later errors do not overwrite it.
terminal: Option<StreamError>,
// Bumped on state changes so `wait_while` waiters can detect that something changed.
generation: u64,
}
/// Per-substream buffer of the ready-ring flat-map-merge.
struct FlatMapMergeLane<T> {
// Buffer and flags of the lane, guarded by a mutex.
state: Mutex<FlatMapMergeLaneState<T>>,
// Notified when the consumer drains items, when a publish finishes and on cancellation.
space_available: Condvar,
}
/// Mutex-protected state of a `FlatMapMergeLane`.
struct FlatMapMergeLaneState<T> {
// Items produced by the substream and not yet taken by the consumer.
buffer: std::collections::VecDeque<T>,
// Whether the lane is considered queued in the coordinator's `ready` ring.
in_ready_ring: bool,
// True while `publish_ready_batch` is between filling the buffer and updating
// the coordinator; the consumer waits for it to clear before draining.
publishing: bool,
// Set by `finish_lane_ready` when the substream has finished.
closed: bool,
}
/// Consumer side of the ready-ring flat-map-merge: an iterator over the merged
/// items.
struct FlatMapMergeReadyStream<T> {
// State shared with the coordinator and the workers.
shared: Arc<FlatMapMergeReadyShared<T>>,
// Completion handle of the coordinator; taken (and dropped) when the stream is dropped.
completion: Option<StreamCompletion<NotUsed>>,
// Items already drained from a lane that are handed out before touching shared state again.
local_batch: std::collections::VecDeque<T>,
}
impl<T: Send + 'static> Iterator for FlatMapMergeReadyStream<T> {
type Item = StreamResult<T>;
/// Returns the next merged item, blocking until one is available.
///
/// Items left over from the previous lane drain are served first. After that
/// a recorded terminal error takes precedence over lanes in the ready ring.
/// The stream ends once the input is done, no substream is active and no
/// item is queued.
fn next(&mut self) -> Option<Self::Item> {
if let Some(item) = self.local_batch.pop_front() {
return Some(Ok(item));
}
let mut coord = self
.shared
.coordinator
.lock()
.unwrap_or_else(|p| p.into_inner());
loop {
if let Some(error) = coord.terminal.clone() {
return Some(Err(error));
}
if let Some(lane_id) = coord.ready.pop_front() {
let lane = coord.lanes.get(&lane_id).cloned();
// The coordinator lock is released before the lane lock is taken.
drop(coord);
if let Some(lane) = lane {
let mut lane_state = lane.state.lock().unwrap_or_else(|p| p.into_inner());
// Wait until an in-flight publish has finished updating the coordinator.
while lane_state.publishing {
lane_state = lane
.space_available
.wait(lane_state)
.unwrap_or_else(|p| p.into_inner());
}
// Take at most one batch from this lane.
let drain_n = lane_state.buffer.len().min(FLAT_MAP_MERGE_READY_BATCH);
let mut batch = std::collections::VecDeque::with_capacity(drain_n);
for _ in 0..drain_n {
if let Some(item) = lane_state.buffer.pop_front() {
batch.push_back(item);
}
}
let still_has_items = !lane_state.buffer.is_empty();
let is_closed = lane_state.closed;
if !still_has_items {
lane_state.in_ready_ring = false;
}
drop(lane_state);
// Space was freed: wake a producer that may be waiting on this lane.
if !batch.is_empty() {
lane.space_available.notify_all();
}
let freed = batch.len();
let mut coord = self
.shared
.coordinator
.lock()
.unwrap_or_else(|p| p.into_inner());
coord.queued_items = coord.queued_items.saturating_sub(freed);
if still_has_items {
// Re-queue the lane at the back of the ready ring.
coord.ready.push_back(lane_id);
} else if is_closed {
// Finished and fully drained: the lane is no longer needed.
coord.lanes.remove(&lane_id);
}
coord.generation += 1;
drop(coord);
self.shared.available.notify_all();
let mut iter = batch.into_iter();
if let Some(first) = iter.next() {
self.local_batch.extend(iter);
return Some(Ok(first));
}
}
// Lane missing or nothing drained: re-acquire the lock and look again.
coord = self
.shared
.coordinator
.lock()
.unwrap_or_else(|p| p.into_inner());
continue;
}
if coord.terminal.is_none()
&& coord.input_done
&& coord.active_streams == 0
&& coord.queued_items == 0
{
return None;
}
// Sleep until the generation changes or one of the conditions checked above
// can hold.
let seen = coord.generation;
coord = self
.shared
.available
.wait_while(coord, |s| {
s.generation == seen
&& s.terminal.is_none()
&& s.ready.is_empty()
&& !(s.input_done && s.active_streams == 0 && s.queued_items == 0)
})
.unwrap_or_else(|p| p.into_inner());
}
}
}
impl<T> Drop for FlatMapMergeReadyStream<T> {
    /// Cancels the merge when the consumer goes away: sets the shared
    /// cancellation flag, wakes the coordinator and every lane producer, and
    /// releases the coordinator's completion handle.
    fn drop(&mut self) {
        let lanes: Vec<Arc<FlatMapMergeLane<T>>> = {
            let mut coord = self
                .shared
                .coordinator
                .lock()
                .unwrap_or_else(|p| p.into_inner());
            // Publish the flag while holding the coordinator lock. A waiter that
            // evaluates its predicate under this lock then either sees the flag
            // or is already parked when the notification below is sent.
            self.shared.cancelled.store(true, Ordering::SeqCst);
            coord.generation += 1;
            coord.lanes.values().cloned().collect()
        };
        self.shared.available.notify_all();
        for lane in lanes {
            // Producers check `cancelled` while holding the lane lock and then wait
            // on `space_available`. Passing through the lock guarantees that a
            // producer is either already waiting (and gets this notification) or
            // will observe the flag on its next check, so the wakeup cannot be lost.
            drop(lane.state.lock().unwrap_or_else(|p| p.into_inner()));
            lane.space_available.notify_all();
        }
        let _ = self.completion.take();
    }
}
/// Marks substream `stream_id` as finished in the ready-ring merge.
///
/// The lane (if still registered) is closed. The active-substream count is
/// decremented (saturating at zero), the error in `terminal` is recorded if no
/// terminal error is set yet, and the lane is removed when its buffer was
/// empty at the time it was closed. On error the shared cancellation flag is
/// set and all remaining lanes are notified. Waiters on the coordinator are
/// always notified.
fn finish_lane_ready<T>(
shared: &Arc<FlatMapMergeReadyShared<T>>,
stream_id: usize,
terminal: Result<(), StreamError>,
) {
let is_error = terminal.is_err();
let lane_is_empty = {
let coord = shared.coordinator.lock().unwrap_or_else(|p| p.into_inner());
let lane_opt = coord.lanes.get(&stream_id).cloned();
// The coordinator lock is released before the lane lock is taken.
drop(coord);
if let Some(lane) = lane_opt {
let mut ls = lane.state.lock().unwrap_or_else(|p| p.into_inner());
ls.closed = true;
ls.buffer.is_empty()
} else {
// No lane registered under this id: treated as empty.
true
}
};
let lanes_to_notify: Vec<Arc<FlatMapMergeLane<T>>> = {
let mut coord = shared.coordinator.lock().unwrap_or_else(|p| p.into_inner());
coord.active_streams = coord.active_streams.saturating_sub(1);
// Only the first error is kept.
if let Err(ref error) = terminal
&& coord.terminal.is_none()
{
coord.terminal = Some(error.clone());
}
// A lane that still holds items stays registered so they can be drained.
if lane_is_empty {
coord.lanes.remove(&stream_id);
}
coord.generation += 1;
if is_error {
coord.lanes.values().cloned().collect()
} else {
vec![]
}
};
if is_error {
// Stop the whole merge and wake producers blocked on their lanes.
shared.cancelled.store(true, Ordering::SeqCst);
for lane in &lanes_to_notify {
lane.space_available.notify_all();
}
}
shared.available.notify_all();
}
/// Guard held by the ready-ring coordinator; while armed, dropping it marks
/// the merge as abruptly terminated.
struct FlatMapMergeReadyCoordinatorGuard<T> {
// State shared with the workers and the consumer.
shared: Arc<FlatMapMergeReadyShared<T>>,
// True until `finish` has been called.
armed: bool,
}
impl<T> FlatMapMergeReadyCoordinatorGuard<T> {
    /// Creates an armed guard over `shared`.
    fn new(shared: Arc<FlatMapMergeReadyShared<T>>) -> Self {
        Self {
            armed: true,
            shared,
        }
    }

    /// Marks the outer input as done, bumps the generation, wakes all waiters
    /// on the coordinator and disarms the guard.
    fn finish(&mut self) {
        {
            let mut state = self
                .shared
                .coordinator
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            state.input_done = true;
            state.generation += 1;
        }
        self.shared.available.notify_all();
        self.armed = false;
    }
}
impl<T> Drop for FlatMapMergeReadyCoordinatorGuard<T> {
    /// If the guard is still armed (the coordinator ended without calling
    /// `finish`), records `AbruptTermination` unless an error is already set,
    /// marks the input as done, flags cancellation and wakes the consumer and
    /// every lane producer.
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        let lanes: Vec<Arc<FlatMapMergeLane<T>>> = {
            let mut coord = self
                .shared
                .coordinator
                .lock()
                .unwrap_or_else(|p| p.into_inner());
            if coord.terminal.is_none() {
                coord.terminal = Some(StreamError::AbruptTermination);
            }
            coord.input_done = true;
            coord.generation += 1;
            // Publish the flag together with the other state changes, under the lock.
            self.shared.cancelled.store(true, Ordering::SeqCst);
            coord.lanes.values().cloned().collect()
        };
        self.shared.available.notify_all();
        for lane in lanes {
            // Producers check `cancelled` while holding the lane lock and then wait
            // on `space_available`. Passing through the lock guarantees that a
            // producer is either already waiting (and gets this notification) or
            // will observe the flag on its next check, so the wakeup cannot be lost.
            drop(lane.state.lock().unwrap_or_else(|p| p.into_inner()));
            lane.space_available.notify_all();
        }
    }
}
/// Guard held by a ready-ring substream worker; while armed, dropping it
/// finishes the lane with `AbruptTermination`.
struct FlatMapMergeReadyWorkerGuard<T> {
// State shared with the coordinator and the consumer.
shared: Arc<FlatMapMergeReadyShared<T>>,
// Id of the substream (and lane) this worker feeds.
stream_id: usize,
// True until `finish` has been called.
armed: bool,
}
impl<T> FlatMapMergeReadyWorkerGuard<T> {
    /// Creates an armed guard for substream `stream_id`.
    fn new(shared: Arc<FlatMapMergeReadyShared<T>>, stream_id: usize) -> Self {
        Self {
            armed: true,
            stream_id,
            shared,
        }
    }

    /// Reports the substream's outcome via `finish_lane_ready` and disarms the guard.
    fn finish(&mut self, terminal: Result<(), StreamError>) {
        let id = self.stream_id;
        finish_lane_ready(&self.shared, id, terminal);
        self.armed = false;
    }
}
impl<T> Drop for FlatMapMergeReadyWorkerGuard<T> {
    /// If still armed, finishes the lane with `AbruptTermination` and flags
    /// cancellation for the whole merge.
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        let abrupt = Err(StreamError::AbruptTermination);
        finish_lane_ready(&self.shared, self.stream_id, abrupt);
        self.shared.cancelled.store(true, Ordering::SeqCst);
    }
}
/// Appends `batch` to `lane` and makes it visible to the consumer.
///
/// An empty batch is a no-op. Otherwise the items are added to the lane
/// buffer, the coordinator's `queued_items` count is increased, and the lane
/// id is pushed onto the ready ring if the lane was not marked as being in
/// it. The lane's `publishing` flag is held between the two steps so the
/// consumer does not drain the lane before the coordinator counters are
/// updated. The lane lock and the coordinator lock are never held at the same
/// time.
fn publish_ready_batch<T>(
shared: &Arc<FlatMapMergeReadyShared<T>>,
stream_id: usize,
lane: &Arc<FlatMapMergeLane<T>>,
batch: std::collections::VecDeque<T>,
) {
let batch_len = batch.len();
if batch_len == 0 {
return;
}
// Step 1: fill the lane buffer and raise the `publishing` flag.
let was_not_in_ready = {
let mut ls = lane.state.lock().unwrap_or_else(|p| p.into_inner());
let was_not = !ls.in_ready_ring;
ls.publishing = true;
ls.buffer.extend(batch);
if !ls.buffer.is_empty() {
ls.in_ready_ring = true;
}
was_not
};
// Step 2: account for the new items in the coordinator state.
{
let mut coord = shared.coordinator.lock().unwrap_or_else(|p| p.into_inner());
coord.queued_items += batch_len;
if was_not_in_ready {
coord.ready.push_back(stream_id);
}
coord.generation += 1;
}
// Step 3: clear the flag so a consumer waiting on this lane may drain it.
{
let mut ls = lane.state.lock().unwrap_or_else(|p| p.into_inner());
ls.publishing = false;
}
lane.space_available.notify_all();
shared.available.notify_all();
}
/// Guard covering a lane from its registration until it is either drained
/// inline on the coordinator or handed off to a worker; while armed, dropping
/// it finishes the lane with `AbruptTermination`.
struct FlatMapMergeReadyInlineGuard<T> {
// State shared with the workers and the consumer.
shared: Arc<FlatMapMergeReadyShared<T>>,
// Id of the substream (and lane) being covered.
stream_id: usize,
// True until `finish` or `hand_off` has been called.
armed: bool,
}
impl<T> FlatMapMergeReadyInlineGuard<T> {
    /// Creates an armed guard for substream `stream_id`.
    fn new(shared: Arc<FlatMapMergeReadyShared<T>>, stream_id: usize) -> Self {
        Self {
            armed: true,
            stream_id,
            shared,
        }
    }

    /// Reports the substream's outcome via `finish_lane_ready` and disarms the guard.
    fn finish(&mut self, terminal: Result<(), StreamError>) {
        let id = self.stream_id;
        finish_lane_ready(&self.shared, id, terminal);
        self.armed = false;
    }

    /// Disarms the guard without finishing the lane.
    fn hand_off(&mut self) {
        self.armed = false;
    }
}
impl<T> Drop for FlatMapMergeReadyInlineGuard<T> {
    /// If still armed, finishes the lane with `AbruptTermination`.
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        let abrupt = Err(StreamError::AbruptTermination);
        finish_lane_ready(&self.shared, self.stream_id, abrupt);
    }
}
/// Ready-ring flat-map-merge implementation.
///
/// A coordinator spawned through `materializer.spawn_stream` reads `input` and
/// turns every item into a source via `stage`. It waits before pulling the
/// next input item while `breadth` substreams are active. Each substream gets
/// its own lane:
/// - sources whose `inline_micro` hint promises at most
///   `FLAT_MAP_MERGE_INLINE_MICRO_MAX` items are drained directly on the
///   coordinator and published as one batch;
/// - all other sources get a worker that publishes batches of up to
///   `FLAT_MAP_MERGE_READY_BATCH` items while keeping the lane buffer within
///   `FLAT_MAP_MERGE_SUBSTREAM_WINDOW`.
///
/// The returned `FlatMapMergeReadyStream` drains the lanes.
fn flat_map_merge_stream_ready<Out, Next, NextMat, F>(
mut input: BoxStream<Out>,
breadth: usize,
stage: Arc<F>,
materializer: &Materializer,
) -> BoxStream<Next>
where
Out: Send + 'static,
Next: Send + 'static,
NextMat: Send + 'static,
F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
{
let worker_materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
let shared: Arc<FlatMapMergeReadyShared<Next>> = Arc::new(FlatMapMergeReadyShared {
coordinator: Mutex::new(FlatMapMergeReadyState {
lanes: HashMap::new(),
ready: std::collections::VecDeque::new(),
queued_items: 0,
active_streams: 0,
input_done: false,
terminal: None,
generation: 0,
}),
available: Condvar::new(),
cancelled: Arc::new(AtomicBool::new(false)),
});
let worker_shared = Arc::clone(&shared);
let completion = materializer.spawn_stream(move |cancelled| {
let mut next_id = 0usize;
// While armed, dropping the guard marks the merge as abruptly terminated.
let mut guard = FlatMapMergeReadyCoordinatorGuard::new(worker_shared);
let mut workers = HashMap::<usize, StreamCompletion<NotUsed>>::with_capacity(breadth);
while !cancelled.load(Ordering::SeqCst) && !guard.shared.cancelled.load(Ordering::SeqCst) {
// Forget workers whose completion is already available.
workers.retain(|_, c| c.try_wait().is_none());
{
let mut coord = guard
.shared
.coordinator
.lock()
.unwrap_or_else(|p| p.into_inner());
// Wait for a free substream slot, an error or cancellation.
loop {
if coord.terminal.is_some()
|| coord.active_streams < breadth
|| cancelled.load(Ordering::SeqCst)
|| guard.shared.cancelled.load(Ordering::SeqCst)
{
break;
}
let seen = coord.generation;
coord = guard
.shared
.available
.wait_while(coord, |s| {
s.generation == seen
&& s.terminal.is_none()
&& s.active_streams >= breadth
&& !cancelled.load(Ordering::SeqCst)
&& !guard.shared.cancelled.load(Ordering::SeqCst)
})
.unwrap_or_else(|p| p.into_inner());
}
if coord.terminal.is_some() {
let error = coord.terminal.clone().expect("terminal checked");
drop(coord);
guard.finish();
return Err(error);
}
}
match input.next() {
Some(Ok(item)) => {
let source = stage(item);
let inline_hint = source.hints.inline_micro;
// The materialized value of the inner source is discarded.
let (stream, _) = match Arc::clone(&source.factory).create(&worker_materializer)
{
Ok(parts) => parts,
Err(error) => {
// Creating the substream failed: record the error (first error
// wins), cancel the merge and wake everyone.
let lanes: Vec<Arc<FlatMapMergeLane<Next>>> = {
let mut coord = guard
.shared
.coordinator
.lock()
.unwrap_or_else(|p| p.into_inner());
if coord.terminal.is_none() {
coord.terminal = Some(error.clone());
}
coord.generation += 1;
coord.lanes.values().cloned().collect()
};
guard.shared.cancelled.store(true, Ordering::SeqCst);
guard.shared.available.notify_all();
for lane in lanes {
lane.space_available.notify_all();
}
guard.finish();
return Err(error);
}
};
let stream_id = next_id;
next_id += 1;
let mut stream = stream;
let lane: Arc<FlatMapMergeLane<Next>> = Arc::new(FlatMapMergeLane {
state: Mutex::new(FlatMapMergeLaneState {
buffer: std::collections::VecDeque::new(),
in_ready_ring: false,
publishing: false,
closed: false,
}),
space_available: Condvar::new(),
});
// Register the lane and count the substream as active.
{
let mut coord = guard
.shared
.coordinator
.lock()
.unwrap_or_else(|p| p.into_inner());
coord.lanes.insert(stream_id, Arc::clone(&lane));
coord.active_streams += 1;
}
// Covers the lane until it is finished inline or handed off to a worker.
let mut inline_guard =
FlatMapMergeReadyInlineGuard::new(Arc::clone(&guard.shared), stream_id);
let is_inline = inline_hint
.is_some_and(|h| h.max_success_items <= FLAT_MAP_MERGE_INLINE_MICRO_MAX);
if is_inline {
let max_items = inline_hint.expect("checked").max_success_items;
if guard.shared.cancelled.load(Ordering::SeqCst)
|| cancelled.load(Ordering::SeqCst)
{
inline_guard.finish(Ok(()));
continue;
}
// One pull more than the hinted item count, so the end of the
// substream can be observed as well.
let max_pulls = max_items.saturating_add(1);
let mut local_batch = std::collections::VecDeque::with_capacity(max_items);
let mut terminal_result: Option<Result<(), StreamError>> = None;
for _ in 0..max_pulls {
if guard.shared.cancelled.load(Ordering::SeqCst)
|| cancelled.load(Ordering::SeqCst)
{
break;
}
match stream.next() {
Some(Ok(item)) => local_batch.push_back(item),
Some(Err(e)) => {
terminal_result = Some(Err(e));
break;
}
None => {
terminal_result = Some(Ok(()));
break;
}
}
}
// Cancelled while draining: the collected items are not published.
if guard.shared.cancelled.load(Ordering::SeqCst)
|| cancelled.load(Ordering::SeqCst)
{
inline_guard.finish(Ok(()));
continue;
}
publish_ready_batch(&guard.shared, stream_id, &lane, local_batch);
match terminal_result {
Some(Err(e)) => {
inline_guard.finish(Err(e));
}
// `None` means the pull budget ran out before the substream ended;
// the lane is finished successfully in that case too.
Some(Ok(())) | None => {
inline_guard.finish(Ok(()));
}
}
} else {
// From here on the worker's own guard is responsible for the lane.
inline_guard.hand_off();
let worker_shared = Arc::clone(&guard.shared);
let worker_lane = Arc::clone(&lane);
workers.insert(
stream_id,
worker_materializer.spawn_stream(move |inner_cancelled| {
let mut worker_guard = FlatMapMergeReadyWorkerGuard::new(
Arc::clone(&worker_shared),
stream_id,
);
loop {
if inner_cancelled.load(Ordering::SeqCst)
|| worker_shared.cancelled.load(Ordering::SeqCst)
{
worker_guard.finish(Ok(()));
return Ok(NotUsed);
}
let capacity;
{
let mut ls = worker_lane
.state
.lock()
.unwrap_or_else(|p| p.into_inner());
// Backpressure: wait while the lane buffer is at its window limit.
while ls.buffer.len() >= FLAT_MAP_MERGE_SUBSTREAM_WINDOW
&& !worker_shared.cancelled.load(Ordering::SeqCst)
&& !ls.closed
&& !inner_cancelled.load(Ordering::SeqCst)
{
ls = worker_lane
.space_available
.wait(ls)
.unwrap_or_else(|p| p.into_inner());
}
if worker_shared.cancelled.load(Ordering::SeqCst)
|| ls.closed
|| inner_cancelled.load(Ordering::SeqCst)
{
drop(ls);
worker_guard.finish(Ok(()));
return Ok(NotUsed);
}
capacity =
FLAT_MAP_MERGE_SUBSTREAM_WINDOW - ls.buffer.len();
}
// Pull up to one batch outside of any lock.
let batch_size = capacity.min(FLAT_MAP_MERGE_READY_BATCH);
let mut local_batch =
std::collections::VecDeque::with_capacity(batch_size);
let mut terminal_result: Option<Result<(), StreamError>> = None;
for _ in 0..batch_size {
if inner_cancelled.load(Ordering::SeqCst)
|| worker_shared.cancelled.load(Ordering::SeqCst)
{
break;
}
match stream.next() {
Some(Ok(item)) => local_batch.push_back(item),
Some(Err(e)) => {
terminal_result = Some(Err(e));
break;
}
None => {
terminal_result = Some(Ok(()));
break;
}
}
}
let batch_len = local_batch.len();
// Items pulled before an end or error are still published.
publish_ready_batch(
&worker_shared,
stream_id,
&worker_lane,
local_batch,
);
match terminal_result {
Some(Ok(())) => {
worker_guard.finish(Ok(()));
return Ok(NotUsed);
}
Some(Err(e)) => {
worker_guard.finish(Err(e.clone()));
return Err(e);
}
None => {
// Nothing pulled and no terminal event: stop the worker.
if batch_len == 0 {
worker_guard.finish(Ok(()));
return Ok(NotUsed);
}
}
}
}
}),
);
}
}
Some(Err(error)) => {
// Upstream failure: record it (first error wins), cancel the merge and
// wake everyone.
let lanes: Vec<Arc<FlatMapMergeLane<Next>>> = {
let mut coord = guard
.shared
.coordinator
.lock()
.unwrap_or_else(|p| p.into_inner());
if coord.terminal.is_none() {
coord.terminal = Some(error.clone());
}
coord.generation += 1;
coord.lanes.values().cloned().collect()
};
guard.shared.cancelled.store(true, Ordering::SeqCst);
guard.shared.available.notify_all();
for lane in lanes {
lane.space_available.notify_all();
}
guard.finish();
return Err(error);
}
None => {
guard.finish();
break;
}
}
}
// Reached via cancellation as well; make sure the input is marked done.
if guard.armed {
guard.finish();
}
// Wait until every substream has finished, unless cancelled first.
while !cancelled.load(Ordering::SeqCst) && !guard.shared.cancelled.load(Ordering::SeqCst) {
workers.retain(|_, c| c.try_wait().is_none());
let coord = guard
.shared
.coordinator
.lock()
.unwrap_or_else(|p| p.into_inner());
if coord.active_streams == 0 {
return Ok(NotUsed);
}
let seen = coord.generation;
drop(
guard
.shared
.available
.wait_while(coord, |s| s.generation == seen && s.active_streams > 0)
.unwrap_or_else(|p| p.into_inner()),
);
}
Ok(NotUsed)
});
Box::new(FlatMapMergeReadyStream {
shared,
completion: Some(completion),
local_batch: std::collections::VecDeque::new(),
})
}
/// Maps every input item to a source via `stage` and concatenates the
/// resulting substreams in input order.
///
/// Substreams are created lazily, one at a time, on the calling thread. If a
/// substream cannot be created, the creation error is emitted in its place
/// and processing continues with the next input item. An upstream error is
/// passed through as an item.
fn flat_map_concat_stream<Out, Next, NextMat, F>(
    mut input: BoxStream<Out>,
    stage: Arc<F>,
    materializer: &Materializer,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Next: Send + 'static,
    NextMat: Send + 'static,
    F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
{
    let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
    let mut current: Option<BoxStream<Next>> = None;
    Box::new(std::iter::from_fn(move || {
        loop {
            // Serve from the substream that is currently open, if any.
            if let Some(element) = current.as_mut().and_then(|inner| inner.next()) {
                return Some(element);
            }
            // No open substream, or it is exhausted.
            current = None;

            let outer_item = match input.next()? {
                Ok(value) => value,
                Err(error) => return Some(Err(error)),
            };
            let source = stage(outer_item);
            let inner = match Arc::clone(&source.factory).create(&materializer) {
                Ok((stream, _)) => stream,
                Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
            };
            current = Some(inner);
        }
    }))
}
/// Applies the asynchronous `stage` to every input item with at most
/// `parallelism` futures in flight, emitting results as they become available
/// rather than in input order.
///
/// Each future is polled once via `poll_once_or_pending`; a result that is
/// ready immediately is emitted right away, otherwise the future is handed to
/// `spawn_completion_task` and its result arrives through the channel. An
/// upstream error is emitted and stops further reads from `input`.
fn map_async_unordered<Out, Next, F, Fut>(
    mut input: BoxStream<Out>,
    parallelism: usize,
    stage: Arc<F>,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Next: Send + 'static,
    F: Fn(Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
    let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
    let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
    let mut next_task_id = 0_usize;
    let mut input_done = false;
    Box::new(std::iter::from_fn(move || {
        loop {
            // Top up the set of in-flight tasks while input is left.
            while !input_done && tasks.len() < parallelism {
                let Some(upstream) = input.next() else {
                    input_done = true;
                    continue;
                };
                let item = match upstream {
                    Ok(item) => item,
                    Err(error) => {
                        input_done = true;
                        return Some(Err(error));
                    }
                };
                let pending = match poll_once_or_pending(stage(item)) {
                    Ok(ready) => return Some(ready),
                    Err(future) => future,
                };
                let task_id = next_task_id;
                next_task_id += 1;
                let handle =
                    spawn_completion_task(task_id, pending, sender.clone(), |result| result);
                tasks.insert(task_id, handle);
            }

            // Nothing in flight and nothing more could be started: the stream ends.
            if tasks.is_empty() {
                return None;
            }

            if let Some((finished_id, outcome)) = recv_completion(&receiver) {
                tasks.remove(&finished_id);
                return Some(outcome);
            }
        }
    }))
}
/// Supervised variant of `map_async_unordered`.
///
/// Works like the unsupervised version, but every result (including a panic
/// in `stage` or in the first poll, which is turned into a stream error) is
/// passed through `supervise_async_result` with `decider`. Only when that
/// returns `Some` is something emitted; otherwise processing continues with
/// the next element. An upstream error is emitted as is and stops further
/// reads from `input`.
fn map_async_unordered_supervised<Out, Next, F, Fut>(
mut input: BoxStream<Out>,
parallelism: usize,
stage: Arc<F>,
decider: SupervisionDecider,
) -> BoxStream<Next>
where
Out: Send + 'static,
Next: Send + 'static,
F: Fn(Out) -> Fut + Send + Sync + 'static,
Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
let mut next_task_id = 0_usize;
let mut input_done = false;
Box::new(std::iter::from_fn(move || {
loop {
// Top up the set of in-flight tasks while input is left.
while tasks.len() < parallelism && !input_done {
match input.next() {
Some(Ok(item)) => {
// Both the callback and the first poll run under `catch_unwind`.
match catch_unwind(AssertUnwindSafe(|| poll_once_or_pending(stage(item)))) {
Ok(Ok(result)) => {
// Ready immediately; emitted only if supervision yields a result.
if let Some(result) = supervise_async_result(result, &decider) {
return Some(result);
}
}
Ok(Err(future)) => {
// Still pending: run it as a task that reports through the channel.
let task_id = next_task_id;
next_task_id += 1;
tasks.insert(
task_id,
spawn_completion_task(
task_id,
future,
sender.clone(),
|result| result,
),
);
}
Err(_) => {
// A panic is reported as a stream error and supervised like any other failure.
let error = panic_stream_error("map_async_unordered callback");
if let Some(result) = supervise_async_result(Err(error), &decider) {
return Some(result);
}
}
}
}
Some(Err(error)) => {
input_done = true;
return Some(Err(error));
}
None => input_done = true,
}
}
// Nothing in flight and nothing more could be started: the stream ends.
if tasks.is_empty() {
return None;
}
if let Some((task_id, result)) = recv_completion(&receiver) {
tasks.remove(&task_id);
if let Some(result) = supervise_async_result(result, &decider) {
return Some(result);
}
}
}
}))
}
/// Serial variant of partitioned `map_async`: processes one element at a time
/// and emits results in input order.
///
/// `partition` is still called for every item, but its result is discarded.
/// Each future is polled once; if it is not ready, it is spawned as a task
/// and this function blocks until that task reports its result.
#[inline(always)]
fn map_async_partitioned_serial<Out, Key, Next, Partition, F, Fut>(
mut input: BoxStream<Out>,
partition: Arc<Partition>,
stage: Arc<F>,
) -> BoxStream<Next>
where
Out: Send + 'static,
Key: Send + 'static,
Next: Send + 'static,
Partition: Fn(&Out) -> Key + Send + Sync + 'static,
F: Fn(Out) -> Fut + Send + Sync + 'static,
Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(1);
let mut next_index = 0_usize;
Box::new(std::iter::from_fn(move || {
// Deliver the result of a task that is still registered, if one is available.
if !tasks.is_empty()
&& let Some((task_id, result)) = recv_completion(&receiver)
{
tasks.remove(&task_id);
return Some(result);
}
let item = input.next()?;
match item {
Ok(item) => {
// The partition key is computed but not used in the serial path.
let _ = partition(&item);
let index = next_index;
next_index += 1;
Some(match poll_once_or_pending(stage(item)) {
Ok(result) => result,
Err(future) => {
tasks.insert(
index,
spawn_completion_task(index, future, sender.clone(), |result| result),
);
// Wait for the task that was just spawned before returning.
let (task_id, result) =
recv_completion(&receiver).expect("pending map_async task completion");
tasks.remove(&task_id);
result
}
})
}
Err(error) => Some(Err(error)),
}
}))
}
/// Ordered, partition-aware async map for small `parallelism` values.
///
/// `map_async_partitioned` selects this variant when `1 < parallelism <= 4`. Elements whose
/// partition key already has `per_partition` elements in flight are parked in a `pending`
/// queue that is scanned linearly for the first element whose key has capacity again.
///
/// * `parallelism` caps spawned tasks plus finished-but-not-yet-emitted results; it is also
///   the size at which the `pending` queue stops pulling more upstream elements.
/// * `per_partition` caps the number of in-flight elements per partition key.
///
/// Results are emitted strictly in upstream order: out-of-order completions are held in
/// `completed` until their index is due. An upstream error is stored at its position in
/// that order and stops any further pulling from `input`.
#[inline(always)]
fn map_async_partitioned_scanning<Out, Key, Next, Partition, F, Fut>(
    mut input: BoxStream<Out>,
    parallelism: usize,
    per_partition: usize,
    partition: Arc<Partition>,
    stage: Arc<F>,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Key: Clone + Eq + Hash + Send + 'static,
    Next: Send + 'static,
    Partition: Fn(&Out) -> Key + Send + Sync + 'static,
    F: Fn(Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
    /// Drops one in-flight count for `key` and forgets the key once it reaches zero.
    fn release<K: Eq + Hash>(in_flight: &mut HashMap<K, usize>, key: &K) {
        if let Some(count) = in_flight.get_mut(key) {
            *count -= 1;
            if *count == 0 {
                in_flight.remove(key);
            }
        }
    }

    let (sender, receiver) = std::sync::mpsc::channel::<(usize, (Key, StreamResult<Next>))>();
    let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
    let mut active_by_key = HashMap::<Key, usize>::with_capacity(parallelism);
    let mut pending = VecDeque::<(usize, Key, Out)>::with_capacity(parallelism);
    let mut completed = BTreeMap::<usize, StreamResult<Next>>::new();
    let mut next_index = 0_usize;
    let mut next_to_emit = 0_usize;
    let mut input_done = false;
    Box::new(std::iter::from_fn(move || {
        loop {
            if let Some(ready) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                return Some(ready);
            }

            while tasks.len() + completed.len() < parallelism {
                // Parked elements take precedence over pulling new ones from upstream.
                let unblocked = pending
                    .iter()
                    .position(|(_, key, _)| {
                        active_by_key.get(key).copied().unwrap_or(0) < per_partition
                    })
                    .and_then(|at| pending.remove(at));
                let candidate = match unblocked {
                    Some(entry) => Some(entry),
                    None if input_done => None,
                    None => match input.next() {
                        Some(Ok(item)) => {
                            let key = partition(&item);
                            let index = next_index;
                            next_index += 1;
                            Some((index, key, item))
                        }
                        Some(Err(error)) => {
                            // The error takes the next position in the output order.
                            completed.insert(next_index, Err(error));
                            next_index += 1;
                            input_done = true;
                            None
                        }
                        None => {
                            input_done = true;
                            None
                        }
                    },
                };
                let Some((index, key, item)) = candidate else {
                    break;
                };

                let in_flight = active_by_key.get(&key).copied().unwrap_or(0);
                if in_flight >= per_partition {
                    pending.push_back((index, key, item));
                    if input_done || pending.len() >= parallelism {
                        break;
                    }
                    continue;
                }

                *active_by_key.entry(key.clone()).or_default() += 1;
                match poll_once_or_pending(stage(item)) {
                    Ok(outcome) => {
                        release(&mut active_by_key, &key);
                        completed.insert(index, outcome);
                    }
                    Err(future) => {
                        let handle = spawn_completion_task(
                            index,
                            future,
                            sender.clone(),
                            move |result| (key, result),
                        );
                        tasks.insert(index, handle);
                    }
                }
            }

            if let Some(ready) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                return Some(ready);
            }
            if tasks.is_empty() {
                return None;
            }

            let Some((index, (key, outcome))) = recv_completion(&receiver) else {
                continue;
            };
            tasks.remove(&index);
            release(&mut active_by_key, &key);
            if index == next_to_emit {
                next_to_emit += 1;
                return Some(outcome);
            }
            completed.insert(index, outcome);
        }
    }))
}
/// Maps every upstream element through the asynchronous `stage`, yielding results in
/// upstream order while capping concurrency both globally and per partition key.
///
/// * `parallelism` caps the number of spawned stage futures plus finished results that are
///   still waiting for their turn to be emitted. It also caps how many elements may be
///   parked in per-key queues before upstream pulling pauses.
/// * `per_partition` caps how many elements sharing a `partition` key may be in flight at
///   once; further elements for that key wait in the key's slot queue.
///
/// `parallelism == 1` delegates to `map_async_partitioned_serial` and `parallelism <= 4`
/// to `map_async_partitioned_scanning`; larger values use the slot-based scheduler below.
///
/// An upstream error is stored at its position in the output order and stops further
/// pulling from `input`; results of elements pulled before it are still emitted first.
fn map_async_partitioned<Out, Key, Next, Partition, F, Fut>(
    mut input: BoxStream<Out>,
    parallelism: usize,
    per_partition: usize,
    partition: Arc<Partition>,
    stage: Arc<F>,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Key: Clone + Eq + Hash + Send + 'static,
    Next: Send + 'static,
    Partition: Fn(&Out) -> Key + Send + Sync + 'static,
    F: Fn(Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
    if parallelism == 1 {
        return map_async_partitioned_serial(input, partition, stage);
    }
    if parallelism <= 4 {
        return map_async_partitioned_scanning(input, parallelism, per_partition, partition, stage);
    }
    let (sender, receiver) = std::sync::mpsc::channel::<(usize, (usize, StreamResult<Next>))>();
    let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
    let mut slots_by_key = HashMap::<Key, usize>::with_capacity(parallelism);
    let mut slots = Vec::<PartitionSlot<Key, Out>>::with_capacity(parallelism);
    let mut free_slots = Vec::<usize>::new();
    let mut ready_slots = VecDeque::<usize>::with_capacity(parallelism);
    let mut completed = BTreeMap::<usize, StreamResult<Next>>::new();
    let mut next_index = 0_usize;
    let mut next_to_emit = 0_usize;
    let mut input_done = false;
    // Elements currently parked in per-slot queues. Without this bound a saturated key
    // would let the loop below drain the entire upstream into memory (and never return
    // on an unbounded upstream), because queued elements count towards neither `tasks`
    // nor `completed`.
    let mut queued_total = 0_usize;
    Box::new(std::iter::from_fn(move || {
        loop {
            if let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }
            while tasks.len() + completed.len() < parallelism {
                // Queued elements whose key regained capacity run before new upstream ones.
                if let Some((index, slot, item)) =
                    pop_ready_partition_slot(&mut slots, &mut ready_slots, per_partition)
                {
                    queued_total = queued_total.saturating_sub(1);
                    match poll_once_or_pending(stage(item)) {
                        Ok(result) => {
                            let mut remove_empty = false;
                            if let Some(state) = slots.get_mut(slot) {
                                state.active -= 1;
                                remove_empty = state.active == 0
                                    && state.queued.is_empty()
                                    && !state.in_ready_queue;
                            }
                            if remove_empty {
                                retire_partition_slot(
                                    slot,
                                    &mut slots_by_key,
                                    &mut slots,
                                    &mut free_slots,
                                );
                            } else {
                                ready_partition_slot(
                                    &mut slots,
                                    &mut ready_slots,
                                    slot,
                                    per_partition,
                                );
                            }
                            if index == next_to_emit {
                                next_to_emit += 1;
                                return Some(result);
                            }
                            completed.insert(index, result);
                        }
                        Err(future) => {
                            tasks.insert(
                                index,
                                spawn_completion_task(
                                    index,
                                    future,
                                    sender.clone(),
                                    move |result| (slot, result),
                                ),
                            );
                        }
                    }
                    continue;
                }
                if input_done {
                    break;
                }
                // Same cap the scanning variant applies to its pending buffer: stop
                // pulling until a completion frees a saturated key.
                if queued_total >= parallelism {
                    break;
                }
                match input.next() {
                    Some(Ok(item)) => {
                        let key = partition(&item);
                        let index = next_index;
                        next_index += 1;
                        let slot =
                            partition_slot_for(key, &mut slots_by_key, &mut slots, &mut free_slots);
                        let state = &mut slots[slot];
                        if state.active < per_partition {
                            match poll_once_or_pending(stage(item)) {
                                Ok(result) => {
                                    // This element never counted towards `active`, so the
                                    // slot may only be retired when no spawned task for
                                    // the same key is still outstanding; otherwise that
                                    // task's completion would touch a retired slot.
                                    if state.active == 0
                                        && state.queued.is_empty()
                                        && !state.in_ready_queue
                                    {
                                        retire_partition_slot(
                                            slot,
                                            &mut slots_by_key,
                                            &mut slots,
                                            &mut free_slots,
                                        );
                                    }
                                    if index == next_to_emit {
                                        next_to_emit += 1;
                                        return Some(result);
                                    }
                                    completed.insert(index, result);
                                }
                                Err(future) => {
                                    state.active += 1;
                                    tasks.insert(
                                        index,
                                        spawn_completion_task(
                                            index,
                                            future,
                                            sender.clone(),
                                            move |result| (slot, result),
                                        ),
                                    );
                                }
                            }
                        } else {
                            state.queued.push_back((index, item));
                            queued_total += 1;
                        }
                    }
                    Some(Err(error)) => {
                        // The error takes the next position in the output order.
                        completed.insert(next_index, Err(error));
                        next_index += 1;
                        input_done = true;
                        break;
                    }
                    None => {
                        input_done = true;
                        break;
                    }
                }
            }
            if let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }
            if tasks.is_empty() {
                return None;
            }
            if let Some((index, (slot, result))) = recv_completion(&receiver) {
                tasks.remove(&index);
                let mut remove_empty = false;
                if let Some(state) = slots.get_mut(slot) {
                    state.active -= 1;
                    remove_empty =
                        state.active == 0 && state.queued.is_empty() && !state.in_ready_queue;
                }
                if remove_empty {
                    retire_partition_slot(slot, &mut slots_by_key, &mut slots, &mut free_slots);
                } else {
                    ready_partition_slot(&mut slots, &mut ready_slots, slot, per_partition);
                }
                if index == next_to_emit {
                    next_to_emit += 1;
                    return Some(result);
                }
                completed.insert(index, result);
            }
        }
    }))
}
/// Tests for `flat_map_merge` run under `SubstreamExecutorMode::ReadyRingOnly`, several of
/// them compared against the output produced under `SubstreamExecutorMode::LegacyOnly`.
#[cfg(test)]
mod flat_map_merge_ready_ring_tests {
use super::*;
use std::sync::mpsc;
use std::time::Duration;
/// Collects `source` to completion and returns its elements sorted, so that comparisons
/// do not depend on merge order.
fn run_sorted<T: Ord + Send + 'static>(source: crate::Source<T>) -> Vec<T> {
let mut v = source.run_collect().unwrap();
v.sort_unstable();
v
}
/// An empty upstream yields no elements in both executor modes.
#[test]
fn ready_ring_empty_upstream() {
let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, || {
run_sorted(crate::Source::<i32>::empty().flat_map_merge(4, crate::Source::single))
});
let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
run_sorted(crate::Source::<i32>::empty().flat_map_merge(4, crate::Source::single))
});
assert_eq!(legacy, ring);
assert!(ring.is_empty());
}
/// A single upstream element mapped to a single-element substream yields exactly `[42]`.
#[test]
fn ready_ring_single_lane() {
let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, || {
run_sorted(crate::Source::single(42_i32).flat_map_merge(4, crate::Source::single))
});
let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
run_sorted(crate::Source::single(42_i32).flat_map_merge(4, crate::Source::single))
});
assert_eq!(legacy, ring);
assert_eq!(ring, vec![42]);
}
/// Breadth 1: both modes must produce the same multiset of elements.
// NOTE(review): despite the name, both outputs are sorted before comparison, so emission
// order is not actually checked here — confirm whether that is intended.
#[test]
fn ready_ring_breadth_one_exact_order() {
let make = || {
crate::Source::from_iter(0_i32..5)
.flat_map_merge(1, |x| crate::Source::single(x * 10))
.run_collect()
.unwrap()
};
let mut legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
let mut ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
legacy.sort_unstable();
ring.sort_unstable();
assert_eq!(legacy, ring);
}
/// A breadth (100) larger than the number of upstream elements (3) still yields all
/// 3 x 3 = 9 elements.
#[test]
fn ready_ring_breadth_gt_input() {
let make = || {
run_sorted(
crate::Source::from_iter(0_i32..3)
.flat_map_merge(100, |x| crate::Source::from_iter([x, x + 1, x + 2])),
)
};
let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
assert_eq!(legacy, ring);
assert_eq!(ring.len(), 9);
}
/// Mixes 20-element substreams (every third input) with single-element ones.
#[test]
fn ready_ring_mixed_short_long() {
let make = || {
run_sorted(crate::Source::from_iter(0_i32..8).flat_map_merge(4, |x| {
if x % 3 == 0 {
crate::Source::from_iter(0..20_i32).map(move |i| x * 100 + i)
} else {
crate::Source::single(x)
}
}))
};
let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
assert_eq!(legacy, ring);
}
/// Tracks how many substream futures run at the same time and asserts the observed peak
/// never exceeds the breadth of 2.
#[test]
fn ready_ring_respects_breadth_bound() {
use std::sync::atomic::{AtomicUsize, Ordering as Ord};
// `active` counts futures currently inside their body; `max_active` records the peak.
let active = Arc::new(AtomicUsize::new(0));
let max_active = Arc::new(AtomicUsize::new(0));
let a2 = Arc::clone(&active);
let m2 = Arc::clone(&max_active);
let mut values = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
crate::Source::from_iter(0_i32..6)
.flat_map_merge(2, move |item| {
let a = Arc::clone(&a2);
let m = Arc::clone(&m2);
crate::Source::future(move || {
let a = Arc::clone(&a);
let m = Arc::clone(&m);
async move {
let now = a.fetch_add(1, Ord::SeqCst) + 1;
// Raise the recorded peak to `now` unless another future already stored a higher one.
let mut seen = m.load(Ord::SeqCst);
while now > seen {
match m.compare_exchange(seen, now, Ord::SeqCst, Ord::SeqCst) {
Ok(_) => break,
Err(v) => seen = v,
}
}
// Blocks the executing thread so that overlapping futures can be observed.
thread::sleep(Duration::from_millis(20));
a.fetch_sub(1, Ord::SeqCst);
Ok(item)
}
})
})
.run_collect()
.unwrap()
});
values.sort_unstable();
assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
assert!(max_active.load(std::sync::atomic::Ordering::SeqCst) <= 2);
}
/// Two 50-element lanes run next to a lane whose single element (999) arrives after a
/// 10 ms sleep; the slow element must still be emitted (101 elements in total).
#[test]
fn ready_ring_fairness_slow_lane_not_starved() {
use std::sync::atomic::{AtomicBool, Ordering as Ord};
let slow_emitted = Arc::new(AtomicBool::new(false));
let slow_flag = Arc::clone(&slow_emitted);
let results = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
crate::Source::from_iter(0_i32..3)
.flat_map_merge(4, move |lane_id| {
let flag = Arc::clone(&slow_flag);
match lane_id {
0 => crate::Source::from_iter(0_i32..50),
1 => crate::Source::from_iter(100_i32..150),
_ => crate::Source::future(move || {
let flag = Arc::clone(&flag);
async move {
thread::sleep(Duration::from_millis(10));
flag.store(true, Ord::SeqCst);
Ok(999_i32)
}
}),
}
})
.run_collect()
.unwrap()
});
assert!(slow_emitted.load(std::sync::atomic::Ordering::SeqCst));
assert!(results.contains(&999));
assert_eq!(results.len(), 101);
}
/// A failing substream (input 3) must surface exactly its error instead of hanging.
#[test]
fn ready_ring_inner_failure_no_hang() {
let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
crate::Source::from_iter(0_i32..8)
.flat_map_merge(4, |x| {
if x == 3 {
crate::Source::failed(StreamError::Failed("lane-fail".into()))
} else {
crate::Source::from_iter(0_i32..10)
}
})
.run_collect()
});
assert_eq!(result, Err(StreamError::Failed("lane-fail".into())));
}
/// A failed substream for input 2 makes the whole run return an error.
// NOTE(review): the failure is produced with `Source::failed`, the same mechanism as in
// `ready_ring_inner_failure_no_hang`, and only `is_err()` is asserted — confirm this
// covers the "factory failure" case the name refers to.
#[test]
fn ready_ring_factory_failure_propagates() {
let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
crate::Source::from_iter(0_i32..4)
.flat_map_merge(2, |x| {
if x == 2 {
crate::Source::failed(StreamError::Failed("factory-fail".into()))
} else {
crate::Source::single(x)
}
})
.run_collect()
});
assert!(result.is_err());
}
/// The mapping closure takes a test-owned mutex on every call; the run must still
/// complete and yield all 10 elements.
// NOTE(review): the "coordinator lock" named in the test is not visible from here; the
// test only shows that locking inside the closure does not prevent completion.
#[test]
fn ready_ring_closure_not_under_coordinator_lock() {
let guard_mutex = Arc::new(std::sync::Mutex::<()>::new(()));
let gm = Arc::clone(&guard_mutex);
let results = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
crate::Source::from_iter(0_i32..10)
.flat_map_merge(4, move |x| {
let _lock = gm.lock().unwrap();
crate::Source::single(x)
})
.run_collect()
.unwrap()
});
assert_eq!(results.len(), 10);
}
/// A single substream produces `WINDOW + EXTRA` elements and signals on `gate_tx` when it
/// is about to produce the element at index `WINDOW`; the test drains the queue, then
/// checks that the signal arrived and that every element was delivered.
// NOTE(review): `produced` is incremented by the producer but never read, and the signal
// is only checked after the queue has been drained, so the test does not by itself show
// that the producer blocked at the window — confirm the intended assertion.
#[test]
fn ready_ring_bounded_memory_producer_blocks_at_window() {
const WINDOW: usize = FLAT_MAP_MERGE_SUBSTREAM_WINDOW;
const EXTRA: usize = 4;
let (gate_tx, gate_rx) = mpsc::channel::<()>();
let gate_tx = Arc::new(std::sync::Mutex::new(gate_tx));
let gate_tx2 = Arc::clone(&gate_tx);
let produced = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let prod2 = Arc::clone(&produced);
let queue = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
crate::Source::single(0_i32)
.flat_map_merge(2, move |_| {
let tx = Arc::clone(&gate_tx2);
let prod = Arc::clone(&prod2);
crate::Source::from_factory(move || {
let tx = Arc::clone(&tx);
let prod = Arc::clone(&prod);
let mut i = 0_i32;
Box::new(std::iter::from_fn(move || {
if i as usize >= WINDOW + EXTRA {
return None;
}
if i as usize == WINDOW {
let _ = tx.lock().unwrap().send(());
}
prod.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
i += 1;
Some(Ok(i))
}))
})
})
.run_with(crate::Sink::queue())
.unwrap()
});
let mut total = 0;
while queue.pull().unwrap().is_some() {
total += 1;
}
let signal = gate_rx.recv_timeout(Duration::from_secs(1));
assert!(signal.is_ok(), "producer never reached the window boundary");
assert_eq!(total, WINDOW + EXTRA);
}
/// Four endless `repeat` lanes: pull a few elements, drop the queue, shut the runtime
/// down. There are no assertions; the test passes if these calls return.
#[test]
fn ready_ring_cancellation_wakes_blocked_lanes() {
let rt = crate::stream::runtime::Runtime::new();
let queue = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
crate::Source::from_iter(0_i32..4)
.flat_map_merge(4, |_| crate::Source::repeat(1_i32))
.run_with_materializer(crate::Sink::queue(), &rt)
.unwrap()
});
for _ in 0..8 {
let _ = queue.pull();
}
drop(queue);
rt.shutdown();
}
/// Repeats a 50 x 3 element merge 20 times and checks the sum each time
/// (3 * (0 + … + 49) + 50 * 3 = 3825).
#[test]
fn ready_ring_lost_wakeup_stress() {
for _ in 0..20 {
let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
crate::Source::from_iter(0_i32..50)
.flat_map_merge(8, |item| {
crate::Source::from_iter([item, item + 1, item + 2])
})
.run_with(crate::Sink::fold(0i64, |acc, v| acc + v as i64))
.unwrap()
.wait()
});
assert_eq!(result, Ok(3825), "lost-wakeup stress: wrong sum");
}
}
/// Runs `STREAMS` identical merges on separate threads, released together by a barrier,
/// for `ROUNDS` rounds; every stream must fold to `EXPECTED`.
#[test]
fn ready_ring_concurrent_streams_lost_wakeup_stress() {
const STREAMS: usize = 32;
const ROUNDS: usize = 8;
// 32 substreams of 20 consecutive values starting at item * 100:
// 2000 * (0 + … + 31) + 32 * (0 + … + 19) = 992_000 + 6_080.
const EXPECTED: i64 = 998_080;
for _ in 0..ROUNDS {
let barrier = Arc::new(std::sync::Barrier::new(STREAMS));
let mut handles = Vec::with_capacity(STREAMS);
for _ in 0..STREAMS {
let barrier = Arc::clone(&barrier);
handles.push(thread::spawn(move || {
barrier.wait();
let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
crate::Source::from_iter(0_i64..32)
.flat_map_merge(8, |item| {
crate::Source::from_iter(item * 100..item * 100 + 20)
})
.run_with(crate::Sink::fold(0i64, |acc, v| acc + v))
.unwrap()
.wait()
});
assert_eq!(result, Ok(EXPECTED), "concurrent ready-ring sum");
}));
}
for handle in handles {
handle.join().expect("ready-ring stress worker panicked");
}
}
}
/// Repeats a 100 x 2 element merge with breadth 16 twenty times and checks the sum
/// (2 * (0 + … + 99) + 100 * 1000 = 109_900).
#[test]
fn ready_ring_tail_loop_stress() {
for _ in 0..20 {
let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
crate::Source::from_iter(0_i64..100)
.flat_map_merge(16, |item| crate::Source::from_iter([item, item + 1000]))
.run_with(crate::Sink::fold(0i64, |acc, v| acc + v))
.unwrap()
.wait()
});
assert_eq!(result, Ok(109_900), "tail-loop stress: wrong sum");
}
}
/// `SubstreamExecutorMode::Auto` must produce the same sorted output as `ReadyRingOnly`.
#[test]
fn ready_ring_auto_mode_matches_ring() {
let make = || {
run_sorted(
crate::Source::from_iter(0_i32..10)
.flat_map_merge(4, |x| crate::Source::from_iter([x, x + 100])),
)
};
let auto = with_substream_mode(SubstreamExecutorMode::Auto, make);
let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
assert_eq!(auto, ring);
}
}
/// Tests for `flat_map_merge` with small substreams, including sources built through
/// `test_source_with_inline_micro_hint`, run under `SubstreamExecutorMode::ReadyRingOnly`
/// and in several cases compared against `SubstreamExecutorMode::LegacyOnly`.
#[cfg(test)]
mod inline_micro_source_tests {
use super::*;
use crate::stream::source::test_source_with_inline_micro_hint;
use std::sync::mpsc;
/// Collects `source` to completion and returns its elements sorted, so that comparisons
/// do not depend on merge order.
fn run_sorted<T: Ord + Send + 'static>(source: crate::Source<T>) -> Vec<T> {
let mut v = source.run_collect().unwrap();
v.sort_unstable();
v
}
/// An empty upstream yields no elements in both executor modes.
#[test]
fn inline_empty_upstream() {
let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, || {
run_sorted(crate::Source::<i32>::empty().flat_map_merge(4, crate::Source::single))
});
let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
run_sorted(crate::Source::<i32>::empty().flat_map_merge(4, crate::Source::single))
});
assert_eq!(legacy, ring);
assert!(ring.is_empty());
}
/// A single upstream element mapped to a single-element substream yields `[198]`.
#[test]
fn inline_single_inner_source() {
let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, || {
run_sorted(
crate::Source::single(99_i32).flat_map_merge(4, |x| crate::Source::single(x * 2)),
)
});
let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
run_sorted(
crate::Source::single(99_i32).flat_map_merge(4, |x| crate::Source::single(x * 2)),
)
});
assert_eq!(legacy, ring);
assert_eq!(ring, vec![198]);
}
/// Breadth 1 with four-element substreams: both modes must produce the same multiset.
// NOTE(review): despite the name, both outputs are sorted before comparison, so emission
// order is not actually checked here — confirm whether that is intended.
#[test]
fn inline_breadth_one_exact_order() {
let make = || {
crate::Source::from_iter(0_i32..6)
.flat_map_merge(1, |x| {
crate::Source::from_iter([x * 10, x * 10 + 1, x * 10 + 2, x * 10 + 3])
})
.run_collect()
.unwrap()
};
let mut legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
let mut ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
legacy.sort_unstable();
ring.sort_unstable();
assert_eq!(legacy, ring);
}
/// A breadth (100) larger than the number of upstream elements (3) still yields all
/// 3 x 4 = 12 elements.
#[test]
fn inline_breadth_gt_input() {
let make = || {
run_sorted(
crate::Source::from_iter(0_i32..3)
.flat_map_merge(100, |x| crate::Source::from_iter([x, x + 1, x + 2, x + 3])),
)
};
let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
assert_eq!(legacy, ring);
assert_eq!(ring.len(), 12);
}
/// Cycles through empty, single-element and four-element substreams.
#[test]
fn inline_mixed_empty_single_four_item() {
let make = || {
run_sorted(
crate::Source::from_iter(0_i32..12).flat_map_merge(4, |x| match x % 3 {
0 => crate::Source::empty(),
1 => crate::Source::single(x),
_ => crate::Source::from_iter([x, x + 100, x + 200, x + 300]),
}),
)
};
let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
assert_eq!(legacy, ring);
}
/// 2 000 upstream elements, four elements per substream, breadth 8: 8 000 elements.
#[test]
fn inline_2k_x4_b8_benchmark_shape() {
let make = || {
run_sorted(
crate::Source::from_iter(0_i32..2_000).flat_map_merge(8, |item| {
crate::Source::from_iter([item, item + 1, item + 2, item + 3])
}),
)
};
let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
assert_eq!(legacy, ring);
assert_eq!(ring.len(), 8_000);
}
/// Repeats a 50 x 4 element merge 20 times and checks the sum each time
/// (4 * (0 + … + 49) + 50 * 6 = 5200).
#[test]
fn inline_lost_wakeup_stress() {
for _ in 0..20 {
let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
crate::Source::from_iter(0_i32..50)
.flat_map_merge(8, |item| {
crate::Source::from_iter([item, item + 1, item + 2, item + 3])
})
.run_with(crate::Sink::fold(0i64, |acc, v| acc + v as i64))
.unwrap()
.wait()
});
assert_eq!(result, Ok(5200), "lost-wakeup stress: wrong sum");
}
}
/// Repeats a 100 x 2 element merge with breadth 16 twenty times and checks the sum
/// (2 * (0 + … + 99) + 100 * 1000 = 109_900).
#[test]
fn inline_tail_loop_stress() {
for _ in 0..20 {
let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
crate::Source::from_iter(0_i64..100)
.flat_map_merge(16, |item| crate::Source::from_iter([item, item + 1000]))
.run_with(crate::Sink::fold(0i64, |acc, v| acc + v))
.unwrap()
.wait()
});
assert_eq!(result, Ok(109_900), "tail-loop stress: wrong sum");
}
}
/// Four 20-element mapped substreams with breadth 2 yield 80 elements in both modes.
// NOTE(review): the name suggests these sources take a worker fallback path; nothing in
// this test observes which path was taken — verify against the executor.
#[test]
fn inline_large_source_uses_worker_fallback() {
let make = || {
run_sorted(crate::Source::from_iter(0_i32..4).flat_map_merge(2, |x| {
crate::Source::from_iter(0_i32..20).map(move |i| x * 100 + i)
}))
};
let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
assert_eq!(legacy, ring);
assert_eq!(ring.len(), 80);
}
/// A substream that fails immediately (input 1) must surface exactly its error.
#[test]
fn inline_inner_error_before_any_item() {
let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
crate::Source::from_iter(0_i32..4)
.flat_map_merge(2, |x| {
if x == 1 {
crate::Source::failed(StreamError::Failed("inline-err".into()))
} else {
crate::Source::from_iter([x, x + 1])
}
})
.run_collect()
});
assert_eq!(result, Err(StreamError::Failed("inline-err".into())));
}
/// A hinted substream that emits one element and then an error makes the run fail.
#[test]
fn inline_inner_error_after_some_items() {
let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
crate::Source::single(0_i32)
.flat_map_merge(2, |_x| {
test_source_with_inline_micro_hint(
|| {
let mut count = 0;
Box::new(std::iter::from_fn(move || {
count += 1;
// First call emits 42, second call fails, later calls end the stream.
match count {
1 => Some(Ok(42_i32)),
2 => Some(Err(StreamError::Failed("after-items".into()))),
_ => None,
}
}))
},
1, )
})
.run_collect()
});
assert!(result.is_err());
}
/// Mixes three-element substreams, 40-element mapped substreams and one failing
/// substream (input 5); the run must end in an error.
#[test]
fn inline_worker_lane_fails_during_inline_drain() {
let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
crate::Source::from_iter(0_i32..10)
.flat_map_merge(2, |x| {
if x == 5 {
crate::Source::failed(StreamError::Failed("worker-fail".into()))
} else if x % 2 == 0 {
crate::Source::from_iter([x, x + 1, x + 2])
} else {
crate::Source::from_iter(0_i32..40).map(move |i| x * 100 + i)
}
})
.run_collect()
});
assert!(result.is_err());
}
/// Mixes four-element substreams with endless `repeat` substreams, pulls 16 elements,
/// drops the queue and shuts the runtime down. There are no assertions; the test passes
/// if these calls return.
#[test]
fn inline_cancellation_drop_output() {
let rt = crate::stream::runtime::Runtime::new();
let queue = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
crate::Source::from_iter(0_i32..100)
.flat_map_merge(8, |x| {
if x % 3 == 0 {
crate::Source::from_iter([x, x + 1, x + 2, x + 3]) } else {
crate::Source::repeat(x) }
})
.run_with_materializer(crate::Sink::queue(), &rt)
.unwrap()
});
for _ in 0..16 {
let _ = queue.pull();
}
drop(queue);
rt.shutdown();
}
/// Runs a 40-element substream next to a hinted single-element substream whose `next()`
/// sends on a channel; asserts that the signal arrived and all 41 elements were emitted.
// NOTE(review): the "coordinator lock" named in the test is not visible from here; the
// test only shows that the hinted source's `next()` ran and its element was delivered.
#[test]
fn inline_next_not_under_coordinator_lock() {
let (tx, rx) = mpsc::channel::<()>();
let tx = Arc::new(std::sync::Mutex::new(tx));
let tx2 = Arc::clone(&tx);
let results = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
crate::Source::from_iter(0_i32..2)
.flat_map_merge(2, move |x| {
let sig = Arc::clone(&tx2);
if x == 0 {
crate::Source::from_iter(0_i32..40)
} else {
test_source_with_inline_micro_hint(
move || {
let sig = Arc::clone(&sig);
let mut emitted = false;
Box::new(std::iter::from_fn(move || {
if !emitted {
emitted = true;
let _ = sig.lock().unwrap().send(());
Some(Ok(999_i32))
} else {
None
}
}))
},
1,
)
}
})
.run_collect()
.unwrap()
});
let signal = rx.recv_timeout(std::time::Duration::from_secs(5));
assert!(signal.is_ok(), "inline next() never ran");
assert!(results.contains(&999));
assert_eq!(results.len(), 41);
}
/// `SubstreamExecutorMode::Auto` must produce the same sorted output as `ReadyRingOnly`.
#[test]
fn inline_auto_matches_readyring() {
let make = || {
run_sorted(
crate::Source::from_iter(0_i32..20)
.flat_map_merge(4, |x| crate::Source::from_iter([x, x + 1, x + 2, x + 3])),
)
};
let auto = with_substream_mode(SubstreamExecutorMode::Auto, make);
let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
assert_eq!(auto, ring);
}
}
#[cfg(test)]
mod split_sink_fast_path_tests {
use super::*;
/// Splits `items` at multiples of 10 (before or after the match, per `split_mode`) under
/// the given executor mode and returns the sum of every segment, in segment order.
fn run_split_fold(
    items: Vec<u64>,
    split_mode: SplitMode,
    executor_mode: SubstreamExecutorMode,
) -> Vec<u64> {
    with_substream_mode(executor_mode, || {
        let upstream = crate::Source::from_iter(items);
        let segments = match split_mode {
            SplitMode::When => upstream.split_when(|x| x % 10 == 0),
            SplitMode::After => upstream.split_after(|x| x % 10 == 0),
        };
        // Each segment is itself a source that is folded to its sum.
        let collect_sums = crate::Sink::fold(Vec::new(), |mut sums, seg: crate::Source<u64>| {
            let completion = seg.run_with(crate::Sink::fold(0u64, |a, x| a + x)).unwrap();
            sums.push(completion.wait().unwrap());
            sums
        });
        segments.run_with(collect_sums).unwrap().wait().unwrap()
    })
}
/// Splits `items` at multiples of 10 (before or after the match, per `split_mode`) under
/// the given executor mode and returns the elements of every segment, in segment order.
fn run_split_collect_segments(
    items: Vec<u64>,
    split_mode: SplitMode,
    executor_mode: SubstreamExecutorMode,
) -> Vec<Vec<u64>> {
    with_substream_mode(executor_mode, || {
        let upstream = crate::Source::from_iter(items);
        let segments = match split_mode {
            SplitMode::When => upstream.split_when(|x| x % 10 == 0),
            SplitMode::After => upstream.split_after(|x| x % 10 == 0),
        };
        // Each segment is itself a source that is collected into its own vector.
        let collect_segments =
            crate::Sink::fold(Vec::new(), |mut collected, seg: crate::Source<u64>| {
                let completion = seg.run_with(crate::Sink::collect()).unwrap();
                collected.push(completion.wait().unwrap());
                collected
            });
        segments.run_with(collect_segments).unwrap().wait().unwrap()
    })
}
/// Empty input must split identically under `LegacyOnly` and `SplitSinkOnly`.
#[test]
fn split_fast_equivalence_empty_input() {
    let segments = |mode, executor| run_split_collect_segments(Vec::new(), mode, executor);
    for sm in [SplitMode::When, SplitMode::After] {
        let legacy = segments(sm, SubstreamExecutorMode::LegacyOnly);
        let fast = segments(sm, SubstreamExecutorMode::SplitSinkOnly);
        assert_eq!(legacy, fast, "empty input, mode {sm:?}");
    }
}
/// Input without any multiple of 10 (1..=9) must split identically in both executor modes.
#[test]
fn split_fast_equivalence_no_boundaries() {
    let input = (1..=9).collect::<Vec<u64>>();
    let segments = |mode, executor| run_split_collect_segments(input.clone(), mode, executor);
    for sm in [SplitMode::When, SplitMode::After] {
        let legacy = segments(sm, SubstreamExecutorMode::LegacyOnly);
        let fast = segments(sm, SubstreamExecutorMode::SplitSinkOnly);
        assert_eq!(legacy, fast, "no boundaries, mode {sm:?}");
    }
}
/// `split_when` with a boundary on the very first element must match across executor modes.
#[test]
fn split_fast_equivalence_first_element_boundary_when() {
    let input = vec![10_u64, 1, 2, 3];
    let [legacy, fast] = [
        SubstreamExecutorMode::LegacyOnly,
        SubstreamExecutorMode::SplitSinkOnly,
    ]
    .map(|executor| run_split_collect_segments(input.clone(), SplitMode::When, executor));
    assert_eq!(legacy, fast);
}
/// `split_after` with a boundary on the very first element must match across executor modes.
#[test]
fn split_fast_equivalence_first_element_boundary_after() {
    let input = vec![10_u64, 1, 2, 3];
    let [legacy, fast] = [
        SubstreamExecutorMode::LegacyOnly,
        SubstreamExecutorMode::SplitSinkOnly,
    ]
    .map(|executor| run_split_collect_segments(input.clone(), SplitMode::After, executor));
    assert_eq!(legacy, fast);
}
/// Several boundary elements in a row must split identically in both executor modes.
#[test]
fn split_fast_equivalence_consecutive_matches() {
    let input = vec![10_u64, 20, 30, 1, 2, 40];
    let segments = |mode, executor| run_split_collect_segments(input.clone(), mode, executor);
    for sm in [SplitMode::When, SplitMode::After] {
        let legacy = segments(sm, SubstreamExecutorMode::LegacyOnly);
        let fast = segments(sm, SubstreamExecutorMode::SplitSinkOnly);
        assert_eq!(legacy, fast, "consecutive matches, mode {sm:?}");
    }
}
/// A boundary on the final element must split identically in both executor modes.
#[test]
fn split_fast_equivalence_last_element_boundary() {
    let input = vec![1_u64, 2, 3, 10];
    let segments = |mode, executor| run_split_collect_segments(input.clone(), mode, executor);
    for sm in [SplitMode::When, SplitMode::After] {
        let legacy = segments(sm, SubstreamExecutorMode::LegacyOnly);
        let fast = segments(sm, SubstreamExecutorMode::SplitSinkOnly);
        assert_eq!(legacy, fast, "last element boundary, mode {sm:?}");
    }
}
/// The range 0..50 (a boundary every ten elements) must split identically in both modes.
#[test]
fn split_fast_equivalence_mixed() {
    let input = (0..50u64).collect::<Vec<u64>>();
    let segments = |mode, executor| run_split_collect_segments(input.clone(), mode, executor);
    for sm in [SplitMode::When, SplitMode::After] {
        let legacy = segments(sm, SubstreamExecutorMode::LegacyOnly);
        let fast = segments(sm, SubstreamExecutorMode::SplitSinkOnly);
        assert_eq!(legacy, fast, "mixed 0..50, mode {sm:?}");
    }
}
/// Per-segment sums over 0..50 must be identical in both executor modes.
#[test]
fn split_fast_equivalence_fold_sums() {
    let input = (0..50u64).collect::<Vec<u64>>();
    let sums = |mode, executor| run_split_fold(input.clone(), mode, executor);
    for sm in [SplitMode::When, SplitMode::After] {
        let legacy = sums(sm, SubstreamExecutorMode::LegacyOnly);
        let fast = sums(sm, SubstreamExecutorMode::SplitSinkOnly);
        assert_eq!(legacy, fast, "fold sums, mode {sm:?}");
    }
}
/// Collected segments over 0..312 must be identical in both executor modes.
#[test]
fn split_fast_equivalence_with_collect() {
    let input = (0..312u64).collect::<Vec<u64>>();
    let segments = |mode, executor| run_split_collect_segments(input.clone(), mode, executor);
    for sm in [SplitMode::When, SplitMode::After] {
        let legacy = segments(sm, SubstreamExecutorMode::LegacyOnly);
        let fast = segments(sm, SubstreamExecutorMode::SplitSinkOnly);
        assert_eq!(legacy, fast, "collect 312 items, mode {sm:?}");
    }
}
/// Summing every `split_when` segment through `Sink::fold_result` must give the same
/// per-segment sums under `LegacyOnly` and `SplitSinkOnly`.
#[test]
fn split_fast_fold_result_equivalence() {
    let items: Vec<u64> = (0..50u64).collect();
    let segment_sums = |executor_mode| {
        with_substream_mode(executor_mode, || {
            let segments =
                crate::Source::from_iter(items.clone()).split_when(move |x| x % 10 == 0);
            segments
                .run_with(crate::Sink::fold(
                    Vec::new(),
                    |mut sums, seg: crate::Source<u64>| {
                        let completion = seg
                            .run_with(crate::Sink::fold_result(0u64, |a, x| Ok(a + x)))
                            .unwrap();
                        sums.push(completion.wait().unwrap());
                        sums
                    },
                ))
                .unwrap()
                .wait()
                .unwrap()
        })
    };
    let legacy = segment_sums(SubstreamExecutorMode::LegacyOnly);
    let fast = segment_sums(SubstreamExecutorMode::SplitSinkOnly);
    assert_eq!(legacy, fast);
}
/// Draining every `split_when` segment with `Sink::ignore` must count the same number of
/// segments under `LegacyOnly` and `SplitSinkOnly`.
#[test]
fn split_fast_ignore_equivalence() {
    let items: Vec<u64> = (0..50u64).collect();
    let count_segments = |executor_mode| {
        with_substream_mode(executor_mode, || {
            let segments =
                crate::Source::from_iter(items.clone()).split_when(move |x| x % 10 == 0);
            let counter = crate::Sink::fold(0u64, |seen, seg: crate::Source<u64>| {
                seg.run_with(crate::Sink::ignore()).unwrap().wait().unwrap();
                seen + 1
            });
            segments.run_with(counter).unwrap().wait().unwrap()
        })
    };
    let legacy = count_segments(SubstreamExecutorMode::LegacyOnly);
    let fast = count_segments(SubstreamExecutorMode::SplitSinkOnly);
    assert_eq!(legacy, fast, "ignore segment counts must match");
}
/// Under `SplitSinkOnly`, materializing a clone of a segment and then the segment itself:
/// the first `run_with` is expected to succeed and the second to return an error.
// NOTE(review): the outer `run_with` result is discarded (`let _ = result`), so whether a
// failing assertion inside the fold closure fails this test depends on how the stream
// handles a panicking closure — confirm.
// NOTE(review): `materializer` is constructed but only referenced through
// `let _ = &materializer;`; nothing here passes it to a run call — verify it is needed.
#[test]
fn split_fast_one_shot_cannot_materialize_twice() {
with_substream_mode(SubstreamExecutorMode::SplitSinkOnly, || {
let materializer = crate::Runtime::default();
let result = crate::Source::from_iter(1u64..=5)
.split_when(|x| x % 3 == 0)
.run_with(crate::Sink::fold(0u64, |_, seg: crate::Source<u64>| {
let c1 = seg.clone().run_with(crate::Sink::fold(0u64, |a, x| a + x));
let c2 = seg.run_with(crate::Sink::fold(0u64, |a, x| a + x));
assert!(c1.is_ok(), "first materialization should succeed");
assert!(c2.is_err(), "second materialization should fail: {c2:?}");
// Wait for the successful materialization; its result is not inspected.
let _ = c1.unwrap().wait();
0u64
}));
let _ = result;
let _ = &materializer;
});
}
/// Runs `split_when` and `split_after` under `SplitSinkOnly` with a predicate that panics
/// on element 5. The outcome is deliberately discarded: the test passes as long as the
/// run returns.
#[test]
fn split_fast_predicate_panic_both_modes() {
    for sm in [SplitMode::When, SplitMode::After] {
        let outcome = with_substream_mode(SubstreamExecutorMode::SplitSinkOnly, || {
            let upstream = crate::Source::from_iter(0u64..10);
            let segments = match sm {
                SplitMode::When => upstream.split_when(|x| {
                    if *x == 5 {
                        panic!("test panic");
                    }
                    x % 3 == 0
                }),
                SplitMode::After => upstream.split_after(|x| {
                    if *x == 5 {
                        panic!("test panic");
                    }
                    x % 3 == 0
                }),
            };
            // Drain each segment, tolerating segments that fail to materialize.
            let drain_all = crate::Sink::fold(
                Vec::<u64>::new(),
                |mut acc, seg: crate::Source<u64>| {
                    if let Ok(done) = seg.run_with(crate::Sink::ignore()) {
                        let _ = done.wait();
                    }
                    acc.push(0u64);
                    acc
                },
            );
            segments.run_with(drain_all).map(|c| c.wait())
        });
        let _ = outcome;
    }
}
/// Splits 10 000 elements twenty times in both split modes and checks that `SplitSinkOnly`
/// and `LegacyOnly` agree on the number of segments and on the sum of all elements.
#[test]
fn split_fast_stress_20x() {
    // The input is identical for every run, so it is built once.
    let items: Vec<u64> = (0..10_000u64).collect();
    for i in 0..20 {
        for sm in [SplitMode::When, SplitMode::After] {
            let fast =
                run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
            let legacy =
                run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
            let (fast_count, legacy_count) = (fast.len(), legacy.len());
            assert_eq!(
                fast_count, legacy_count,
                "stress run {i} segment count mismatch, mode {sm:?}"
            );
            let fast_sum = fast.iter().flatten().sum::<u64>();
            let legacy_sum = legacy.iter().flatten().sum::<u64>();
            assert_eq!(
                fast_sum, legacy_sum,
                "stress run {i} sum mismatch, mode {sm:?}"
            );
        }
    }
}
/// `SubstreamExecutorMode::Auto` must split 0..50 exactly like `SplitSinkOnly`.
#[test]
fn split_fast_auto_mode_matches_fast() {
    let input = (0..50u64).collect::<Vec<u64>>();
    let segments = |mode, executor| run_split_collect_segments(input.clone(), mode, executor);
    for sm in [SplitMode::When, SplitMode::After] {
        let auto = segments(sm, SubstreamExecutorMode::Auto);
        let fast = segments(sm, SubstreamExecutorMode::SplitSinkOnly);
        assert_eq!(auto, fast, "auto == fast, mode {sm:?}");
    }
}
/// Consumes every `split_when` segment of 0..30 with `Sink::foreach` under `SplitSinkOnly`
/// and checks that three segments were seen and that every element was visited once
/// (the visited elements sum to 0 + … + 29).
#[test]
fn split_fast_fallback_path_via_foreach() {
    use std::sync::atomic::{AtomicU64, Ordering};
    let total = Arc::new(AtomicU64::new(0));
    let shared_total = Arc::clone(&total);
    let result = with_substream_mode(SubstreamExecutorMode::SplitSinkOnly, || {
        let segments = crate::Source::from_iter(0u64..30).split_when(|x| x % 10 == 0);
        let visit_segments = crate::Sink::fold(
            Vec::new(),
            move |mut acc, seg: crate::Source<u64>| {
                let t = Arc::clone(&shared_total);
                let drained = seg
                    .run_with(crate::Sink::foreach(move |x| {
                        t.fetch_add(x, Ordering::SeqCst);
                    }))
                    .unwrap();
                drained.wait().unwrap();
                acc.push(1u64);
                acc
            },
        );
        segments.run_with(visit_segments).unwrap().wait().unwrap()
    });
    assert_eq!(result.len(), 3, "should have 3 segments");
    let visited = total.load(Ordering::SeqCst);
    assert_eq!(visited, (0..30u64).sum::<u64>());
}
/// Runs a 30-item input through `SplitMode::When` with both the
/// `SplitSinkOnly` and `LegacyOnly` executors and asserts that they yield
/// the same number of segments. Both runs must return for the test to pass.
#[test]
fn split_fast_liveness_segment_count_when() {
    let items: Vec<u64> = (0..30u64).collect();
    let segment_count = |executor: SubstreamExecutorMode| {
        run_split_collect_segments(items.clone(), SplitMode::When, executor).len()
    };
    let fast_len = segment_count(SubstreamExecutorMode::SplitSinkOnly);
    let legacy_len = segment_count(SubstreamExecutorMode::LegacyOnly);
    assert_eq!(fast_len, legacy_len);
}
/// Runs a 30-item input through `SplitMode::After` with both the
/// `SplitSinkOnly` and `LegacyOnly` executors and asserts that they yield
/// the same number of segments. Both runs must return for the test to pass.
#[test]
fn split_fast_liveness_segment_count_after() {
    let items: Vec<u64> = (0..30u64).collect();
    let segment_count = |executor: SubstreamExecutorMode| {
        run_split_collect_segments(items.clone(), SplitMode::After, executor).len()
    };
    let fast_len = segment_count(SubstreamExecutorMode::SplitSinkOnly);
    let legacy_len = segment_count(SubstreamExecutorMode::LegacyOnly);
    assert_eq!(fast_len, legacy_len);
}
/// Streams `TOTAL` items through a single, never-splitting substream under
/// `SubstreamExecutorMode::SplitSinkOnly` and checks three things:
///
/// * as observed from the consumer side, the number of produced items never
///   exceeds the number of consumed items by more than `MAX_IN_FLIGHT`;
/// * every item reaches the test thread, in order, before a 60 second
///   deadline (a timeout is reported as a deadlock);
/// * the outer fold saw exactly one segment.
#[test]
fn split_fast_bounded_memory_rendezvous() {
use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
mpsc,
};
use std::time::{Duration, Instant};
// NOTE(review): presumably the live-substream buffer capacity and batch size
// used by the executor — confirm against the definitions of these constants.
const CAPACITY: usize = LIVE_SUBSTREAM_CAPACITY;
const BATCH: usize = LIVE_SUBSTREAM_BATCH;
// Number of items the source emits: twice `CAPACITY`.
const TOTAL: usize = CAPACITY * 2;
// Largest lead the producer may have over the consumer before the bound
// counts as violated.
const MAX_IN_FLIGHT: usize = CAPACITY + BATCH;
// `produced` is bumped by the source iterator, `consumed` by the foreach sink.
let produced = Arc::new(AtomicUsize::new(0));
let consumed = Arc::new(AtomicUsize::new(0));
let bound_violated = Arc::new(AtomicBool::new(false));
// Consumed items are forwarded to the test thread over this channel.
let (item_tx, item_rx) = mpsc::channel::<u64>();
// Separate handles for the closures that are moved onto the stream thread.
let prod_for_factory = Arc::clone(&produced);
let prod_for_fold = Arc::clone(&produced);
let cons_for_fold = Arc::clone(&consumed);
let bv_for_fold = Arc::clone(&bound_violated);
// Run the stream on its own thread so this thread can drain `item_rx` with a
// timeout.
let join = std::thread::spawn(move || {
with_substream_mode(SubstreamExecutorMode::SplitSinkOnly, || {
crate::Source::from_factory(move || {
let prod = Arc::clone(&prod_for_factory);
let mut i = 0u64;
// Yields `Ok(0)`, `Ok(1)`, ... up to `TOTAL` items, incrementing `produced`
// once per item handed out.
Box::new(std::iter::from_fn(move || {
if i as usize >= TOTAL {
return None;
}
prod.fetch_add(1, Ordering::SeqCst);
let val = i;
i += 1;
Some(Ok(val))
}))
})
// The predicate is always false; the test asserts below that this results in
// exactly one segment.
.split_when(|_| false)
.run_with(crate::Sink::fold(
0usize,
move |count, seg: crate::Source<u64>| {
let cons = Arc::clone(&cons_for_fold);
let bv = Arc::clone(&bv_for_fold);
let prod = Arc::clone(&prod_for_fold);
let itx = item_tx.clone();
seg.run_with(crate::Sink::foreach(move |x: u64| {
// Count this item as consumed, then compare against how many items the
// source has handed out so far.
let c = cons.fetch_add(1, Ordering::SeqCst) + 1;
let p = prod.load(Ordering::SeqCst);
if p > c + MAX_IN_FLIGHT {
bv.store(true, Ordering::SeqCst);
}
// Forward the item to the test thread; a send error is explicitly discarded.
let _ = itx.send(x);
}))
.unwrap()
.wait()
.unwrap();
// One more segment has been drained.
count + 1
},
))
.unwrap()
.wait()
.unwrap()
})
});
let mut received = Vec::with_capacity(TOTAL);
let timeout = Duration::from_secs(60);
// A single deadline shared by all receives, not a per-item timeout.
let deadline = Instant::now() + timeout;
for i in 0..TOTAL {
// Time left until the deadline; zero once the deadline has passed.
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining == Duration::ZERO {
panic!(
"deadlock: received {} of {TOTAL} items within {timeout:?}",
received.len()
);
}
match item_rx.recv_timeout(remaining) {
Ok(item) => received.push(item),
Err(mpsc::RecvTimeoutError::Timeout) => {
panic!(
"deadlock: no item {i} before {timeout:?} rendezvous deadline; received {} of {TOTAL}",
received.len()
)
}
// All senders were dropped before `TOTAL` items arrived.
Err(mpsc::RecvTimeoutError::Disconnected) => {
panic!("stream ended early at item {i}")
}
}
}
// The thread returns whatever `with_substream_mode` returns; the assertion
// below treats that value as the number of segments counted by the fold.
let seg_count = join.join().expect("stream thread panicked");
assert!(
!bound_violated.load(Ordering::SeqCst),
"bound violated: producer ran >MAX_IN_FLIGHT={MAX_IN_FLIGHT} ahead of consumer"
);
assert_eq!(seg_count, 1, "expected exactly 1 segment");
assert_eq!(received.len(), TOTAL, "not all items received");
let expected: Vec<u64> = (0..TOTAL as u64).collect();
assert_eq!(received, expected, "items not in correct order");
}
}