use futures::stream::FuturesUnordered;
use std::sync::Arc;
use tokio_stream::Stream;
pub trait Processor<I, O> {
#[allow(missing_docs)]
fn process(&self, input: I) -> impl Future<Output = O> + Send;
}
impl<I: Send, O: Send, F: Future<Output = O> + Send> Processor<I, O> for fn(I) -> F {
fn process(&self, input: I) -> impl Future<Output = O> + Send {
(self)(input)
}
}
impl<P, I, O> Processor<I, O> for &mut P
where
P: Processor<I, O> + ?Sized,
{
fn process(&self, request: I) -> impl Future<Output = O> + Send {
(**self).process(request)
}
}
pub trait FinalProcessor<I, O> {
#[allow(missing_docs)]
fn process(state: Arc<Self>, input: I) -> impl Future<Output = O> + Send;
}
pub trait RefProcessor<Borrowed, O, Owned = ()> {
#[allow(missing_docs)]
fn process<'a, 'b>(
&'a self,
deps: &'b Borrowed,
input: Owned,
) -> impl Future<Output = O> + Send + 'a + 'b
where
Owned: 'a + 'b;
}
pub fn parallel_map_borrowed<'input, I, O, RP, Iter>(
iter: Iter,
ref_processor: &RP,
) -> impl Stream<Item = O> + Send
where
I: Send + Sync + 'input,
O: Send + Sync,
RP: RefProcessor<I, O> + Send + Sync,
Iter: Iterator<Item = &'input I> + Send + Sync,
{
let set: FuturesUnordered<_> = iter.map(|input| ref_processor.process(input, ())).collect();
set
}
pub fn parallel_map<'p, I, O, P, Iter>(
iter: Iter,
ref_processor: &'p P,
) -> impl Stream<Item = O> + Send + 'p
where
I: Send + Sync + 'p,
O: Send + Sync + 'p,
P: Processor<I, O> + Send + Sync,
Iter: Iterator<Item = I> + Send + Sync,
{
let set: FuturesUnordered<_> = iter.map(|input| ref_processor.process(input)).collect();
set
}