use crate::core::*;
use std::sync::Arc;
use std::time::Duration;
/// A reusable scheduling policy.
///
/// A `Schedule` carries no iteration state of its own. It stores a shared
/// factory closure, and every call to that factory yields a fresh boxed
/// [`ScheduleDriver`] that tracks the state of one run.
pub struct Schedule<In, Out> {
    /// Factory invoked to build a new driver for each run of the schedule.
    pub(crate) driver:
        Arc<dyn Fn() -> Box<dyn ScheduleDriver<In, Out> + Send + Sync> + Send + Sync>,
}

// Written by hand rather than derived so that cloning places no `Clone`
// requirement on `In` or `Out`; only the `Arc` handle is duplicated.
impl<In, Out> Clone for Schedule<In, Out> {
    /// Returns a schedule that shares the same driver factory as `self`.
    fn clone(&self) -> Self {
        let driver = Arc::clone(&self.driver);
        Schedule { driver }
    }
}
impl<In, Out> Schedule<In, Out>
where
    In: Send + Sync + 'static,
    Out: Send + Sync + 'static + Clone,
{
    /// Builds a schedule whose drivers are `RecursDriver`s limited to `n`
    /// steps, each starting from a count of zero.
    pub fn recurs(n: usize) -> Schedule<In, usize> {
        let factory = move || -> Box<dyn ScheduleDriver<In, usize> + Send + Sync> {
            Box::new(RecursDriver { max: n, count: 0 })
        };
        Schedule {
            driver: Arc::new(factory),
        }
    }

    /// Builds a schedule whose drivers are `SpacedDriver`s that report
    /// `duration` as the delay on every step.
    pub fn spaced(duration: Duration) -> Schedule<In, ()> {
        let factory = move || -> Box<dyn ScheduleDriver<In, ()> + Send + Sync> {
            Box::new(SpacedDriver { duration })
        };
        Schedule {
            driver: Arc::new(factory),
        }
    }

    /// Builds a schedule whose drivers are `ExponentialDriver`s: the first
    /// delay is `base`, and the driver scales its current delay by `factor`
    /// after each step.
    pub fn exponential(base: Duration, factor: f64) -> Schedule<In, Duration> {
        let factory = move || -> Box<dyn ScheduleDriver<In, Duration> + Send + Sync> {
            Box::new(ExponentialDriver {
                current: base,
                factor,
                count: 0,
            })
        };
        Schedule {
            driver: Arc::new(factory),
        }
    }

    /// Wraps this schedule so that every driver it produces is enclosed in a
    /// `JitteredDriver` configured with the bounds `min` and `max`.
    pub fn jittered(self, min: f64, max: f64) -> Schedule<In, Out>
    where
        Out: Send + Sync + 'static + Clone,
    {
        let make_inner = self.driver;
        let factory = move || -> Box<dyn ScheduleDriver<In, Out> + Send + Sync> {
            Box::new(JitteredDriver {
                inner: make_inner(),
                min,
                max,
            })
        };
        Schedule {
            driver: Arc::new(factory),
        }
    }

    /// Combines this schedule with `other` through an `IntersectDriver`,
    /// which continues only while both underlying drivers continue and
    /// reports the outputs as a pair.
    pub fn intersect<Out2>(self, other: Schedule<In, Out2>) -> Schedule<In, (Out, Out2)>
    where
        Out2: Send + Sync + 'static + Clone,
        In: Clone,
    {
        let (make_first, make_second) = (self.driver, other.driver);
        let factory = move || -> Box<dyn ScheduleDriver<In, (Out, Out2)> + Send + Sync> {
            Box::new(IntersectDriver {
                d1: make_first(),
                d2: make_second(),
            })
        };
        Schedule {
            driver: Arc::new(factory),
        }
    }

    /// Combines this schedule with `other` through a `UnionDriver`, which
    /// continues while either underlying driver continues and reports each
    /// side's output as an `Option`.
    pub fn union<Out2>(self, other: Schedule<In, Out2>) -> Schedule<In, (Option<Out>, Option<Out2>)>
    where
        Out2: Send + Sync + 'static + Clone,
        In: Clone,
    {
        let (make_first, make_second) = (self.driver, other.driver);
        let factory =
            move || -> Box<dyn ScheduleDriver<In, (Option<Out>, Option<Out2>)> + Send + Sync> {
                Box::new(UnionDriver {
                    d1: make_first(),
                    d2: make_second(),
                })
            };
        Schedule {
            driver: Arc::new(factory),
        }
    }

    /// Invokes the stored factory and returns a fresh driver for one run.
    pub fn driver(&self) -> Box<dyn ScheduleDriver<In, Out> + Send + Sync> {
        let factory = &self.driver;
        factory()
    }
}
/// Driver that allows a bounded number of steps.
struct RecursDriver {
    /// Maximum number of steps that may be granted.
    max: usize,
    /// Number of steps granted so far.
    count: usize,
}

