use crate::*;
use futures::future::select_all;
use never::Never;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{future::Future, time::Instant};
use tokio::{
self, select,
time::{sleep, sleep_until},
};
/// Builds a new eventual by applying the asynchronous function `f` to the
/// values read from `source`.
///
/// The spawned task repeatedly reads the next value from the source reader,
/// passes it to `f`, awaits the returned future and writes its output. The
/// task ends as soon as reading from the source fails (the error is
/// propagated with `?`).
pub fn map<E, I, O, F, Fut>(source: E, mut f: F) -> Eventual<O>
where
    E: IntoReader<Output = I>,
    F: 'static + Send + FnMut(I) -> Fut,
    I: Value,
    O: Value,
    Fut: Send + Future<Output = O>,
{
    let mut reader = source.into_reader();

    Eventual::spawn(|mut writer| async move {
        loop {
            let input = reader.next().await?;
            let output = f(input).await;
            writer.write(output);
        }
    })
}
/// Returns an eventual that is repeatedly updated with the current
/// [`Instant`].
///
/// The first timestamp is written before any sleeping happens; after every
/// write the task sleeps for `interval` before writing the next one. The loop
/// itself never exits.
pub fn timer(interval: Duration) -> Eventual<Instant> {
    Eventual::spawn(move |mut writer| async move {
        loop {
            let now = Instant::now();
            writer.write(now);
            sleep(interval).await;
        }
    })
}
/// A collection of sources that can be combined into one eventual carrying
/// all of their values together.
///
/// In this file the trait is implemented for tuples of readers by the
/// `impl_tuple!` macro.
pub trait Joinable {
/// The combined value type written by the joined eventual.
type Output;
/// Consumes the sources and returns the eventual of their combined output.
fn join(self) -> Eventual<Self::Output>;
}
// Implements `Selectable` and `Joinable` for one tuple arity.
//
// Arguments: `$len` (accepted, but not referenced anywhere in the expansion)
// followed by `Type, ident` pairs - one pair per tuple element. `$T` is used
// both as the generic type parameter and, inside `join`, as the name of the
// reader variable for that element (hence `#[allow(non_snake_case)]`); `$t`
// names the per-element value slot.
macro_rules! impl_tuple {
($len:expr, $($T:ident, $t:ident),*) => {
// `select` over a tuple: every element must produce the same value type
// `T`. Each element is turned into a reader and the work is delegated to
// the `Vec` implementation of `Selectable`.
impl<T, $($T,)*> Selectable for ($($T,)*)
where
$($T: IntoReader<Output = T>,)*
T: Value,
{
type Output = T;
fn select(self) -> Eventual<Self::Output> {
let ($($t),*) = self;
$(let $t = $t.into_reader();)*
// `Selectable::select` is marked deprecated; silence the lint for
// this internal delegation.
#[allow(deprecated)]
vec![$($t),*].select()
}
}
// `join` over a tuple: the output is the tuple of every element's value.
impl<$($T,)*> Joinable for ($($T,)*)
where
$($T: IntoReader,)*
{
type Output = ($($T::Output),*);
#[allow(non_snake_case)]
fn join(self) -> Eventual<Self::Output> {
let ($($T),*) = self;
$(let mut $T = $T.into_reader();)*
Eventual::spawn(move |mut writer| async move {
// Phase 1: wait until every source has produced at least one value.
// `len` is the number of sources, `count` the number of slots that
// have been filled so far.
let mut len: usize = 0;
let mut count: usize = 0;
$(let mut $t = None; len += 1;)*
let ($(mut $t,)*) = loop {
select! {
$(
next = $T.next() => {
// `replace` returns the previous content of the slot, so
// `is_none()` is true only the first time this source
// yields. A read error ends the whole task via `?`.
if $t.replace(next?).is_none() {
count += 1;
}
}
)*
}
if count == len {
// Every slot is `Some` here, so the unwraps cannot fail.
break ($($t.unwrap()),*);
}
};
// Phase 2: write the current combination, then wait for any single
// source to produce a new value and replace its slot before writing
// again. A read error from any source ends the task via `?`.
loop {
writer.write(($($t.clone(),)*));
select! {
$(
next = $T.next() => {
$t = next?;
}
)*
}
}
})
}
}
};
}
// Recursively invokes `impl_tuple!` for a list of `Type, ident` pairs and
// for every shorter list obtained by dropping the leading pair.
macro_rules! impl_tuples {
// Base case: a single remaining pair expands to nothing, so no impl is
// generated for one-element tuples.
($len:expr, $A:ident, $a:ident) => { };
// Recursive case (two or more pairs): generate the impls for the full list,
// then recurse on the list without its first pair.
($len:expr, $A:ident, $a:ident, $($T:ident, $t:ident),+) => {
impl_tuple!($len, $A, $a, $($T, $t),+);
impl_tuples!($len - 1, $($T, $t),+);
}
}
// Twelve pairs are supplied, so impls are generated for tuples of 2 through
// 12 elements.
impl_tuples!(12, A, a, B, b, C, c, D, d, E, e, F, f, G, g, H, h, I, i, J, j, K, k, L, l);
pub fn join<J>(joinable: J) -> Eventual<J::Output>
where
J: Joinable,
{
joinable.join()
}
/// A collection of sources, all producing the same value type, that can be
/// merged into one eventual.
///
/// In this file the trait is implemented for `Vec<R>` and, through the
/// `impl_tuple!` macro, for tuples of readers.
pub trait Selectable {
/// The value type written by the merged eventual.
type Output;
/// Consumes the sources and returns the merged eventual.
#[deprecated = "Not deterministic. This doesn't seem as harmful as filter, because it doesn't appear to miss updates."]
fn select(self) -> Eventual<Self::Output>;
}
#[deprecated = "Not deterministic. This doesn't seem as harmful as filter, because it doesn't appear to miss updates."]
pub fn select<S>(selectable: S) -> Eventual<S::Output>
where
S: Selectable,
{
#[allow(deprecated)]
selectable.select()
}
/// Merges a vector of sources: whichever reader yields next has its value
/// written to the output.
impl<R> Selectable for Vec<R>
where
    R: IntoReader,
{
    type Output = R::Output;

    /// Spawns a task that waits on all remaining readers at once. A value
    /// from any reader is written to the output; a reader that reports
    /// `Closed` is removed from the set. Once no readers remain the task
    /// finishes with `Err(Closed)` (this is also the immediate result for an
    /// empty vector).
    fn select(self) -> Eventual<Self::Output> {
        let mut readers: Vec<_> = self
            .into_iter()
            .map(|source| source.into_reader())
            .collect();

        Eventual::spawn(move |mut writer| async move {
            // `select_all` must not be given an empty collection, so the
            // emptiness check guards every iteration.
            while !readers.is_empty() {
                let pending: Vec<_> = readers.iter_mut().map(|reader| reader.next()).collect();
                let (outcome, index, unfinished) = select_all(pending).await;
                // Release the remaining futures (and their borrows of
                // `readers`) before `readers` is modified below.
                drop(unfinished);
                match outcome {
                    Ok(value) => {
                        writer.write(value);
                    }
                    Err(Closed) => {
                        readers.remove(index);
                    }
                }
            }
            Err(Closed)
        })
    }
}
/// Limits how often values from `read` are forwarded: after a value arrives,
/// the task waits for `duration` and then writes the most recent value seen
/// during that window.
///
/// Values arriving inside the window replace the pending one, so bursts are
/// collapsed into a single write at the cost of up to `duration` of latency.
///
/// If the source closes while a value is still pending inside the window,
/// that pending value is written before the closure is propagated, so the
/// final value is never silently discarded. If the source closes while no
/// value is pending, the task simply ends with the reader's error.
pub fn throttle<E>(read: E, duration: Duration) -> Eventual<E::Output>
where
    E: IntoReader,
{
    let mut read = read.into_reader();

    Eventual::spawn(move |mut writer| async move {
        loop {
            // Wait for the value that opens a new throttle window.
            let mut next = read.next().await?;
            let end = tokio::time::Instant::now() + duration;
            loop {
                select! {
                    n = read.next() => {
                        match n {
                            // A newer value supersedes the pending one.
                            Ok(newer) => next = newer,
                            // The source is finished: flush the pending
                            // value instead of dropping it, then stop.
                            Err(closed) => {
                                writer.write(next);
                                return Err(closed);
                            }
                        }
                    }
                    _ = sleep_until(end) => {
                        break;
                    }
                }
            }
            writer.write(next);
        }
    })
}
/// Calls `f` with every value read from `reader`.
///
/// The work runs in a task created with `Eventual::spawn`; the returned
/// [`PipeHandle`] wraps that task's eventual. The loop stops when reading
/// from `reader` fails (the error is propagated with `?`).
pub fn pipe<E, F>(reader: E, mut f: F) -> PipeHandle
where
    E: IntoReader,
    F: 'static + Send + FnMut(E::Output),
{
    let mut reader = reader.into_reader();

    #[allow(unreachable_code)]
    PipeHandle::new(Eventual::spawn(|_writer| async move {
        loop {
            let value = reader.next().await?;
            f(value);
        }
        // Never executed. Mentioning `_writer` here makes the `async move`
        // block capture it, so the writer lives as long as the future rather
        // than being dropped when the closure returns.
        drop(_writer);
    }))
}
/// Asynchronous variant of `pipe`: calls `f` with every value read from
/// `reader` and awaits the returned future before reading the next value.
///
/// The work runs in a task created with `Eventual::spawn`; the returned
/// [`PipeHandle`] wraps that task's eventual. The loop stops when reading
/// from `reader` fails (the error is propagated with `?`).
pub fn pipe_async<E, F, Fut>(reader: E, mut f: F) -> PipeHandle
where
    E: IntoReader,
    F: 'static + Send + FnMut(E::Output) -> Fut,
    Fut: Send + Future<Output = ()>,
{
    let mut reader = reader.into_reader();

    #[allow(unreachable_code)]
    PipeHandle::new(Eventual::spawn(|_writer| async move {
        loop {
            let value = reader.next().await?;
            f(value).await;
        }
        // Never executed. Mentioning `_writer` here makes the `async move`
        // block capture it, so the writer lives as long as the future rather
        // than being dropped when the closure returns.
        drop(_writer);
    }))
}
/// Handle returned by `pipe` and `pipe_async`, wrapping the eventual of the
/// spawned pipe task.
///
/// Marked `#[must_use]` so that ignoring the handle produces a compiler
/// warning. NOTE(review): presumably dropping the handle lets the pipe stop,
/// with `forever` as the opt-out - confirm against `Eventual`'s drop
/// behavior.
#[must_use]
pub struct PipeHandle {
// Eventual of the pipe task. Its value type is `Never`, so it carries no
// values.
inner: Eventual<Never>,
}
impl PipeHandle {
fn new(eventual: Eventual<Never>) -> Self {
Self { inner: eventual }
}
#[inline]
pub fn forever(self) {
let Self { inner } = self;
tokio::task::spawn(async move {
let _closed = inner.value().await;
});
}
}
/// Splits a source of `Result`s: `Ok` payloads are written to the returned
/// eventual, while `Err` payloads are handed to the callback `f` and produce
/// no output.
///
/// The task ends when reading from `source` fails (the error is propagated
/// with `?`).
#[deprecated = "Not deterministic. This is a special case of filter. Retry should be better"]
pub fn handle_errors<E, F, Ok, Err>(source: E, mut f: F) -> Eventual<Ok>
where
    E: IntoReader<Output = Result<Ok, Err>>,
    F: 'static + Send + FnMut(Err),
    Ok: Value,
    Err: Value,
{
    let mut reader = source.into_reader();

    Eventual::spawn(move |mut writer| async move {
        loop {
            let outcome = reader.next().await?;
            match outcome {
                Ok(value) => writer.write(value),
                Err(error) => f(error),
            }
        }
    })
}
/// Produces an eventual of `Ok` values from a source eventual built by `f`,
/// calling `f` again to build a replacement source whenever the current one
/// yields an `Err` value.
///
/// `f(None)` is awaited once to create the initial
/// `Eventual<Result<Ok, Err>>`. Afterwards `f(Some(err))` is called with the
/// error value that triggered the retry. The task ends when reading from the
/// current source fails (propagated by `next?`).
pub fn retry<Ok, Err, F, Fut>(mut f: F) -> Eventual<Ok>
where
Ok: Value,
Err: Value,
Fut: Send + Future<Output = Eventual<Result<Ok, Err>>>,
F: 'static + Send + FnMut(Option<Err>) -> Fut,
{
Eventual::spawn(move |mut writer| async move {
// NOTE(review): the inner loop contains no `break`, so this outer loop
// body runs at most once; the task only exits through `next?`.
loop {
// Build the initial source and read its first result.
let mut e = f(None).await.subscribe();
let mut next = e.next().await;
loop {
match next? {
// Success: publish the value and wait for the next result from the
// current source.
Ok(v) => {
writer.write(v);
next = e.next().await;
}
// Failure: race building a replacement source against the current
// source producing another result on its own. `select!` drops the
// branch that does not complete first.
Err(err) => {
select! {
// The replacement is ready: switch to it and read its first
// result.
e_temp = f(Some(err)) => {
e = e_temp.subscribe();
next = e.next().await;
}
// The current source produced something first: process that
// result instead; the unfinished call to `f` is abandoned.
n_temp = e.next() => {
next = n_temp;
}
}
}
}
}
}
})
}
/// Maps `source` through the fallible asynchronous function `f`, using
/// `retry` to rebuild the mapping whenever `f` produces an `Err`.
///
/// On every retry the error is first passed to `on_err` and the returned
/// future is awaited; the cloned source reader is then marked dirty with
/// `force_dirty` before a fresh `map` over it is created.
///
/// `f` and `on_err` are shared between attempts through `Arc<Mutex<_>>`. The
/// lock is held only while the callback is being *called* to create its
/// future and is released before that future is awaited.
///
/// # Panics
///
/// Panics if one of the internal mutexes is poisoned, i.e. if a previous call
/// to `f` or `on_err` panicked while the lock was held.
pub fn map_with_retry<Ok, Err, F, Fut, E, FutE, R>(source: R, f: F, on_err: E) -> Eventual<Ok>
where
    R: IntoReader,
    F: 'static + Send + FnMut(R::Output) -> Fut,
    E: 'static + Send + Sync + FnMut(Err) -> FutE,
    Ok: Value,
    Err: Value,
    Fut: Send + Future<Output = Result<Ok, Err>>,
    FutE: Send + Future<Output = ()>,
{
    let source = source.into_reader();
    let shared_f = Arc::new(Mutex::new(f));
    let shared_on_err = Arc::new(Mutex::new(on_err));

    retry(move |previous_error| {
        // Each attempt works on its own reader clone and its own handles to
        // the shared callbacks.
        let mut reader = source.clone();
        let shared_f = Arc::clone(&shared_f);
        let shared_on_err = Arc::clone(&shared_on_err);

        async move {
            if let Some(error) = previous_error {
                // Create the handler future under the lock, await it outside.
                let handling = {
                    let mut guard = shared_on_err.lock().unwrap();
                    guard(error)
                };
                handling.await;
                reader.force_dirty();
            }

            map(reader, move |value| {
                // Create the mapping future under the lock; the guard is
                // released before the future is returned.
                let mapping = {
                    let mut guard = shared_f.lock().unwrap();
                    guard(value)
                };
                mapping
            })
        }
    })
}
/// Returns an eventual that starts out with `value` and afterwards mirrors
/// every value read from `source`.
///
/// `value` is written before the first read from the source. The task ends
/// when reading from `source` fails (the error is propagated with `?`).
pub fn init_with<R>(source: R, value: R::Output) -> Eventual<R::Output>
where
    R: IntoReader,
{
    let mut reader = source.into_reader();

    Eventual::spawn(|mut writer| async move {
        writer.write(value);
        loop {
            let next = reader.next().await?;
            writer.write(next);
        }
    })
}
/// Forwards values from `source_2` only until `source_1` produces its first
/// value; from then on `source_1` is forwarded exclusively.
///
/// While both sources are being watched, `source_1` is polled first
/// (`biased`). The possible transitions are:
///
/// * `source_1` yields a value: it is written, `source_2` is dropped and
///   only `source_1` is forwarded from then on.
/// * `source_1` fails before yielding: only `source_2` is forwarded from
///   then on.
/// * `source_2` fails first: only `source_1` is forwarded from then on.
///
/// In every case the task ends when the one remaining source fails.
pub fn prefer<R1, R2, T>(source_1: R1, source_2: R2) -> Eventual<T>
where
    R1: IntoReader<Output = T>,
    R2: IntoReader<Output = T>,
    T: Value,
{
    let mut source_1 = source_1.into_reader();
    let mut source_2 = source_2.into_reader();

    Eventual::spawn(|mut writer| async move {
        // Phase 1: watch both sources, giving `source_1` priority.
        loop {
            select! {
                biased;

                preferred = source_1.next() => {
                    match preferred {
                        Ok(value) => {
                            writer.write(value);
                            break;
                        }
                        // The preferred source is gone: fall back to
                        // `source_2` for good.
                        Err(_) => loop {
                            writer.write(source_2.next().await?);
                        },
                    }
                }
                fallback = source_2.next() => {
                    match fallback {
                        Ok(value) => writer.write(value),
                        Err(_) => break,
                    }
                }
            }
        }

        // Phase 2: the fallback is no longer needed.
        drop(source_2);
        loop {
            writer.write(source_1.next().await?);
        }
    })
}
/// Flattens a source of sources: the output carries the values of the inner
/// source most recently produced by `outer`.
///
/// The task first waits for an initial inner source, then watches `outer`
/// and the current inner source at the same time:
///
/// * `outer` yields a new inner source: it replaces the current one.
/// * `outer` fails: the current inner source is forwarded until it fails
///   too, which ends the task.
/// * the inner source yields a value: it is written to the output.
/// * the inner source fails: the task waits for `outer` to provide a
///   replacement (ending if `outer` fails instead).
#[deprecated = "Unsure if this meets determinism requirements"]
pub fn flatten<R1, R2>(outer: R1) -> Eventual<R2::Output>
where
    R1: IntoReader<Output = R2>,
    R2: IntoReader,
    R2: Value,
{
    let mut outer = outer.into_reader();

    Eventual::spawn(|mut writer| async move {
        let mut inner = outer.next().await?.into_reader();
        loop {
            select! {
                replacement = outer.next() => {
                    match replacement {
                        Ok(source) => inner = source.into_reader(),
                        // No more replacements can arrive: drain the
                        // current inner source.
                        Err(_) => loop {
                            writer.write(inner.next().await?);
                        },
                    }
                }
                item = inner.next() => {
                    match item {
                        Ok(value) => writer.write(value),
                        Err(_) => inner = outer.next().await?.into_reader(),
                    }
                }
            }
        }
    })
}