effect-rs 0.1.0

A high-performance, strictly-typed, functional effect system for Rust.
Documentation
use crate::core::*;
use std::sync::Arc;
use std::time::Duration;

// use std::sync::Mutex;

/// A Schedule determines whether to continue and how long to wait.
/// It maintains internal state.
/// For simplicity in this MVP, we use a trait object or a struct with a closure that returns state.
/// Refined: Schedule<In, Out>
/// We need to run it.
/// Schedule with generic composition support
pub struct Schedule<In, Out> {
    pub(crate) driver:
        Arc<dyn Fn() -> Box<dyn ScheduleDriver<In, Out> + Send + Sync> + Send + Sync>,
}

impl<In, Out> Clone for Schedule<In, Out> {
    fn clone(&self) -> Self {
        Self {
            driver: self.driver.clone(),
        }
    }
}

impl<In, Out> Schedule<In, Out>
where
    In: Send + Sync + 'static,
    Out: Send + Sync + 'static + Clone,
{
    pub fn recurs(n: usize) -> Schedule<In, usize> {
        Schedule {
            driver: Arc::new(move || Box::new(RecursDriver { max: n, count: 0 })),
        }
    }

    pub fn spaced(duration: Duration) -> Schedule<In, ()> {
        Schedule {
            driver: Arc::new(move || Box::new(SpacedDriver { duration })),
        }
    }

    pub fn exponential(base: Duration, factor: f64) -> Schedule<In, Duration> {
        Schedule {
            driver: Arc::new(move || {
                Box::new(ExponentialDriver {
                    current: base,
                    factor,
                    count: 0,
                })
            }),
        }
    }

    pub fn jittered(self, min: f64, max: f64) -> Schedule<In, Out>
    where
        Out: Send + Sync + 'static + Clone, // Clone needed? Maybe not if driver copies? Jitter driver reuses out.
    {
        Schedule {
            driver: Arc::new(move || {
                let inner_driver = (self.driver)();
                Box::new(JitteredDriver {
                    inner: inner_driver,
                    min,
                    max,
                })
            }),
        }
    }

    pub fn intersect<Out2>(self, other: Schedule<In, Out2>) -> Schedule<In, (Out, Out2)>
    where
        Out2: Send + Sync + 'static + Clone,
        In: Clone,
    {
        Schedule {
            driver: Arc::new(move || {
                Box::new(IntersectDriver {
                    d1: (self.driver)(),
                    d2: (other.driver)(),
                })
            }),
        }
    }

    pub fn union<Out2>(self, other: Schedule<In, Out2>) -> Schedule<In, (Option<Out>, Option<Out2>)>
    where
        Out2: Send + Sync + 'static + Clone,
        In: Clone,
    {
        Schedule {
            driver: Arc::new(move || {
                Box::new(UnionDriver {
                    d1: (self.driver)(),
                    d2: (other.driver)(),
                })
            }),
        }
    }

    pub fn driver(&self) -> Box<dyn ScheduleDriver<In, Out> + Send + Sync> {
        (self.driver)()
    }
}

// Ensure Driver Structs don't need closure maps anymore if we duplicate logic or assume identity?
// Wait, Recurs returns usize. Spaced returns (). Exponential returns Duration.
// The map was for potential mapping. I'll remove maps for now to simplify.

struct RecursDriver {
    max: usize,
    count: usize,
}

impl<In> ScheduleDriver<In, usize> for RecursDriver {
    fn next(&mut self, _input: In) -> Option<(usize, Duration)> {
        if self.count < self.max {
            self.count += 1;
            Some((self.count, Duration::ZERO))
        } else {
            None
        }
    }
}

struct SpacedDriver {
    duration: Duration,
}

impl<In> ScheduleDriver<In, ()> for SpacedDriver {
    fn next(&mut self, _input: In) -> Option<((), Duration)> {
        Some(((), self.duration))
    }
}

struct ExponentialDriver {
    current: Duration,
    factor: f64,
    count: usize,
}

impl<In> ScheduleDriver<In, Duration> for ExponentialDriver {
    fn next(&mut self, _input: In) -> Option<(Duration, Duration)> {
        let delay = self.current;
        self.count += 1;
        self.current = self.current.mul_f64(self.factor);
        Some((delay, delay))
    }
}