impl<In> ScheduleDriver<In, usize> for RecursDriver {
    /// Grants another step while fewer than `max` have been granted.
    ///
    /// The output is the 1-based index of the granted step and the delay is
    /// always zero. The input is ignored. Once the limit is reached every
    /// call returns `None`.
    fn next(&mut self, _input: In) -> Option<(usize, Duration)> {
        if self.count >= self.max {
            return None;
        }
        self.count += 1;
        Some((self.count, Duration::ZERO))
    }
}
/// Driver that never stops and always asks for the same delay.
struct SpacedDriver {
    /// Delay reported on every step.
    duration: Duration,
}

impl<In> ScheduleDriver<In, ()> for SpacedDriver {
    /// Always continues, yielding the unit output together with the fixed
    /// `duration`. The input is ignored.
    fn next(&mut self, _input: In) -> Option<((), Duration)> {
        let pause = self.duration;
        Some(((), pause))
    }
}
/// Driver that never stops and scales its delay by a constant factor after
/// every step.
struct ExponentialDriver {
    /// Delay that the next call to `next` will report.
    current: Duration,
    /// Multiplier applied to `current` after each step.
    factor: f64,
    /// Number of steps taken so far; incremented on every call to `next`.
    count: usize,
}

impl<In> ScheduleDriver<In, Duration> for ExponentialDriver {
    /// Reports the current delay (as both output and delay), then scales the
    /// stored delay by `factor` for the following step. The input is ignored.
    ///
    /// The scaling never panics:
    /// * a product that no longer fits in a `Duration` saturates at
    ///   `Duration::MAX`;
    /// * a NaN or negative product (only possible with an invalid `factor`)
    ///   leaves the delay unchanged.
    fn next(&mut self, _input: In) -> Option<(Duration, Duration)> {
        let delay = self.current;
        self.count += 1;

        // Same arithmetic as `Duration::mul_f64`, but with the out-of-range
        // cases handled explicitly: `mul_f64` panics on them, which would
        // crash a long-running retry loop once the delay grows large enough.
        let scaled_secs = delay.as_secs_f64() * self.factor;
        self.current = match scaled_secs {
            s if s.is_nan() || s < 0.0 => delay,
            s if s >= Duration::MAX.as_secs_f64() => Duration::MAX,
            s => Duration::from_secs_f64(s),
        };

        Some((delay, delay))
    }
}
/// Driver that combines two drivers and continues only while both continue.
struct IntersectDriver<In, Out1, Out2> {
    /// First underlying driver.
    d1: Box<dyn ScheduleDriver<In, Out1> + Send + Sync>,
    /// Second underlying driver.
    d2: Box<dyn ScheduleDriver<In, Out2> + Send + Sync>,
}

impl<In, Out1, Out2> ScheduleDriver<In, (Out1, Out2)> for IntersectDriver<In, Out1, Out2>
where
    In: Clone,
{
    /// Advances both underlying drivers with the same input.
    ///
    /// Yields the paired outputs and the longer of the two delays when both
    /// drivers continue; yields `None` as soon as either one stops.
    fn next(&mut self, input: In) -> Option<((Out1, Out2), Duration)> {
        // Both drivers are stepped unconditionally, before either result is
        // inspected, so they always advance in lockstep.
        let first = self.d1.next(input.clone());
        let second = self.d2.next(input);

        first
            .zip(second)
            .map(|((out1, delay1), (out2, delay2))| {
                ((out1, out2), std::cmp::max(delay1, delay2))
            })
    }
}
/// Driver that combines two drivers and continues while either continues.
struct UnionDriver<In, Out1, Out2> {
    /// First underlying driver.
    d1: Box<dyn ScheduleDriver<In, Out1> + Send + Sync>,
    /// Second underlying driver.
    d2: Box<dyn ScheduleDriver<In, Out2> + Send + Sync>,
}

impl<In, Out1, Out2> ScheduleDriver<In, (Option<Out1>, Option<Out2>)>
    for UnionDriver<In, Out1, Out2>
