effect_rs/
schedule.rs

1use crate::core::*;
2use std::sync::Arc;
3use std::time::Duration;
4
5// use std::sync::Mutex;
6
7/// A Schedule determines whether to continue and how long to wait.
8/// It maintains internal state.
9/// For simplicity in this MVP, we use a trait object or a struct with a closure that returns state.
10/// Refined: Schedule<In, Out>
11/// We need to run it.
12/// Schedule with generic composition support
13pub struct Schedule<In, Out> {
14    pub(crate) driver:
15        Arc<dyn Fn() -> Box<dyn ScheduleDriver<In, Out> + Send + Sync> + Send + Sync>,
16}
17
18impl<In, Out> Clone for Schedule<In, Out> {
19    fn clone(&self) -> Self {
20        Self {
21            driver: self.driver.clone(),
22        }
23    }
24}
25
26impl<In, Out> Schedule<In, Out>
27where
28    In: Send + Sync + 'static,
29    Out: Send + Sync + 'static + Clone,
30{
31    pub fn recurs(n: usize) -> Schedule<In, usize> {
32        Schedule {
33            driver: Arc::new(move || Box::new(RecursDriver { max: n, count: 0 })),
34        }
35    }
36
37    pub fn spaced(duration: Duration) -> Schedule<In, ()> {
38        Schedule {
39            driver: Arc::new(move || Box::new(SpacedDriver { duration })),
40        }
41    }
42
43    pub fn exponential(base: Duration, factor: f64) -> Schedule<In, Duration> {
44        Schedule {
45            driver: Arc::new(move || {
46                Box::new(ExponentialDriver {
47                    current: base,
48                    factor,
49                    count: 0,
50                })
51            }),
52        }
53    }
54
55    pub fn jittered(self, min: f64, max: f64) -> Schedule<In, Out>
56    where
57        Out: Send + Sync + 'static + Clone, // Clone needed? Maybe not if driver copies? Jitter driver reuses out.
58    {
59        Schedule {
60            driver: Arc::new(move || {
61                let inner_driver = (self.driver)();
62                Box::new(JitteredDriver {
63                    inner: inner_driver,
64                    min,
65                    max,
66                })
67            }),
68        }
69    }
70
71    pub fn intersect<Out2>(self, other: Schedule<In, Out2>) -> Schedule<In, (Out, Out2)>
72    where
73        Out2: Send + Sync + 'static + Clone,
74        In: Clone,
75    {
76        Schedule {
77            driver: Arc::new(move || {
78                Box::new(IntersectDriver {
79                    d1: (self.driver)(),
80                    d2: (other.driver)(),
81                })
82            }),
83        }
84    }
85
86    pub fn union<Out2>(self, other: Schedule<In, Out2>) -> Schedule<In, (Option<Out>, Option<Out2>)>
87    where
88        Out2: Send + Sync + 'static + Clone,
89        In: Clone,
90    {
91        Schedule {
92            driver: Arc::new(move || {
93                Box::new(UnionDriver {
94                    d1: (self.driver)(),
95                    d2: (other.driver)(),
96                })
97            }),
98        }
99    }
100
101    pub fn driver(&self) -> Box<dyn ScheduleDriver<In, Out> + Send + Sync> {
102        (self.driver)()
103    }
104}
105
106// Ensure Driver Structs don't need closure maps anymore if we duplicate logic or assume identity?
107// Wait, Recurs returns usize. Spaced returns (). Exponential returns Duration.
108// The map was for potential mapping. I'll remove maps for now to simplify.
109
110struct RecursDriver {
111    max: usize,
112    count: usize,
113}
114
115impl<In> ScheduleDriver<In, usize> for RecursDriver {
116    fn next(&mut self, _input: In) -> Option<(usize, Duration)> {
117        if self.count < self.max {
118            self.count += 1;
119            Some((self.count, Duration::ZERO))
120        } else {
121            None
122        }
123    }
124}
125
126struct SpacedDriver {
127    duration: Duration,
128}
129
130impl<In> ScheduleDriver<In, ()> for SpacedDriver {
131    fn next(&mut self, _input: In) -> Option<((), Duration)> {
132        Some(((), self.duration))
133    }
134}
135
136struct ExponentialDriver {
137    current: Duration,
138    factor: f64,
139    count: usize,
140}
141
142impl<In> ScheduleDriver<In, Duration> for ExponentialDriver {
143    fn next(&mut self, _input: In) -> Option<(Duration, Duration)> {
144        let delay = self.current;
145        self.count += 1;
146        self.current = self.current.mul_f64(self.factor);
147        Some((delay, delay))
148    }
149}
150
151struct IntersectDriver<In, Out1, Out2> {
152    d1: Box<dyn ScheduleDriver<In, Out1> + Send + Sync>,
153    d2: Box<dyn ScheduleDriver<In, Out2> + Send + Sync>,
154}
155
156impl<In, Out1, Out2> ScheduleDriver<In, (Out1, Out2)> for IntersectDriver<In, Out1, Out2>
157where
158    In: Clone,
159{
160    fn next(&mut self, input: In) -> Option<((Out1, Out2), Duration)> {
161        let r1 = self.d1.next(input.clone());
162        let r2 = self.d2.next(input);
163
164        match (r1, r2) {
165            (Some((out1, delay1)), Some((out2, delay2))) => {
166                Some(((out1, out2), std::cmp::max(delay1, delay2)))
167            }
168            _ => None,
169        }
170    }
171}
172
173struct UnionDriver<In, Out1, Out2> {
174    d1: Box<dyn ScheduleDriver<In, Out1> + Send + Sync>,
175    d2: Box<dyn ScheduleDriver<In, Out2> + Send + Sync>,
176}
177
178impl<In, Out1, Out2> ScheduleDriver<In, (Option<Out1>, Option<Out2>)>
179    for UnionDriver<In, Out1, Out2>
180where
181    In: Clone,
182{
183    fn next(&mut self, input: In) -> Option<((Option<Out1>, Option<Out2>), Duration)> {
184        let r1 = self.d1.next(input.clone());
185        let r2 = self.d2.next(input);
186
187        match (r1, r2) {
188            (Some((out1, delay1)), Some((out2, delay2))) => {
189                Some(((Some(out1), Some(out2)), std::cmp::min(delay1, delay2)))
190            }
191            (Some((out1, delay1)), None) => Some(((Some(out1), None), delay1)),
192            (None, Some((out2, delay2))) => Some(((None, Some(out2)), delay2)),
193            (None, None) => None,
194        }
195    }
196}
197
198struct JitteredDriver<In, Out> {
199    inner: Box<dyn ScheduleDriver<In, Out> + Send + Sync>,
200    min: f64,
201    max: f64,
202}
203
204pub trait ScheduleDriver<In, Out> {
205    fn next(&mut self, input: In) -> Option<(Out, Duration)>;
206}
207
208// Drivers
209impl<In, Out> ScheduleDriver<In, Out> for JitteredDriver<In, Out> {
210    fn next(&mut self, input: In) -> Option<(Out, Duration)> {
211        if let Some((out, delay)) = self.inner.next(input) {
212            let jitter = rand::random::<f64>() * (self.max - self.min) + self.min;
213            let jittered: Duration = delay.mul_f64(1.0 + jitter);
214            Some((out, jittered))
215        } else {
216            None
217        }
218    }
219}
220
221// Effect extension for retry
222impl<R, E, A> Effect<R, E, A>
223where
224    R: Send + Sync + 'static + Clone,
225    E: Send + Sync + 'static + Clone,
226    A: Send + Sync + 'static + Clone,
227{
228    pub fn retry<Out>(self, policy: Schedule<E, Out>) -> Effect<R, E, A>
229    where
230        Out: Send + Sync + 'static + Clone,
231    {
232        Effect {
233            inner: Arc::new(move |env: EnvRef<R>, ctx: Ctx| {
234                let effect = self.clone();
235                // Driver state must be owned by the retry loop, but unique per execution.
236                // Since `policy` is shared (Arc-like clone in original, but new Enum clone now),
237                // we can create a driver.
238                // BUT `Effect` is lazy and `inner` is called multiple times.
239                // We need to create the driver INSIDE the effect.
240
241                let policy = policy.clone();
242
243                Box::pin(async move {
244                    // Create driver *inside* the execution
245                    let mut driver = policy.driver();
246
247                    loop {
248                        // Try to run the effect
249                        let result = (effect.inner)(env.clone(), ctx.clone()).await;
250                        match result {
251                            Exit::Success(a) => return Exit::Success(a),
252                            Exit::Failure(cause) => {
253                                match cause {
254                                    Cause::Fail(e) => {
255                                        // Check policy via driver
256                                        if let Some((_, delay)) = driver.next(e.clone()) {
257                                            if !delay.is_zero() {
258                                                ctx.clock.sleep(delay).await;
259                                            }
260                                            continue;
261                                        } else {
262                                            return Exit::Failure(Cause::Fail(e));
263                                        }
264                                    }
265                                    _ => return Exit::Failure(cause),
266                                }
267                            }
268                        }
269                    }
270                })
271            }),
272        }
273    }
274}