use std::future::Future;
/// Knobs that control how a `Source` pipeline is driven.
struct ExecutionSettings {
    /// Number of produced outputs `Source::run` lets accumulate before it
    /// starts handing them to the sink.
    max_concurrency: usize,
}
/// A lazily evaluated pipeline over an in-memory list of items.
///
/// `T` is the type of the input items and `F` is the per-item step function
/// that the builder methods (`map`, `filter`, ...) wrap layer by layer.
pub struct Source<T, F> {
    /// Input items, fed through `func` one at a time when the pipeline runs.
    data: Vec<T>,
    /// Composed per-item step; it returns `None` for items that were dropped.
    func: F,
    /// Execution parameters consulted when the pipeline is driven.
    settings: ExecutionSettings,
}
impl<T> Source<T, fn(T) -> Option<T>> {
    /// Builds a pipeline from anything iterable.
    ///
    /// The input is drained into a `Vec` immediately. The initial step is the
    /// identity (`Some`), so every item passes through unchanged, and the
    /// buffering threshold (`max_concurrency`) starts at 1.
    pub fn new<I>(input: I) -> Self
    where
        I: IntoIterator<Item = T>,
    {
        let data: Vec<T> = input.into_iter().collect();
        let settings = ExecutionSettings { max_concurrency: 1 };
        Self {
            data,
            func: Some,
            settings,
        }
    }
}
impl<T, F> Source<T, F> {
    /// Appends a transformation step: every value produced by the current
    /// pipeline is passed through `g`.
    ///
    /// The step is lazy; `g` is only invoked while [`Source::run`] or
    /// [`Source::collect`] drives the pipeline. Items already dropped by an
    /// earlier step never reach `g`.
    ///
    /// `V` is the current output type of the pipeline, so `map` may follow any
    /// earlier step, including one that changed the item type.
    pub fn map<G, U, V>(self, mut g: G) -> Source<T, impl FnMut(T) -> Option<U>>
    where
        F: FnMut(T) -> Option<V>,
        G: FnMut(V) -> U,
    {
        let mut f = self.func;
        Source {
            data: self.data,
            func: move |x| f(x).map(&mut g),
            settings: self.settings,
        }
    }

    /// Appends a step that turns every surviving value into a future by
    /// calling `g`.
    ///
    /// The future itself becomes the pipeline's output: it is not awaited by
    /// this step, and [`Source::run`] passes it to the sink as-is.
    pub fn map_async<G, U, Fut, V>(self, mut g: G) -> Source<T, impl FnMut(T) -> Option<Fut>>
    where
        F: FnMut(T) -> Option<V>,
        G: FnMut(V) -> Fut,
        Fut: Future<Output = U>,
    {
        let mut f = self.func;
        Source {
            data: self.data,
            func: move |x| f(x).map(&mut g),
            settings: self.settings,
        }
    }

    /// Appends a filtering step: values for which `g` returns `false` are
    /// dropped and never reach later steps or the sink.
    pub fn filter<G, U>(self, mut g: G) -> Source<T, impl FnMut(T) -> Option<U>>
    where
        F: FnMut(T) -> Option<U>,
        G: FnMut(&U) -> bool,
    {
        let mut f = self.func;
        Source {
            data: self.data,
            func: move |x| f(x).filter(|v| g(v)),
            settings: self.settings,
        }
    }

    /// Appends an observation step: `g` sees a shared reference to every
    /// surviving value, and the value is then passed on unchanged.
    ///
    /// `U` is the current output type of the pipeline, so `tap` may follow a
    /// step that changed the item type.
    pub fn tap<G, U>(self, mut g: G) -> Source<T, impl FnMut(T) -> Option<U>>
    where
        F: FnMut(T) -> Option<U>,
        G: FnMut(&U),
    {
        let mut f = self.func;
        Source {
            data: self.data,
            func: move |x| f(x).inspect(|v| g(v)),
            settings: self.settings,
        }
    }

    /// Sets how many produced outputs [`Source::run`] lets accumulate before
    /// it starts handing them to the sink.
    ///
    /// A `len` of 0 behaves like 1: each output is passed on as soon as it is
    /// produced.
    pub fn buffer(self, len: usize) -> Source<T, F> {
        Source {
            settings: ExecutionSettings {
                max_concurrency: len,
            },
            ..self
        }
    }

    /// Runs the pipeline and gathers every output into a `Vec`, in the order
    /// [`Source::run`] emits them (input order).
    pub async fn collect<O>(self) -> Vec<O>
    where
        F: FnMut(T) -> Option<O>,
    {
        let mut out: Vec<O> = Vec::new();
        self.run(|item| {
            out.push(item);
            // The sink has nothing asynchronous to do; return a ready future.
            async {}
        })
        .await;
        out
    }

    /// Drives the pipeline: feeds every input item through the composed step
    /// function and passes each surviving output to `sink`, awaiting the
    /// future `sink` returns before continuing.
    ///
    /// Outputs are held in a queue until it reaches the threshold configured
    /// by [`Source::buffer`]; from then on the oldest queued output is handed
    /// to `sink` each time a new one is produced. Whatever is still queued
    /// once the input is exhausted is flushed afterwards. Outputs always reach
    /// `sink` in input order.
    pub async fn run<S, Fut, O>(mut self, mut sink: S)
    where
        F: FnMut(T) -> Option<O>,
        S: FnMut(O) -> Fut,
        Fut: Future<Output = ()>,
    {
        let limit = self.settings.max_concurrency;
        // A FIFO queue: popping from the front keeps outputs in input order.
        // (Popping from the back of a `Vec` would emit the newest item first.)
        let mut queue = std::collections::VecDeque::new();

        for item in self.data {
            let Some(out) = (self.func)(item) else {
                continue;
            };
            queue.push_back(out);
            if queue.len() < limit {
                continue;
            }
            if let Some(oldest) = queue.pop_front() {
                sink(oldest).await;
            }
        }

        // Flush whatever is still buffered once the input is exhausted.
        while let Some(out) = queue.pop_front() {
            sink(out).await;
        }
    }
}