where
    In: Clone,
{
    /// Advances both underlying drivers with the same input.
    ///
    /// Each side's output is reported as `Some` if that driver continued and
    /// `None` if it stopped. The delay is the shorter of the two when both
    /// continue, or the only available delay when just one does. Yields
    /// `None` only when both drivers stop.
    fn next(&mut self, input: In) -> Option<((Option<Out1>, Option<Out2>), Duration)> {
        // Both drivers are stepped unconditionally before looking at results.
        let first = self.d1.next(input.clone());
        let second = self.d2.next(input);

        let delay = match (&first, &second) {
            (None, None) => return None,
            (Some((_, only)), None) | (None, Some((_, only))) => *only,
            (Some((_, a)), Some((_, b))) => std::cmp::min(*a, *b),
        };

        let out1 = first.map(|(out, _)| out);
        let out2 = second.map(|(out, _)| out);
        Some(((out1, out2), delay))
    }
}
/// Driver wrapper that randomly scales the delays reported by an inner driver.
///
/// Outputs of the inner driver pass through unchanged; only the delay is
/// altered, by a factor of `1.0 + jitter`, where `jitter` is computed from a
/// random `f64` as `random * (max - min) + min`.
struct JitteredDriver<In, Out> {
/// Wrapped driver that decides whether to continue and supplies the base delay.
inner: Box<dyn ScheduleDriver<In, Out> + Send + Sync>,
/// Offset added to the random term; the value of `jitter` when the random sample is `0.0`.
min: f64,
/// Together with `min`, sets the width (`max - min`) the random sample is scaled by.
max: f64,
}
/// Stateful stepper behind a schedule: given an input, it decides whether to
/// continue and, if so, how long to wait.
pub trait ScheduleDriver<In, Out> {
/// Advances the driver by one step using `input`.
///
/// Returns `Some((output, delay))` to continue, where `delay` is the wait
/// associated with this step, or `None` to signal that the driver has stopped.
fn next(&mut self, input: In) -> Option<(Out, Duration)>;
}
impl<In, Out> ScheduleDriver<In, Out> for JitteredDriver<In, Out> {
    /// Steps the inner driver and randomly scales the delay it reports.
    ///
    /// Returns `None` when the inner driver stops (no random number is drawn
    /// in that case). Otherwise the inner output is passed through and the
    /// delay is multiplied by `1.0 + jitter`, with
    /// `jitter = random * (max - min) + min`.
    ///
    /// The scaling never panics:
    /// * a negative product (e.g. `min < -1.0`) clamps to `Duration::ZERO`;
    /// * a product too large for a `Duration` saturates at `Duration::MAX`;
    /// * a NaN product (only possible with non-finite bounds) leaves the
    ///   delay unjittered.
    fn next(&mut self, input: In) -> Option<(Out, Duration)> {
        let (out, delay) = self.inner.next(input)?;

        let jitter = rand::random::<f64>() * (self.max - self.min) + self.min;

        // Same arithmetic as `Duration::mul_f64`, but with the out-of-range
        // cases handled explicitly: `mul_f64` panics on a negative, NaN or
        // overflowing result, which misconfigured bounds or a very large
        // inner delay can produce.
        let scaled_secs = delay.as_secs_f64() * (1.0 + jitter);
        let jittered = if scaled_secs.is_nan() {
            delay
        } else if scaled_secs <= 0.0 {
            Duration::ZERO
        } else if scaled_secs >= Duration::MAX.as_secs_f64() {
            Duration::MAX
        } else {
            Duration::from_secs_f64(scaled_secs)
        };

        Some((out, jittered))
    }
}
impl<R, E, A> Effect<R, E, A>
where
R: Send + Sync + 'static + Clone,
E: Send + Sync + 'static + Clone,
A: Send + Sync + 'static + Clone,
{
pub fn retry<Out>(self, policy: Schedule<E, Out>) -> Effect<R, E, A>
where
Out: Send + Sync + 'static + Clone,
{
Effect {
inner: Arc::new(move |env: EnvRef<R>, ctx: Ctx| {
let effect = self.clone();
let policy = policy.clone();
Box::pin(async move {
let mut driver = policy.driver();
loop {
let result = (effect.inner)(env.clone(), ctx.clone()).await;
match result {
Exit::Success(a) => return Exit::Success(a),
Exit::Failure(cause) => {
match cause {
Cause::Fail(e) => {
if let Some((_, delay)) = driver.next(e.clone()) {
if !delay.is_zero() {
ctx.clock.sleep(delay).await;
}
continue;
} else {
return Exit::Failure(Cause::Fail(e));
}
}
_ => return Exit::Failure(cause),
}
}
}
}
})
}),
}
}
}