use super::flow::{self, FlowTransform};
use super::*;
use crate::Attributes;
use crate::context::SourceWithContext;
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub struct NotUsed;
type CombinedSourceFactory<Out> =
dyn Fn(&Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync;
pub struct Keep;
impl Keep {
pub fn left<Left, Right>(left: Left, _right: Right) -> Left {
left
}
pub fn right<Left, Right>(_left: Left, right: Right) -> Right {
right
}
pub fn both<Left, Right>(left: Left, right: Right) -> (Left, Right) {
(left, right)
}
pub fn none<Left, Right>(_left: Left, _right: Right) -> NotUsed {
NotUsed
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SourceCombineStrategy {
Merge {
eager_complete: bool,
},
Concat,
Prioritized {
priorities: Vec<usize>,
eager_complete: bool,
},
}
#[derive(Clone)]
pub struct MaybeHandle<T> {
value: Arc<Mutex<Option<StreamResult<T>>>>,
}
impl<T> MaybeHandle<T> {
#[must_use]
pub fn is_completed(&self) -> bool {
self.value.lock().expect("maybe handle poisoned").is_some()
}
pub fn complete(&self, item: T) -> StreamResult<()> {
self.settle(Ok(item))
}
pub fn fail(&self, error: StreamError) -> StreamResult<()> {
self.settle(Err(error))
}
fn settle(&self, result: StreamResult<T>) -> StreamResult<()> {
let mut value = self.value.lock().expect("maybe handle poisoned");
if value.is_some() {
return Err(StreamError::Failed("maybe source already completed".into()));
}
*value = Some(result);
Ok(())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Demand(u64);
impl Demand {
pub const ZERO: Self = Self(0);
pub const ONE: Self = Self(1);
pub const MAX: Self = Self(u64::MAX);
#[must_use]
pub const fn new(available: u64) -> Self {
Self(available)
}
#[must_use]
pub const fn available(self) -> u64 {
self.0
}
#[must_use]
pub const fn is_unbounded(self) -> bool {
self.0 == u64::MAX
}
#[must_use]
pub const fn is_empty(self) -> bool {
self.0 == 0
}
#[must_use]
pub const fn saturating_add(self, rhs: Self) -> Self {
Self(self.0.saturating_add(rhs.0))
}
pub fn consume_one(&mut self) -> bool {
match self.0 {
0 => false,
u64::MAX => true,
_ => {
self.0 -= 1;
true
}
}
}
}
pub trait PushOutlet<T>: Send {
fn push(&mut self, item: T) -> StreamResult<Demand>;
fn complete(&mut self) -> StreamResult<()> {
Ok(())
}
fn fail(&mut self, cause: StreamError) -> StreamResult<()> {
Err(cause)
}
}
#[derive(Clone)]
pub struct Source<Out, Mat = NotUsed> {
pub(crate) factory: Arc<dyn SourceFactory<Out, Mat>>,
pub(super) hints: SourceHints,
pub(super) attributes: Attributes,
pub(crate) split_hook: Option<Arc<dyn SplitSegmentHookDyn>>,
}
impl<Out: Send + 'static> Source<Out, NotUsed> {
pub(super) fn from_factory<F>(factory: F) -> Self
where
F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
{
Self::from_factory_with_hints(factory, SourceHints::default())
}
fn from_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
where
F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
{
Self::from_materialized_factory_with_hints(
move |_materializer| Ok((factory(), NotUsed)),
hints,
)
}
#[must_use]
pub fn empty() -> Self {
Self::from_factory_with_hints(
|| Box::new(std::iter::empty()),
SourceHints::with_inline_micro(0),
)
}
#[must_use]
pub fn never() -> Self {
Self::from_materialized_factory_with_hints(
move |materializer| {
let state = Arc::clone(&materializer.inner.state);
Ok((
Box::new(std::iter::from_fn(move || {
loop {
if state.shutdown.load(Ordering::SeqCst) {
return Some(Err(StreamError::AbruptTermination));
}
if super::runtime::current_stream_cancelled()
.as_ref()
.is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
{
return Some(Err(StreamError::Cancelled));
}
std::thread::park_timeout(Duration::from_millis(1));
}
})),
NotUsed,
))
},
SourceHints::default(),
)
}
#[must_use]
pub fn failed(error: StreamError) -> Self {
Self::from_factory_with_hints(
move || Box::new(std::iter::once(Err(error.clone()))),
SourceHints::with_inline_micro(0),
)
}
#[must_use]
pub fn future<F, Fut>(future: F) -> Self
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = StreamResult<Out>> + Send + 'static,
{
let future = Arc::new(future);
Self::from_factory(move || {
let future = Arc::clone(&future);
let mut emitted = false;
Box::new(std::iter::from_fn(move || {
if emitted {
return None;
}
emitted = true;
Some(
catch_unwind_failed("source future factory", || future())
.and_then(flow::run_future_inline_or_spawn),
)
}))
})
}
#[must_use]
pub fn future_source<F, Fut>(future: F) -> Self
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = StreamResult<Source<Out>>> + Send + 'static,
{
let future = Arc::new(future);
Self::from_materialized_factory(move |materializer| {
let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
let future = Arc::clone(&future);
let mut current = None::<BoxStream<Out>>;
let mut initialized = false;
let mut terminated = false;
Ok((
Box::new(std::iter::from_fn(move || {
if terminated {
return None;
}
loop {
if let Some(stream) = current.as_mut() {
match stream.next() {
Some(item) => return Some(item),
None => {
terminated = true;
return None;
}
}
}
if initialized {
terminated = true;
return None;
}
initialized = true;
let source = match catch_unwind_failed("future_source factory", || future())
.and_then(flow::run_future_inline_or_spawn)
{
Ok(source) => source,
Err(error) => {
terminated = true;
return Some(Err(error));
}
};
current = Some(match Arc::clone(&source.factory).create(&materializer) {
Ok((stream, _)) => stream,
Err(error) => {
terminated = true;
return Some(Err(error));
}
});
}
})) as BoxStream<Out>,
NotUsed,
))
})
}
#[must_use]
pub fn cycle<F, I>(factory: F) -> Self
where
F: Fn() -> I + Send + Sync + 'static,
I: IntoIterator<Item = Out>,
I::IntoIter: Send + 'static,
{
let factory = Arc::new(factory);
Self::from_factory(move || {
let factory = Arc::clone(&factory);
let mut current = None::<I::IntoIter>;
let mut terminated = false;
Box::new(std::iter::from_fn(move || {
if terminated {
return None;
}
if let Some(iter) = current.as_mut()
&& let Some(item) = iter.next()
{
return Some(Ok(item));
}
let mut next = match catch_unwind_failed("cycle factory", || factory()) {
Ok(iterable) => iterable.into_iter(),
Err(error) => {
terminated = true;
return Some(Err(error));
}
};
match next.next() {
Some(item) => {
current = Some(next);
Some(Ok(item))
}
None => {
terminated = true;
Some(Err(StreamError::Failed("empty iterator".into())))
}
}
}))
})
}
#[must_use]
pub fn unfold<State, F>(initial: State, f: F) -> Self
where
State: Clone + Send + Sync + 'static,
F: Fn(State) -> Option<(State, Out)> + Send + Sync + 'static,
{
let f = Arc::new(f);
Self::from_factory(move || {
let f = Arc::clone(&f);
let mut state = Some(initial.clone());
let mut terminated = false;
Box::new(std::iter::from_fn(move || {
if terminated {
return None;
}
let current = state.take().expect("unfold state present");
match catch_unwind_failed("unfold function", || f(current)) {
Ok(Some((next_state, item))) => {
state = Some(next_state);
Some(Ok(item))
}
Ok(None) => {
terminated = true;
None
}
Err(error) => {
terminated = true;
Some(Err(error))
}
}
}))
})
}
#[must_use]
pub fn unfold_async<State, F, Fut>(initial: State, f: F) -> Self
where
State: Clone + Send + Sync + 'static,
F: Fn(State) -> Fut + Send + Sync + 'static,
Fut: Future<Output = StreamResult<Option<(State, Out)>>> + Send + 'static,
{
let f = Arc::new(f);
Self::from_factory(move || {
let f = Arc::clone(&f);
let mut state = Some(initial.clone());
let mut terminated = false;
Box::new(std::iter::from_fn(move || {
if terminated {
return None;
}
let current = state.take().expect("unfold_async state present");
match catch_unwind_failed("unfold_async factory", || f(current))
.and_then(flow::run_future_inline_or_spawn)
{
Ok(Some((next_state, item))) => {
state = Some(next_state);
Some(Ok(item))
}
Ok(None) => {
terminated = true;
None
}
Err(error) => {
terminated = true;
Some(Err(error))
}
}
}))
})
}
#[must_use]
pub fn unfold_resource<Resource, Create, Read, Close>(
create: Create,
read: Read,
close: Close,
) -> Self
where
Resource: Send + 'static,
Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
Read: Fn(&mut Resource) -> StreamResult<Option<Out>> + Send + Sync + 'static,
Close: Fn(Resource) -> StreamResult<()> + Send + Sync + 'static,
{
let create = Arc::new(create);
let read = Arc::new(read);
let close = Arc::new(close);
Self::from_factory(move || {
Box::new(UnfoldResourceStream {
create: Arc::clone(&create),
read: Arc::clone(&read),
close: Arc::clone(&close),
resource: None,
created: false,
terminated: false,
_marker: PhantomData,
})
})
}
#[must_use]
pub fn unfold_resource_async<Resource, Create, CreateFut, Read, ReadFut, Close, CloseFut>(
create: Create,
read: Read,
close: Close,
) -> Self
where
Resource: Send + 'static,
Create: Fn() -> CreateFut + Send + Sync + 'static,
CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
Read: Fn(&mut Resource) -> ReadFut + Send + Sync + 'static,
ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
Close: Fn(Resource) -> CloseFut + Send + Sync + 'static,
CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
{
let create = Arc::new(create);
let read = Arc::new(read);
let close = Arc::new(close);
Self::from_factory(move || {
Box::new(UnfoldResourceAsyncStream {
create: Arc::clone(&create),
read: Arc::clone(&read),
close: Arc::clone(&close),
resource: None,
created: false,
terminated: false,
_marker: PhantomData,
})
})
}
#[must_use]
pub fn lazy_single<F>(create: F) -> Self
where
F: Fn() -> Out + Send + Sync + 'static,
{
let create = Arc::new(create);
Self::from_factory(move || {
let create = Arc::clone(&create);
let mut emitted = false;
Box::new(std::iter::from_fn(move || {
if emitted {
return None;
}
emitted = true;
Some(catch_unwind_failed("lazy_single factory", || create()))
}))
})
}
#[must_use]
pub fn lazy_future<F, Fut>(create: F) -> Self
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = StreamResult<Out>> + Send + 'static,
{
let create = Arc::new(create);
Self::from_factory(move || {
let create = Arc::clone(&create);
let mut emitted = false;
Box::new(std::iter::from_fn(move || {
if emitted {
return None;
}
emitted = true;
Some(
catch_unwind_failed("lazy_future factory", || create())
.and_then(flow::run_future_inline_or_spawn),
)
}))
})
}
#[must_use]
pub fn lazy_source<InnerMat, F>(create: F) -> Source<Out, StreamCompletion<InnerMat>>
where
InnerMat: Send + 'static,
F: Fn() -> Source<Out, InnerMat> + Send + Sync + 'static,
{
let create = Arc::new(create);
Source::from_materialized_factory(move |materializer| {
let (sender, receiver) = oneshot::channel();
Ok((
Box::new(LazySourceStream {
create: Arc::clone(&create),
materializer: materializer
.with_name_prefix(materializer.name_prefix().to_owned()),
current: None,
mat_sender: Some(sender),
initialized: false,
terminated: false,
}) as BoxStream<Out>,
StreamCompletion::from_receiver(receiver, None),
))
})
}
#[must_use]
pub fn lazy_future_source<InnerMat, F, Fut>(
create: F,
) -> Source<Out, StreamCompletion<InnerMat>>
where
InnerMat: Send + 'static,
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
{
let create = Arc::new(create);
Source::from_materialized_factory(move |materializer| {
let (sender, receiver) = oneshot::channel();
Ok((
Box::new(LazyFutureSourceStream {
create: Arc::clone(&create),
materializer: materializer
.with_name_prefix(materializer.name_prefix().to_owned()),
current: None,
mat_sender: Some(sender),
initialized: false,
terminated: false,
_marker: PhantomData,
}) as BoxStream<Out>,
StreamCompletion::from_receiver(receiver, None),
))
})
}
#[must_use]
pub fn from_fn_iter<F, I>(factory: F) -> Self
where
F: Fn() -> I + Send + Sync + 'static,
I: IntoIterator<Item = Out>,
I::IntoIter: Send + 'static,
{
Self::from_factory(move || Box::new(factory().into_iter().map(Ok)))
}
}
impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
fn from_materialized_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
where
F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
{
Self {
factory: Arc::new(FnSourceFactory(factory)),
hints,
attributes: Attributes::default(),
split_hook: None,
}
}
pub(crate) fn from_materialized_factory<F>(factory: F) -> Self
where
F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
{
Self::from_materialized_factory_with_hints(factory, SourceHints::default())
}
pub(crate) fn from_terminal_batch_materialized_factory<F>(factory: F) -> Self
where
F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
{
Self::from_materialized_factory_with_hints(
factory,
SourceHints::with_terminal_consumer_batch(),
)
}
#[must_use]
pub fn as_source_with_context<Ctx, F>(
self,
extract_context: F,
) -> SourceWithContext<Out, Ctx, Mat>
where
Ctx: Send + 'static,
F: Fn(&Out) -> Ctx + Send + Sync + 'static,
{
SourceWithContext::from_source(self.map(move |item| {
let context = extract_context(&item);
(item, context)
}))
}
#[must_use]
pub fn via<Next, FlowMat>(self, flow: Flow<Out, Next, FlowMat>) -> Source<Next, Mat>
where
Next: Send + 'static,
FlowMat: Send + 'static,
{
self.via_mat(flow, Keep::left)
}
#[must_use]
pub fn via_mat<Next, FlowMat, Combined, F>(
self,
flow: Flow<Out, Next, FlowMat>,
combine: F,
) -> Source<Next, Combined>
where
Next: Send + 'static,
FlowMat: Send + 'static,
Combined: Send + 'static,
F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
{
let source = self.factory;
let transform = flow.transform;
let materialize_flow = flow.materialize;
let hints = self.hints.after_flow(flow.hints);
let combine = Arc::new(combine);
Source::from_materialized_factory_with_hints(
move |materializer| {
let (stream, source_mat) = Arc::clone(&source).create(materializer)?;
let flow_mat = materialize_flow()?;
let stream = match &transform {
FlowTransform::Pure(transform) => transform(stream),
FlowTransform::Runtime(transform) => transform(stream, materializer)?,
};
Ok((stream, combine(source_mat, flow_mat)))
},
hints,
)
}
#[must_use]
pub fn via_mat_with<Next, FlowMat, Combined, F>(
self,
flow: Flow<Out, Next, FlowMat>,
combine: F,
) -> Source<Next, Combined>
where
Next: Send + 'static,
FlowMat: Send + 'static,
Combined: Send + 'static,
F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
{
self.via_mat(flow, combine)
}
#[must_use]
pub fn map<Next, F>(self, f: F) -> Source<Next, Mat>
where
Next: Send + 'static,
F: Fn(Out) -> Next + Send + Sync + 'static,
{
let hints = self.hints.without_inline_micro();
Source {
factory: Arc::new(MapSourceFactory {
source: self.factory,
stage: f,
_marker: PhantomData,
}),
hints,
attributes: self.attributes,
split_hook: None,
}
}
#[must_use]
pub fn attributes(&self) -> &Attributes {
&self.attributes
}
#[must_use]
pub fn with_attributes(mut self, attributes: Attributes) -> Self {
self.attributes = attributes;
self
}
#[must_use]
pub fn add_attributes(mut self, attributes: Attributes) -> Self {
self.attributes = self.attributes.and(attributes);
self
}
#[must_use]
pub fn named(self, name: impl Into<String>) -> Self {
self.add_attributes(Attributes::named(name))
}
#[must_use]
pub fn map_result<Next, F>(self, f: F) -> Source<Next, Mat>
where
Next: Send + 'static,
F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
{
self.via(Flow::identity().map_result(f))
}
#[must_use]
pub fn map_result_with_supervision<Next, F>(
self,
f: F,
decider: SupervisionDecider,
) -> Source<Next, Mat>
where
Next: Send + 'static,
F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
{
self.via(Flow::identity().map_result_with_supervision(f, decider))
}
#[must_use]
pub fn filter<F>(self, predicate: F) -> Source<Out, Mat>
where
F: Fn(&Out) -> bool + Send + Sync + 'static,
{
self.via(Flow::identity().filter(predicate))
}
#[must_use]
pub fn filter_result<F>(self, predicate: F) -> Source<Out, Mat>
where
F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
{
self.via(Flow::identity().filter_result(predicate))
}
#[must_use]
pub fn filter_result_with_supervision<F>(
self,
predicate: F,
decider: SupervisionDecider,
) -> Source<Out, Mat>
where
F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
{
self.via(Flow::identity().filter_result_with_supervision(predicate, decider))
}
#[must_use]
pub fn filter_not<F>(self, predicate: F) -> Source<Out, Mat>
where
F: Fn(&Out) -> bool + Send + Sync + 'static,
{
self.via(Flow::identity().filter_not(predicate))
}
#[must_use]
pub fn filter_map<Next, F>(self, f: F) -> Source<Next, Mat>
where
Next: Send + 'static,
F: Fn(Out) -> Option<Next> + Send + Sync + 'static,
{
self.via(Flow::identity().filter_map(f))
}
#[must_use]
pub fn filter_map_result<Next, F>(self, f: F) -> Source<Next, Mat>
where
Next: Send + 'static,
F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
{
self.via(Flow::identity().filter_map_result(f))
}
#[must_use]
pub fn filter_map_result_with_supervision<Next, F>(
self,
f: F,
decider: SupervisionDecider,
) -> Source<Next, Mat>
where
Next: Send + 'static,
F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
{
self.via(Flow::identity().filter_map_result_with_supervision(f, decider))
}
#[must_use]
pub fn map_concat<Next, F, I>(self, f: F) -> Source<Next, Mat>
where
Next: Send + 'static,
F: Fn(Out) -> I + Send + Sync + 'static,
I: IntoIterator<Item = Next>,
I::IntoIter: Send + 'static,
{
self.via(Flow::identity().map_concat(f))
}
#[must_use]
pub fn map_concat_result<Next, F, I>(self, f: F) -> Source<Next, Mat>
where
Next: Send + 'static,
F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
I: IntoIterator<Item = Next>,
I::IntoIter: Send + 'static,
{
self.via(Flow::identity().map_concat_result(f))
}
#[must_use]
pub fn map_concat_result_with_supervision<Next, F, I>(
self,
f: F,
decider: SupervisionDecider,
) -> Source<Next, Mat>
where
Next: Send + 'static,
F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
I: IntoIterator<Item = Next>,
I::IntoIter: Send + 'static,
{
self.via(Flow::identity().map_concat_result_with_supervision(f, decider))
}
#[must_use]
pub fn stateful_map<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
where
State: Clone + Send + Sync + 'static,
Next: Send + 'static,
F: Fn(&mut State, Out) -> Next + Send + Sync + 'static,
{
self.via(Flow::identity().stateful_map(seed, f))
}
#[must_use]
pub fn stateful_map_result<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
where
State: Clone + Send + Sync + 'static,
Next: Send + 'static,
F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
{
self.via(Flow::identity().stateful_map_result(seed, f))
}
#[must_use]
pub fn stateful_map_result_with_supervision<State, Next, F>(
self,
seed: State,
f: F,
decider: SupervisionDecider,
) -> Source<Next, Mat>
where
State: Clone + Send + Sync + 'static,
Next: Send + 'static,
F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
{
self.via(Flow::identity().stateful_map_result_with_supervision(seed, f, decider))
}
#[must_use]
pub fn stateful_map_concat<State, Next, F, I>(self, seed: State, f: F) -> Source<Next, Mat>
where
State: Clone + Send + Sync + 'static,
Next: Send + 'static,
F: Fn(&mut State, Out) -> I + Send + Sync + 'static,
I: IntoIterator<Item = Next>,
I::IntoIter: Send + 'static,
{
self.via(Flow::identity().stateful_map_concat(seed, f))
}
#[must_use]
pub fn stateful_map_concat_result<State, Next, F, I>(
self,
seed: State,
f: F,
) -> Source<Next, Mat>
where
State: Clone + Send + Sync + 'static,
Next: Send + 'static,
F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
I: IntoIterator<Item = Next>,
I::IntoIter: Send + 'static,
{
self.via(Flow::identity().stateful_map_concat_result(seed, f))
}
#[must_use]
pub fn stateful_map_concat_result_with_supervision<State, Next, F, I>(
self,
seed: State,
f: F,
decider: SupervisionDecider,
) -> Source<Next, Mat>
where
State: Clone + Send + Sync + 'static,
Next: Send + 'static,
F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
I: IntoIterator<Item = Next>,
I::IntoIter: Send + 'static,
{
self.via(Flow::identity().stateful_map_concat_result_with_supervision(seed, f, decider))
}
#[must_use]
pub fn map_async<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
where
Next: Send + 'static,
F: Fn(Out) -> Fut + Send + Sync + 'static,
Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
self.via(Flow::identity().map_async(parallelism, f))
}
#[must_use]
pub fn map_async_with_supervision<Next, F, Fut>(
self,
parallelism: usize,
f: F,
decider: SupervisionDecider,
) -> Source<Next, Mat>
where
Next: Send + 'static,
F: Fn(Out) -> Fut + Send + Sync + 'static,
Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
self.via(Flow::identity().map_async_with_supervision(parallelism, f, decider))
}
#[must_use]
pub fn map_async_unordered<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
where
Next: Send + 'static,
F: Fn(Out) -> Fut + Send + Sync + 'static,
Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
self.via(Flow::identity().map_async_unordered(parallelism, f))
}
#[must_use]
pub fn map_async_unordered_with_supervision<Next, F, Fut>(
self,
parallelism: usize,
f: F,
decider: SupervisionDecider,
) -> Source<Next, Mat>
where
Next: Send + 'static,
F: Fn(Out) -> Fut + Send + Sync + 'static,
Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
self.via(Flow::identity().map_async_unordered_with_supervision(parallelism, f, decider))
}
#[must_use]
pub fn map_async_partitioned<Key, Next, Partition, F, Fut>(
self,
parallelism: usize,
per_partition: usize,
partition: Partition,
f: F,
) -> Source<Next, Mat>
where
Key: Clone + Eq + Hash + Send + 'static,
Next: Send + 'static,
Partition: Fn(&Out) -> Key + Send + Sync + 'static,
F: Fn(Out) -> Fut + Send + Sync + 'static,
Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
self.via(Flow::identity().map_async_partitioned(parallelism, per_partition, partition, f))
}
#[must_use]
pub fn prefix_and_tail(self, n: usize) -> Source<(Vec<Out>, Source<Out>), Mat> {
self.via(Flow::identity().prefix_and_tail(n))
}
#[must_use]
pub fn flat_map_prefix<Next, FlowMat, F>(self, n: usize, f: F) -> Source<Next, Mat>
where
Next: Send + 'static,
FlowMat: Send + 'static,
F: Fn(Vec<Out>) -> Flow<Out, Next, FlowMat> + Send + Sync + 'static,
Out: Clone,
{
self.via(Flow::identity().flat_map_prefix(n, f))
}
#[must_use]
pub fn group_by<Key, F>(
self,
max_substreams: usize,
f: F,
allow_closed_substream_recreation: bool,
) -> Source<Source<Out>, Mat>
where
Key: Clone + Eq + Hash + Send + 'static,
F: Fn(&Out) -> Key + Send + Sync + 'static,
Out: Clone,
{
let batch_mode = if self.hints.inline_micro.is_some() && !allow_closed_substream_recreation
{
flow::GroupByBatchMode::FiniteEagerNoRecreate
} else {
flow::GroupByBatchMode::Immediate
};
self.via(Flow::identity().group_by_with_batching(
max_substreams,
f,
allow_closed_substream_recreation,
batch_mode,
))
}
#[must_use]
pub fn split_when<F>(self, predicate: F) -> Source<Source<Out>, Mat>
where
F: Fn(&Out) -> bool + Send + Sync + 'static,
Out: Clone,
{
self.via(Flow::identity().split_when(predicate))
}
#[must_use]
pub fn split_after<F>(self, predicate: F) -> Source<Source<Out>, Mat>
where
F: Fn(&Out) -> bool + Send + Sync + 'static,
Out: Clone,
{
self.via(Flow::identity().split_after(predicate))
}
#[must_use]
pub fn flat_map_concat<Next, NextMat, F>(self, f: F) -> Source<Next, Mat>
where
Next: Send + 'static,
NextMat: Send + 'static,
F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
{
self.via(Flow::identity().flat_map_concat(f))
}
#[must_use]
pub fn flat_map_merge<Next, NextMat, F>(self, breadth: usize, f: F) -> Source<Next, Mat>
where
Next: Send + 'static,
NextMat: Send + 'static,
F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
{
self.via(Flow::identity().flat_map_merge(breadth, f))
}
#[must_use]
pub fn take(self, n: usize) -> Source<Out, Mat> {
self.via(Flow::identity().take(n))
}
#[must_use]
pub fn drop(self, n: usize) -> Source<Out, Mat> {
self.via(Flow::identity().drop(n))
}
#[must_use]
pub fn take_while<F>(self, predicate: F) -> Source<Out, Mat>
where
F: Fn(&Out) -> bool + Send + Sync + 'static,
{
self.via(Flow::identity().take_while(predicate))
}
#[must_use]
pub fn drop_while<F>(self, predicate: F) -> Source<Out, Mat>
where
F: Fn(&Out) -> bool + Send + Sync + 'static,
{
self.via(Flow::identity().drop_while(predicate))
}
#[must_use]
pub fn limit(self, max: u64) -> Source<Out, Mat> {
self.via(Flow::identity().limit(max))
}
#[must_use]
pub fn grouped(self, size: usize) -> Source<Vec<Out>, Mat> {
self.via(Flow::identity().grouped(size))
}
#[must_use]
pub fn scan<State, F>(self, seed: State, f: F) -> Source<State, Mat>
where
State: Clone + Send + Sync + 'static,
F: Fn(State, Out) -> State + Send + Sync + 'static,
{
self.via(Flow::identity().scan(seed, f))
}
#[must_use]
pub fn scan_async<State, F, Fut>(self, seed: State, f: F) -> Source<State, Mat>
where
State: Clone + Send + Sync + 'static,
F: Fn(State, Out) -> Fut + Send + Sync + 'static,
Fut: Future<Output = StreamResult<State>> + Send + 'static,
{
self.via(Flow::identity().scan_async(seed, f))
}
#[must_use]
pub fn scan_result<State, F>(self, seed: State, f: F) -> Source<State, Mat>
where
State: Clone + Send + Sync + 'static,
F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
{
self.via(Flow::identity().scan_result(seed, f))
}
#[must_use]
pub fn scan_result_with_supervision<State, F>(
self,
seed: State,
f: F,
decider: SupervisionDecider,
) -> Source<State, Mat>
where
State: Clone + Send + Sync + 'static,
F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
{
self.via(Flow::identity().scan_result_with_supervision(seed, f, decider))
}
#[must_use]
pub fn sliding(self, size: usize, step: usize) -> Source<Vec<Out>, Mat>
where
Out: Clone,
{
self.via(Flow::identity().sliding(size, step))
}
#[must_use]
pub fn fold<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
where
Acc: Clone + Send + Sync + 'static,
F: Fn(Acc, Out) -> Acc + Send + Sync + 'static,
{
self.via(Flow::identity().fold(zero, f))
}
#[must_use]
pub fn fold_async<Acc, F, Fut>(self, zero: Acc, f: F) -> Source<Acc, Mat>
where
Acc: Clone + Send + Sync + 'static,
F: Fn(Acc, Out) -> Fut + Send + Sync + 'static,
Fut: Future<Output = StreamResult<Acc>> + Send + 'static,
{
self.via(Flow::identity().fold_async(zero, f))
}
#[must_use]
pub fn map_with_resource<Resource, Next, Create, F, Close>(
self,
create: Create,
f: F,
close: Close,
) -> Source<Next, Mat>
where
Resource: Send + 'static,
Next: Send + 'static,
Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
F: Fn(&mut Resource, Out) -> StreamResult<Next> + Send + Sync + 'static,
Close: Fn(Resource) -> StreamResult<Option<Next>> + Send + Sync + 'static,
{
self.via(Flow::identity().map_with_resource(create, f, close))
}
#[must_use]
pub fn fold_result<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
where
Acc: Clone + Send + Sync + 'static,
F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
{
self.via(Flow::identity().fold_result(zero, f))
}
#[must_use]
pub fn fold_result_with_supervision<Acc, F>(
self,
zero: Acc,
f: F,
decider: SupervisionDecider,
) -> Source<Acc, Mat>
where
Acc: Clone + Send + Sync + 'static,
F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
{
self.via(Flow::identity().fold_result_with_supervision(zero, f, decider))
}
#[must_use]
pub fn reduce<F>(self, f: F) -> Source<Out, Mat>
where
F: Fn(Out, Out) -> Out + Send + Sync + 'static,
{
self.via(Flow::identity().reduce(f))
}
#[must_use]
pub fn reduce_result<F>(self, f: F) -> Source<Out, Mat>
where
Out: Clone,
F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
{
self.via(Flow::identity().reduce_result(f))
}
#[must_use]
pub fn reduce_result_with_supervision<F>(
self,
f: F,
decider: SupervisionDecider,
) -> Source<Out, Mat>
where
Out: Clone,
F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
{
self.via(Flow::identity().reduce_result_with_supervision(f, decider))
}
#[must_use]
pub fn map_error<F>(self, f: F) -> Source<Out, Mat>
where
F: Fn(StreamError) -> StreamError + Send + Sync + 'static,
{
self.via(Flow::identity().map_error(f))
}
#[must_use]
pub fn recover<F>(self, f: F) -> Source<Out, Mat>
where
F: Fn(StreamError) -> Option<Out> + Send + Sync + 'static,
{
self.via(Flow::identity().recover(f))
}
#[must_use]
pub fn recover_with<F>(self, f: F) -> Source<Out, Mat>
where
F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
{
self.via(Flow::identity().recover_with(f))
}
#[must_use]
pub fn recover_with_retries<F>(self, retries: usize, f: F) -> Source<Out, Mat>
where
F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
{
self.via(Flow::identity().recover_with_retries(retries, f))
}
#[must_use]
pub fn on_error_complete(self) -> Source<Out, Mat> {
self.via(Flow::identity().on_error_complete())
}
#[must_use]
pub fn concat<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
where
Mat2: Send + 'static,
{
let factory = self.factory;
let hints = self.hints;
let that_factory = that.factory;
Source::from_materialized_factory_with_hints(
move |materializer| {
let (primary, mat) = Arc::clone(&factory).create(materializer)?;
let secondary = match Arc::clone(&that_factory).create(materializer) {
Ok((stream, _)) => stream,
Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
};
Ok((concat_source_streams(vec![primary, secondary]), mat))
},
hints,
)
}
#[must_use]
pub fn concat_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
where
Mat2: Send + 'static,
{
let factory = self.factory;
let hints = self.hints;
let that_factory = that.factory;
Source::from_materialized_factory_with_hints(
move |materializer| {
let (primary, mat) = Arc::clone(&factory).create(materializer)?;
Ok((
concat_source_streams_lazy(
primary,
vec![Arc::clone(&that_factory)],
materializer,
),
mat,
))
},
hints,
)
}
#[must_use]
pub fn concat_all_lazy<Mat2, I>(self, those: I) -> Source<Out, Mat>
where
Mat2: Send + 'static,
I: IntoIterator<Item = Source<Out, Mat2>>,
{
let factory = self.factory;
let hints = self.hints;
let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
Source::from_materialized_factory_with_hints(
move |materializer| {
let (primary, mat) = Arc::clone(&factory).create(materializer)?;
Ok((
concat_source_streams_lazy(primary, other_factories.clone(), materializer),
mat,
))
},
hints,
)
}
#[must_use]
pub fn prepend<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
where
Mat2: Send + 'static,
{
let factory = self.factory;
let hints = self.hints;
let that_factory = that.factory;
Source::from_materialized_factory_with_hints(
move |materializer| {
let (primary, mat) = Arc::clone(&factory).create(materializer)?;
let secondary = match Arc::clone(&that_factory).create(materializer) {
Ok((stream, _)) => stream,
Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
};
Ok((concat_source_streams(vec![secondary, primary]), mat))
},
hints,
)
}
#[must_use]
pub fn prepend_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
where
Mat2: Send + 'static,
{
self.prepend(that)
}
#[must_use]
pub fn or_else<Mat2>(self, secondary: Source<Out, Mat2>) -> Source<Out, Mat>
where
Mat2: Send + 'static,
{
let factory = self.factory;
let hints = self.hints;
let secondary_factory = secondary.factory;
Source::from_materialized_factory_with_hints(
move |materializer| {
let (primary, mat) = Arc::clone(&factory).create(materializer)?;
let secondary = match Arc::clone(&secondary_factory).create(materializer) {
Ok((stream, _)) => stream,
Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
};
Ok((or_else_source_stream(primary, secondary), mat))
},
hints,
)
}
#[must_use]
pub fn interleave<Mat2>(self, that: Source<Out, Mat2>, segment_size: usize) -> Source<Out, Mat>
where
Mat2: Send + 'static,
{
self.interleave_all([that], segment_size, false)
}
#[must_use]
pub fn interleave_all<Mat2, I>(
self,
those: I,
segment_size: usize,
eager_close: bool,
) -> Source<Out, Mat>
where
Mat2: Send + 'static,
I: IntoIterator<Item = Source<Out, Mat2>>,
{
let factory = self.factory;
let hints = self.hints;
let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
Source::from_materialized_factory_with_hints(
move |materializer| {
let (primary, mat) = Arc::clone(&factory).create(materializer)?;
let mut streams = Vec::with_capacity(other_factories.len() + 1);
streams.push(primary);
for other in &other_factories {
let stream = match Arc::clone(other).create(materializer) {
Ok((stream, _)) => stream,
Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
};
streams.push(stream);
}
Ok((
interleave_source_streams(streams, segment_size, eager_close),
mat,
))
},
hints,
)
}
#[must_use]
pub fn merge_sorted<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
where
Out: Ord,
Mat2: Send + 'static,
{
self.via(Flow::identity().merge_sorted(that))
}
#[must_use]
pub fn merge_latest<Mat2>(
self,
that: Source<Out, Mat2>,
eager_complete: bool,
) -> Source<Vec<Out>, Mat>
where
Out: Clone,
Mat2: Send + 'static,
{
let factory = self.factory;
let hints = self.hints;
let that_factory = that.factory;
Source::from_materialized_factory_with_hints(
move |materializer| {
let (left, mat) = Arc::clone(&factory).create(materializer)?;
let right = match Arc::clone(&that_factory).create(materializer) {
Ok((stream, _)) => stream,
Err(error) => {
return Ok((
Box::new(std::iter::once(Err(error))) as BoxStream<Vec<Out>>,
mat,
));
}
};
Ok((merge_latest_streams(vec![left, right], eager_complete), mat))
},
hints,
)
}
#[must_use]
pub fn merge_all<Mat2, I>(self, those: I, eager_complete: bool) -> Source<Out, Mat>
where
Mat2: Send + 'static,
I: IntoIterator<Item = Source<Out, Mat2>>,
{
let factory = self.factory;
let hints = self.hints;
let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
Source::from_materialized_factory_with_hints(
move |materializer| {
let (primary, mat) = Arc::clone(&factory).create(materializer)?;
let mut streams = Vec::with_capacity(other_factories.len() + 1);
streams.push(primary);
for other in &other_factories {
let stream = match Arc::clone(other).create(materializer) {
Ok((stream, _)) => stream,
Err(error) => {
return Ok((
Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
mat,
));
}
};
streams.push(stream);
}
Ok((merge_streams(streams, eager_complete), mat))
},
hints,
)
}
#[must_use]
pub fn zip_with<Mat2, Out2, Next, F>(
self,
that: Source<Out2, Mat2>,
combine: F,
) -> Source<Next, Mat>
where
Out2: Send + 'static,
Next: Send + 'static,
Mat2: Send + 'static,
F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
{
self.via(Flow::identity().zip_with(that, combine))
}
#[must_use]
pub fn zip_latest<Mat2, Out2>(self, that: Source<Out2, Mat2>) -> Source<(Out, Out2), Mat>
where
Out: Clone,
Out2: Clone + Send + 'static,
Mat2: Send + 'static,
{
self.zip_latest_with(that, true, |left, right| (left, right))
}
#[must_use]
pub fn zip_latest_with<Mat2, Out2, Next, F>(
self,
that: Source<Out2, Mat2>,
eager_complete: bool,
combine: F,
) -> Source<Next, Mat>
where
Out: Clone,
Out2: Clone + Send + 'static,
Next: Send + 'static,
Mat2: Send + 'static,
F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
{
let factory = self.factory;
let hints = self.hints;
let that_factory = that.factory;
let combine = Arc::new(combine);
Source::from_materialized_factory_with_hints(
move |materializer| {
let (left, mat) = Arc::clone(&factory).create(materializer)?;
let right = match Arc::clone(&that_factory).create(materializer) {
Ok((stream, _)) => stream,
Err(error) => {
return Ok((
Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
mat,
));
}
};
Ok((
zip_latest_with_stream(left, right, eager_complete, Arc::clone(&combine)),
mat,
))
},
hints,
)
}
#[must_use]
pub fn zip_with_index(self) -> Source<(Out, u64), Mat> {
let factory = self.factory;
let hints = self.hints;
Source::from_materialized_factory_with_hints(
move |materializer| {
let (mut stream, mat) = Arc::clone(&factory).create(materializer)?;
let mut index = 0_u64;
Ok((
Box::new(std::iter::from_fn(move || {
stream.next().map(|item| {
item.map(|value| {
let pair = (value, index);
index = index.wrapping_add(1);
pair
})
})
})) as BoxStream<(Out, u64)>,
mat,
))
},
hints,
)
}
#[must_use]
pub fn zip_all<Mat2, Out2>(
self,
that: Source<Out2, Mat2>,
this_elem: Out,
that_elem: Out2,
) -> Source<(Out, Out2), Mat>
where
Out: Clone + Sync,
Out2: Clone + Send + Sync + 'static,
Mat2: Send + 'static,
{
let factory = self.factory;
let hints = self.hints;
let that_factory = that.factory;
Source::from_materialized_factory_with_hints(
move |materializer| {
let (left, mat) = Arc::clone(&factory).create(materializer)?;
let right = match Arc::clone(&that_factory).create(materializer) {
Ok((stream, _)) => stream,
Err(error) => {
return Ok((
Box::new(std::iter::once(Err(error))) as BoxStream<(Out, Out2)>,
mat,
));
}
};
Ok((
zip_all_stream(left, right, this_elem.clone(), that_elem.clone()),
mat,
))
},
hints,
)
}
#[must_use]
pub fn also_to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
where
Out: Clone,
SinkMat: Send + 'static,
{
self.via(Flow::identity().also_to(sink))
}
#[must_use]
pub fn also_to_all<SinkMat, I>(self, sinks: I) -> Source<Out, Mat>
where
Out: Clone,
SinkMat: Send + 'static,
I: IntoIterator<Item = Sink<Out, SinkMat>>,
{
self.via(Flow::identity().also_to_all(sinks))
}
#[must_use]
pub fn divert_to<SinkMat, F>(self, sink: Sink<Out, SinkMat>, predicate: F) -> Source<Out, Mat>
where
SinkMat: Send + 'static,
F: Fn(&Out) -> bool + Send + Sync + 'static,
{
self.via(Flow::identity().divert_to(sink, predicate))
}
#[must_use]
pub fn wire_tap<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
where
Out: Clone,
SinkMat: Send + 'static,
{
self.via(Flow::identity().wire_tap(sink))
}
pub fn run_with<SinkMat: Send + 'static>(
self,
sink: Sink<Out, SinkMat>,
) -> StreamResult<SinkMat> {
let fast_result = self
.split_hook
.as_ref()
.zip(sink.fold_fp.as_deref())
.and_then(|(hook, fp)| fp.try_register(Arc::clone(hook)));
if let Some(result) = fast_result {
return result?.downcast::<SinkMat>().map(|b| *b).map_err(|_| {
StreamError::Failed("split fast path: unexpected mat type (internal error)".into())
});
}
self.to_mat(sink, Keep::right).run()
}
pub fn run_with_materializer<SinkMat: Send + 'static>(
self,
sink: Sink<Out, SinkMat>,
materializer: &Materializer,
) -> StreamResult<SinkMat> {
self.to_mat(sink, Keep::right)
.run_with_materializer(materializer)
}
#[must_use]
pub fn to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> RunnableGraph<Mat>
where
SinkMat: Send + 'static,
{
self.to_mat(sink, Keep::left)
}
#[must_use]
pub fn to_mat<SinkMat, Combined, F>(
self,
sink: Sink<Out, SinkMat>,
combine: F,
) -> RunnableGraph<Combined>
where
SinkMat: Send + 'static,
Combined: Send + 'static,
F: Fn(Mat, SinkMat) -> Combined + Send + Sync + 'static,
{
let factory = self.factory;
let hints = self.hints;
let combine = Arc::new(combine);
RunnableGraph::from_runner(move |materializer| {
let (stream, source_mat) = Arc::clone(&factory).create(materializer)?;
let sink_mat = if hints.inline_head_terminal && sink.can_inline() {
let stream =
runtime_checked_stream(stream, Arc::clone(&materializer.inner.state), None);
sink.run_inline(stream, materializer)?
} else {
sink.run_from_source(stream, materializer, hints.runtime())?
};
Ok(combine(source_mat, sink_mat))
})
}
pub fn run_collect(self) -> StreamResult<Vec<Out>> {
self.run_with(Sink::collect())?.wait()
}
#[must_use]
pub fn map_materialized_value<NextMat, F>(self, f: F) -> Source<Out, NextMat>
where
NextMat: Send + 'static,
F: Fn(Mat) -> NextMat + Send + Sync + 'static,
{
let factory = self.factory;
let hints = self.hints;
let f = Arc::new(f);
Source::from_materialized_factory_with_hints(
move |materializer| {
let (stream, mat) = Arc::clone(&factory).create(materializer)?;
Ok((stream, f(mat)))
},
hints,
)
}
}
impl<Out: Clone + Send + Sync + 'static> Source<Out, NotUsed> {
#[must_use]
pub fn combine<Mat1, Mat2, MatRest, I>(
first: Source<Out, Mat1>,
second: Source<Out, Mat2>,
rest: I,
strategy: SourceCombineStrategy,
) -> Source<Out, NotUsed>
where
Mat1: Send + 'static,
Mat2: Send + 'static,
MatRest: Send + 'static,
I: IntoIterator<Item = Source<Out, MatRest>>,
{
let mut factories: Vec<Arc<CombinedSourceFactory<Out>>> = vec![
Arc::new(move |materializer| {
Arc::clone(&first.factory)
.create(materializer)
.map(|(stream, _)| stream)
}),
Arc::new(move |materializer| {
Arc::clone(&second.factory)
.create(materializer)
.map(|(stream, _)| stream)
}),
];
factories.extend(rest.into_iter().map(|source| {
Arc::new(move |materializer: &Materializer| {
Arc::clone(&source.factory)
.create(materializer)
.map(|(stream, _)| stream)
}) as Arc<CombinedSourceFactory<Out>>
}));
Source::from_materialized_factory(move |materializer| {
let mut streams = Vec::with_capacity(factories.len());
for factory in &factories {
let stream = match factory(materializer) {
Ok(stream) => stream,
Err(error) => {
return Ok((
Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
NotUsed,
));
}
};
streams.push(stream);
}
let stream = match &strategy {
SourceCombineStrategy::Merge { eager_complete } => {
merge_streams(streams, *eager_complete)
}
SourceCombineStrategy::Concat => concat_source_streams(streams),
SourceCombineStrategy::Prioritized {
priorities,
eager_complete,
} => {
if priorities.len() != streams.len() {
return Err(StreamError::GraphValidation(format!(
"combine priorities length {} did not match source count {}",
priorities.len(),
streams.len()
)));
}
merge_prioritized_streams(streams, priorities.clone(), *eager_complete)
}
};
Ok((stream, NotUsed))
})
}
#[must_use]
pub fn zip_n<Mat2, I>(sources: I) -> Source<Vec<Out>, NotUsed>
where
I: IntoIterator<Item = Source<Out, Mat2>>,
Mat2: Send + 'static,
Out: Clone,
{
Self::zip_with_n(sources, |values| values)
}
#[must_use]
pub fn zip_with_n<Mat2, I, Next, F>(sources: I, zipper: F) -> Source<Next, NotUsed>
where
I: IntoIterator<Item = Source<Out, Mat2>>,
Mat2: Send + 'static,
Next: Send + 'static,
F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
{
let factories: Vec<_> = sources.into_iter().map(|source| source.factory).collect();
let zipper = Arc::new(zipper);
Source::from_materialized_factory(move |materializer| {
let mut streams = Vec::with_capacity(factories.len());
for factory in &factories {
let stream = match Arc::clone(factory).create(materializer) {
Ok((stream, _)) => stream,
Err(error) => {
return Ok((
Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
NotUsed,
));
}
};
streams.push(stream);
}
Ok((zip_n_streams(streams, Arc::clone(&zipper)), NotUsed))
})
}
#[must_use]
pub fn merge_prioritized_n<Mat2, I>(
sources_and_priorities: I,
eager_complete: bool,
) -> Source<Out, NotUsed>
where
I: IntoIterator<Item = (Source<Out, Mat2>, usize)>,
Mat2: Send + 'static,
{
let sources_and_priorities: Vec<_> = sources_and_priorities.into_iter().collect();
if sources_and_priorities.is_empty() {
return Source::empty();
}
let (factories, priorities): (Vec<_>, Vec<_>) = sources_and_priorities
.into_iter()
.map(|(source, priority)| (source.factory, priority))
.unzip();
Source::from_materialized_factory(move |materializer| {
let mut streams = Vec::with_capacity(factories.len());
for factory in &factories {
let stream = match Arc::clone(factory).create(materializer) {
Ok((stream, _)) => stream,
Err(error) => {
return Ok((
Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
NotUsed,
));
}
};
streams.push(stream);
}
Ok((
merge_prioritized_streams(streams, priorities.clone(), eager_complete),
NotUsed,
))
})
}
#[must_use]
pub fn maybe() -> (MaybeHandle<Out>, Self) {
let value = Arc::new(Mutex::new(None));
let handle = MaybeHandle {
value: Arc::clone(&value),
};
let source = Self::from_factory(move || {
let result = value
.lock()
.expect("maybe source poisoned")
.clone()
.unwrap_or(Err(StreamError::MaybeIncomplete));
Box::new(std::iter::once(result))
});
(handle, source)
}
#[must_use]
pub fn single(item: Out) -> Self {
Self::from_factory_with_hints(
move || Box::new(std::iter::once(Ok(item.clone()))),
SourceHints::with_inline_micro(1),
)
}
#[must_use]
pub fn repeat(item: Out) -> Self {
Self::from_factory(move || {
let item = item.clone();
Box::new(std::iter::repeat_with(move || Ok(item.clone())))
})
}
#[must_use]
pub fn from_iterable<I>(items: I) -> Self
where
I: IntoIterator<Item = Out>,
{
items.into_iter().collect()
}
}
impl<Out: Clone + Send + Sync + 'static> FromIterator<Out> for Source<Out, NotUsed> {
fn from_iter<T: IntoIterator<Item = Out>>(iter: T) -> Self {
let items: Arc<[Out]> = iter.into_iter().collect();
let len = items.len();
Self::from_factory_with_hints(
move || {
let items = Arc::clone(&items);
let mut index = 0;
Box::new(std::iter::from_fn(move || {
let item = items.get(index)?.clone();
index += 1;
Some(Ok(item))
}))
},
SourceHints::with_inline_micro(len),
)
}
}
#[cfg(test)]
pub(in crate::stream) fn test_source_with_inline_micro_hint<Out: Send + 'static>(
factory: impl Fn() -> BoxStream<Out> + Send + Sync + 'static,
max_success_items: usize,
) -> Source<Out, NotUsed> {
Source::from_factory_with_hints(factory, SourceHints::with_inline_micro(max_success_items))
}
struct UnfoldResourceStream<Resource, Out, Create, Read, Close>
where
Create: Fn() -> StreamResult<Resource>,
Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
Close: Fn(Resource) -> StreamResult<()>,
{
create: Arc<Create>,
read: Arc<Read>,
close: Arc<Close>,
resource: Option<Resource>,
created: bool,
terminated: bool,
_marker: PhantomData<fn() -> Out>,
}
impl<Resource, Out, Create, Read, Close> UnfoldResourceStream<Resource, Out, Create, Read, Close>
where
Create: Fn() -> StreamResult<Resource>,
Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
Close: Fn(Resource) -> StreamResult<()>,
{
fn ensure_created(&mut self) -> StreamResult<()> {
if self.created {
return Ok(());
}
self.created = true;
let resource = catch_unwind_failed("unfold_resource create", || (self.create)())
.and_then(|result| result)?;
self.resource = Some(resource);
Ok(())
}
fn close_resource(&mut self) -> StreamResult<()> {
match self.resource.take() {
Some(resource) => {
catch_unwind_failed("unfold_resource close", || (self.close)(resource))
.and_then(|result| result)
}
None => Ok(()),
}
}
}
impl<Resource, Out, Create, Read, Close> Iterator
for UnfoldResourceStream<Resource, Out, Create, Read, Close>
where
Create: Fn() -> StreamResult<Resource>,
Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
Close: Fn(Resource) -> StreamResult<()>,
{
type Item = StreamResult<Out>;
fn next(&mut self) -> Option<Self::Item> {
if self.terminated {
return None;
}
if let Err(error) = self.ensure_created() {
self.terminated = true;
return Some(Err(error));
}
let result = {
let resource = self
.resource
.as_mut()
.expect("unfold_resource resource is open");
catch_unwind_failed("unfold_resource read", || (self.read)(resource))
.and_then(|result| result)
};
match result {
Ok(Some(item)) => Some(Ok(item)),
Ok(None) => {
self.terminated = true;
match self.close_resource() {
Ok(()) => None,
Err(error) => Some(Err(error)),
}
}
Err(read_error) => {
self.terminated = true;
let _ = self.close_resource();
Some(Err(read_error))
}
}
}
}
impl<Resource, Out, Create, Read, Close> Drop
for UnfoldResourceStream<Resource, Out, Create, Read, Close>
where
Create: Fn() -> StreamResult<Resource>,
Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
Close: Fn(Resource) -> StreamResult<()>,
{
fn drop(&mut self) {
let _ = self.close_resource();
}
}
type UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut> =
fn() -> (Out, CreateFut, ReadFut, CloseFut);
struct UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
where
Resource: Send + 'static,
Create: Fn() -> CreateFut,
CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
Read: Fn(&mut Resource) -> ReadFut,
ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
Close: Fn(Resource) -> CloseFut,
CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
{
create: Arc<Create>,
read: Arc<Read>,
close: Arc<Close>,
resource: Option<Resource>,
created: bool,
terminated: bool,
_marker: PhantomData<UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut>>,
}
impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
where
Resource: Send + 'static,
Create: Fn() -> CreateFut,
CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
Read: Fn(&mut Resource) -> ReadFut,
ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
Close: Fn(Resource) -> CloseFut,
CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
{
fn ensure_created(&mut self) -> StreamResult<()> {
if self.created {
return Ok(());
}
self.created = true;
let resource = catch_unwind_failed("unfold_resource_async create", || (self.create)())
.and_then(flow::run_future_inline_or_spawn)?;
self.resource = Some(resource);
Ok(())
}
fn close_resource(&mut self) -> StreamResult<()> {
match self.resource.take() {
Some(resource) => {
catch_unwind_failed("unfold_resource_async close", || (self.close)(resource))
.and_then(flow::run_future_inline_or_spawn)
}
None => Ok(()),
}
}
}
impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Iterator
for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
where
Resource: Send + 'static,
Out: Send + 'static,
Create: Fn() -> CreateFut,
CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
Read: Fn(&mut Resource) -> ReadFut,
ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
Close: Fn(Resource) -> CloseFut,
CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
{
type Item = StreamResult<Out>;
fn next(&mut self) -> Option<Self::Item> {
if self.terminated {
return None;
}
if let Err(error) = self.ensure_created() {
self.terminated = true;
return Some(Err(error));
}
let result = {
let resource = self
.resource
.as_mut()
.expect("unfold_resource_async resource is open");
catch_unwind_failed("unfold_resource_async read", || (self.read)(resource))
.and_then(flow::run_future_inline_or_spawn)
};
match result {
Ok(Some(item)) => Some(Ok(item)),
Ok(None) => {
self.terminated = true;
match self.close_resource() {
Ok(()) => None,
Err(error) => Some(Err(error)),
}
}
Err(read_error) => {
self.terminated = true;
let _ = self.close_resource();
Some(Err(read_error))
}
}
}
}
impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Drop
for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
where
Resource: Send + 'static,
Create: Fn() -> CreateFut,
CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
Read: Fn(&mut Resource) -> ReadFut,
ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
Close: Fn(Resource) -> CloseFut,
CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
{
fn drop(&mut self) {
let _ = self.close_resource();
}
}
struct LazySourceStream<Out, InnerMat, F> {
create: Arc<F>,
materializer: Materializer,
current: Option<BoxStream<Out>>,
mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
initialized: bool,
terminated: bool,
}
impl<Out, InnerMat, F> LazySourceStream<Out, InnerMat, F>
where
Out: Send + 'static,
InnerMat: Send + 'static,
F: Fn() -> Source<Out, InnerMat>,
{
fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
if let Some(sender) = self.mat_sender.take() {
let _ = sender.send(result);
}
}
fn initialize(&mut self) -> StreamResult<()> {
if self.initialized {
return Ok(());
}
self.initialized = true;
let source = match catch_unwind_failed("lazy_source factory", || (self.create)()) {
Ok(source) => source,
Err(error) => {
self.complete_mat(Err(error.clone()));
return Err(error);
}
};
match Arc::clone(&source.factory).create(&self.materializer) {
Ok((stream, mat)) => {
self.current = Some(stream);
self.complete_mat(Ok(mat));
Ok(())
}
Err(error) => {
self.complete_mat(Err(error.clone()));
Err(error)
}
}
}
}
impl<Out, InnerMat, F> Iterator for LazySourceStream<Out, InnerMat, F>
where
Out: Send + 'static,
InnerMat: Send + 'static,
F: Fn() -> Source<Out, InnerMat>,
{
type Item = StreamResult<Out>;
fn next(&mut self) -> Option<Self::Item> {
if self.terminated {
return None;
}
if let Err(error) = self.initialize() {
self.terminated = true;
return Some(Err(error));
}
match self
.current
.as_mut()
.expect("lazy_source current stream initialized")
.next()
{
Some(Ok(item)) => Some(Ok(item)),
Some(Err(error)) => {
self.terminated = true;
Some(Err(error))
}
None => {
self.terminated = true;
None
}
}
}
}
impl<Out, InnerMat, F> Drop for LazySourceStream<Out, InnerMat, F> {
fn drop(&mut self) {
if !self.initialized
&& let Some(sender) = self.mat_sender.take()
{
let _ = sender.send(Err(StreamError::Failed(
"lazy source was never materialized".into(),
)));
}
}
}
struct LazyFutureSourceStream<Out, InnerMat, F, Fut> {
create: Arc<F>,
materializer: Materializer,
current: Option<BoxStream<Out>>,
mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
initialized: bool,
terminated: bool,
_marker: PhantomData<fn() -> Fut>,
}
impl<Out, InnerMat, F, Fut> LazyFutureSourceStream<Out, InnerMat, F, Fut>
where
Out: Send + 'static,
InnerMat: Send + 'static,
F: Fn() -> Fut,
Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
{
fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
if let Some(sender) = self.mat_sender.take() {
let _ = sender.send(result);
}
}
fn initialize(&mut self) -> StreamResult<()> {
if self.initialized {
return Ok(());
}
self.initialized = true;
let source = match catch_unwind_failed("lazy_future_source factory", || (self.create)())
.and_then(flow::run_future_inline_or_spawn)
{
Ok(source) => source,
Err(error) => {
self.complete_mat(Err(error.clone()));
return Err(error);
}
};
match Arc::clone(&source.factory).create(&self.materializer) {
Ok((stream, mat)) => {
self.current = Some(stream);
self.complete_mat(Ok(mat));
Ok(())
}
Err(error) => {
self.complete_mat(Err(error.clone()));
Err(error)
}
}
}
}
impl<Out, InnerMat, F, Fut> Iterator for LazyFutureSourceStream<Out, InnerMat, F, Fut>
where
Out: Send + 'static,
InnerMat: Send + 'static,
F: Fn() -> Fut,
Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
{
type Item = StreamResult<Out>;
fn next(&mut self) -> Option<Self::Item> {
if self.terminated {
return None;
}
if let Err(error) = self.initialize() {
self.terminated = true;
return Some(Err(error));
}
match self
.current
.as_mut()
.expect("lazy_future_source current stream initialized")
.next()
{
Some(Ok(item)) => Some(Ok(item)),
Some(Err(error)) => {
self.terminated = true;
Some(Err(error))
}
None => {
self.terminated = true;
None
}
}
}
}
impl<Out, InnerMat, F, Fut> Drop for LazyFutureSourceStream<Out, InnerMat, F, Fut> {
fn drop(&mut self) {
if !self.initialized
&& let Some(sender) = self.mat_sender.take()
{
let _ = sender.send(Err(StreamError::Failed(
"lazy future source was never materialized".into(),
)));
}
}
}
fn concat_source_streams<Out>(streams: Vec<BoxStream<Out>>) -> BoxStream<Out>
where
Out: Send + 'static,
{
let mut streams: VecDeque<_> = streams.into();
let mut current = streams.pop_front();
Box::new(std::iter::from_fn(move || {
loop {
match current.as_mut() {
Some(stream) => match stream.next() {
Some(item) => return Some(item),
None => current = streams.pop_front(),
},
None => return None,
}
}
}))
}
fn concat_source_streams_lazy<Out, Mat>(
initial: BoxStream<Out>,
factories: Vec<Arc<dyn SourceFactory<Out, Mat>>>,
materializer: &Materializer,
) -> BoxStream<Out>
where
Out: Send + 'static,
Mat: Send + 'static,
{
let mut current = Some(initial);
let mut remaining: VecDeque<_> = factories.into();
let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
Box::new(std::iter::from_fn(move || {
loop {
match current.as_mut() {
Some(stream) => match stream.next() {
Some(item) => return Some(item),
None => {
current = remaining.pop_front().map(|factory| {
match factory.create(&materializer) {
Ok((stream, _)) => stream,
Err(error) => {
Box::new(std::iter::once(Err(error))) as BoxStream<Out>
}
}
});
}
},
None => return None,
}
}
}))
}
fn or_else_source_stream<Out>(
mut primary: BoxStream<Out>,
mut secondary: BoxStream<Out>,
) -> BoxStream<Out>
where
Out: Send + 'static,
{
let mut primary_emitted = false;
let mut using_secondary = false;
Box::new(std::iter::from_fn(move || {
loop {
if using_secondary {
return secondary.next();
}
match primary.next() {
Some(Ok(item)) => {
primary_emitted = true;
return Some(Ok(item));
}
Some(Err(error)) => return Some(Err(error)),
None if primary_emitted => return None,
None => using_secondary = true,
}
}
}))
}
fn interleave_source_streams<Out>(
streams: Vec<BoxStream<Out>>,
segment_size: usize,
eager_close: bool,
) -> BoxStream<Out>
where
Out: Send + 'static,
{
if segment_size == 0 {
return Box::new(std::iter::once(Err(StreamError::GraphValidation(
"interleave segment size must be greater than zero".into(),
))));
}
let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
let mut pending: Vec<Option<StreamResult<Out>>> = (0..streams.len()).map(|_| None).collect();
let mut current = 0usize;
let mut emitted = 0usize;
Box::new(std::iter::from_fn(move || {
loop {
if streams.iter().all(Option::is_none) {
return None;
}
if streams[current].is_none() {
match next_active_source_stream(&streams, current) {
Some(next) => {
current = next;
emitted = 0;
}
None => return None,
}
}
let Some(stream) = streams[current].as_mut() else {
continue;
};
let next_item = pending[current].take().or_else(|| stream.next());
match next_item {
Some(Ok(item)) => {
emitted += 1;
if emitted == segment_size {
emitted = 0;
if let Some(next) = next_active_source_stream(&streams, current) {
current = next;
}
}
return Some(Ok(item));
}
Some(Err(error)) => return Some(Err(error)),
None => {
streams[current] = None;
emitted = 0;
if eager_close {
return None;
}
match next_active_source_stream(&streams, current) {
Some(next) => current = next,
None => return None,
}
}
}
}
}))
}
fn next_active_source_stream<Out>(
streams: &[Option<BoxStream<Out>>],
current: usize,
) -> Option<usize>
where
Out: Send + 'static,
{
if streams.is_empty() {
return None;
}
for offset in 1..=streams.len() {
let index = (current + offset) % streams.len();
if streams[index].is_some() {
return Some(index);
}
}
None
}