struct IntersectDriver<In, Out1, Out2> {
    d1: Box<dyn ScheduleDriver<In, Out1> + Send + Sync>,
    d2: Box<dyn ScheduleDriver<In, Out2> + Send + Sync>,
}

impl<In, Out1, Out2> ScheduleDriver<In, (Out1, Out2)> for IntersectDriver<In, Out1, Out2>
where
    In: Clone,
{
    fn next(&mut self, input: In) -> Option<((Out1, Out2), Duration)> {
        let r1 = self.d1.next(input.clone());
        let r2 = self.d2.next(input);

        match (r1, r2) {
            (Some((out1, delay1)), Some((out2, delay2))) => {
                Some(((out1, out2), std::cmp::max(delay1, delay2)))
            }
            _ => None,
        }
    }
}

struct UnionDriver<In, Out1, Out2> {
    d1: Box<dyn ScheduleDriver<In, Out1> + Send + Sync>,
    d2: Box<dyn ScheduleDriver<In, Out2> + Send + Sync>,
}

impl<In, Out1, Out2> ScheduleDriver<In, (Option<Out1>, Option<Out2>)>
    for UnionDriver<In, Out1, Out2>
where
    In: Clone,
{
    fn next(&mut self, input: In) -> Option<((Option<Out1>, Option<Out2>), Duration)> {
        let r1 = self.d1.next(input.clone());
        let r2 = self.d2.next(input);

        match (r1, r2) {
            (Some((out1, delay1)), Some((out2, delay2))) => {
                Some(((Some(out1), Some(out2)), std::cmp::min(delay1, delay2)))
            }
            (Some((out1, delay1)), None) => Some(((Some(out1), None), delay1)),
            (None, Some((out2, delay2))) => Some(((None, Some(out2)), delay2)),
            (None, None) => None,
        }
    }
}

struct JitteredDriver<In, Out> {
    inner: Box<dyn ScheduleDriver<In, Out> + Send + Sync>,
    min: f64,
    max: f64,
}

pub trait ScheduleDriver<In, Out> {
    fn next(&mut self, input: In) -> Option<(Out, Duration)>;
}

// Drivers
impl<In, Out> ScheduleDriver<In, Out> for JitteredDriver<In, Out> {
    fn next(&mut self, input: In) -> Option<(Out, Duration)> {
        if let Some((out, delay)) = self.inner.next(input) {
            let jitter = rand::random::<f64>() * (self.max - self.min) + self.min;
            let jittered: Duration = delay.mul_f64(1.0 + jitter);
            Some((out, jittered))
        } else {
            None
        }
    }
}

// Effect extension for retry
impl<R, E, A> Effect<R, E, A>
where
    R: Send + Sync + 'static + Clone,
    E: Send + Sync + 'static + Clone,
    A: Send + Sync + 'static + Clone,
{
    pub fn retry<Out>(self, policy: Schedule<E, Out>) -> Effect<R, E, A>
    where
        Out: Send + Sync + 'static + Clone,
    {
        Effect {
            inner: Arc::new(move |env: EnvRef<R>, ctx: Ctx| {
                let effect = self.clone();
                // Driver state must be owned by the retry loop, but unique per execution.
                // Since `policy` is shared (Arc-like clone in original, but new Enum clone now),
                // we can create a driver.
                // BUT `Effect` is lazy and `inner` is called multiple times.
                // We need to create the driver INSIDE the effect.

                let policy = policy.clone();

                Box::pin(async move {
                    // Create driver *inside* the execution
                    let mut driver = policy.driver();

                    loop {
                        // Try to run the effect
                        let result = (effect.inner)(env.clone(), ctx.clone()).await;
                        match result {
                            Exit::Success(a) => return Exit::Success(a),
                            Exit::Failure(cause) => {
                                match cause {
                                    Cause::Fail(e) => {
                                        // Check policy via driver
                                        if let Some((_, delay)) = driver.next(e.clone()) {
                                            if !delay.is_zero() {
                                                ctx.clock.sleep(delay).await;
                                            }
                                            continue;
                                        } else {
                                            return Exit::Failure(Cause::Fail(e));
                                        }
                                    }
                                    _ => return Exit::Failure(cause),
                                }
                            }
                        }
                    }
                })
            }),
        }
    }
}