1use crate::core::*;
2use std::sync::Arc;
3use std::time::Duration;
4
5pub struct Schedule<In, Out> {
14 pub(crate) driver:
15 Arc<dyn Fn() -> Box<dyn ScheduleDriver<In, Out> + Send + Sync> + Send + Sync>,
16}
17
18impl<In, Out> Clone for Schedule<In, Out> {
19 fn clone(&self) -> Self {
20 Self {
21 driver: self.driver.clone(),
22 }
23 }
24}
25
26impl<In, Out> Schedule<In, Out>
27where
28 In: Send + Sync + 'static,
29 Out: Send + Sync + 'static + Clone,
30{
31 pub fn recurs(n: usize) -> Schedule<In, usize> {
32 Schedule {
33 driver: Arc::new(move || Box::new(RecursDriver { max: n, count: 0 })),
34 }
35 }
36
37 pub fn spaced(duration: Duration) -> Schedule<In, ()> {
38 Schedule {
39 driver: Arc::new(move || Box::new(SpacedDriver { duration })),
40 }
41 }
42
43 pub fn exponential(base: Duration, factor: f64) -> Schedule<In, Duration> {
44 Schedule {
45 driver: Arc::new(move || {
46 Box::new(ExponentialDriver {
47 current: base,
48 factor,
49 count: 0,
50 })
51 }),
52 }
53 }
54
55 pub fn jittered(self, min: f64, max: f64) -> Schedule<In, Out>
56 where
57 Out: Send + Sync + 'static + Clone, {
59 Schedule {
60 driver: Arc::new(move || {
61 let inner_driver = (self.driver)();
62 Box::new(JitteredDriver {
63 inner: inner_driver,
64 min,
65 max,
66 })
67 }),
68 }
69 }
70
71 pub fn intersect<Out2>(self, other: Schedule<In, Out2>) -> Schedule<In, (Out, Out2)>
72 where
73 Out2: Send + Sync + 'static + Clone,
74 In: Clone,
75 {
76 Schedule {
77 driver: Arc::new(move || {
78 Box::new(IntersectDriver {
79 d1: (self.driver)(),
80 d2: (other.driver)(),
81 })
82 }),
83 }
84 }
85
86 pub fn union<Out2>(self, other: Schedule<In, Out2>) -> Schedule<In, (Option<Out>, Option<Out2>)>
87 where
88 Out2: Send + Sync + 'static + Clone,
89 In: Clone,
90 {
91 Schedule {
92 driver: Arc::new(move || {
93 Box::new(UnionDriver {
94 d1: (self.driver)(),
95 d2: (other.driver)(),
96 })
97 }),
98 }
99 }
100
101 pub fn driver(&self) -> Box<dyn ScheduleDriver<In, Out> + Send + Sync> {
102 (self.driver)()
103 }
104}
105
106struct RecursDriver {
111 max: usize,
112 count: usize,
113}
114
115impl<In> ScheduleDriver<In, usize> for RecursDriver {
116 fn next(&mut self, _input: In) -> Option<(usize, Duration)> {
117 if self.count < self.max {
118 self.count += 1;
119 Some((self.count, Duration::ZERO))
120 } else {
121 None
122 }
123 }
124}
125
126struct SpacedDriver {
127 duration: Duration,
128}
129
130impl<In> ScheduleDriver<In, ()> for SpacedDriver {
131 fn next(&mut self, _input: In) -> Option<((), Duration)> {
132 Some(((), self.duration))
133 }
134}
135
136struct ExponentialDriver {
137 current: Duration,
138 factor: f64,
139 count: usize,
140}
141
142impl<In> ScheduleDriver<In, Duration> for ExponentialDriver {
143 fn next(&mut self, _input: In) -> Option<(Duration, Duration)> {
144 let delay = self.current;
145 self.count += 1;
146 self.current = self.current.mul_f64(self.factor);
147 Some((delay, delay))
148 }
149}
150
151struct IntersectDriver<In, Out1, Out2> {
152 d1: Box<dyn ScheduleDriver<In, Out1> + Send + Sync>,
153 d2: Box<dyn ScheduleDriver<In, Out2> + Send + Sync>,
154}
155
156impl<In, Out1, Out2> ScheduleDriver<In, (Out1, Out2)> for IntersectDriver<In, Out1, Out2>
157where
158 In: Clone,
159{
160 fn next(&mut self, input: In) -> Option<((Out1, Out2), Duration)> {
161 let r1 = self.d1.next(input.clone());
162 let r2 = self.d2.next(input);
163
164 match (r1, r2) {
165 (Some((out1, delay1)), Some((out2, delay2))) => {
166 Some(((out1, out2), std::cmp::max(delay1, delay2)))
167 }
168 _ => None,
169 }
170 }
171}
172
173struct UnionDriver<In, Out1, Out2> {
174 d1: Box<dyn ScheduleDriver<In, Out1> + Send + Sync>,
175 d2: Box<dyn ScheduleDriver<In, Out2> + Send + Sync>,
176}
177
178impl<In, Out1, Out2> ScheduleDriver<In, (Option<Out1>, Option<Out2>)>
179 for UnionDriver<In, Out1, Out2>
180where
181 In: Clone,
182{
183 fn next(&mut self, input: In) -> Option<((Option<Out1>, Option<Out2>), Duration)> {
184 let r1 = self.d1.next(input.clone());
185 let r2 = self.d2.next(input);
186
187 match (r1, r2) {
188 (Some((out1, delay1)), Some((out2, delay2))) => {
189 Some(((Some(out1), Some(out2)), std::cmp::min(delay1, delay2)))
190 }
191 (Some((out1, delay1)), None) => Some(((Some(out1), None), delay1)),
192 (None, Some((out2, delay2))) => Some(((None, Some(out2)), delay2)),
193 (None, None) => None,
194 }
195 }
196}
197
198struct JitteredDriver<In, Out> {
199 inner: Box<dyn ScheduleDriver<In, Out> + Send + Sync>,
200 min: f64,
201 max: f64,
202}
203
204pub trait ScheduleDriver<In, Out> {
205 fn next(&mut self, input: In) -> Option<(Out, Duration)>;
206}
207
208impl<In, Out> ScheduleDriver<In, Out> for JitteredDriver<In, Out> {
210 fn next(&mut self, input: In) -> Option<(Out, Duration)> {
211 if let Some((out, delay)) = self.inner.next(input) {
212 let jitter = rand::random::<f64>() * (self.max - self.min) + self.min;
213 let jittered: Duration = delay.mul_f64(1.0 + jitter);
214 Some((out, jittered))
215 } else {
216 None
217 }
218 }
219}
220
221impl<R, E, A> Effect<R, E, A>
223where
224 R: Send + Sync + 'static + Clone,
225 E: Send + Sync + 'static + Clone,
226 A: Send + Sync + 'static + Clone,
227{
228 pub fn retry<Out>(self, policy: Schedule<E, Out>) -> Effect<R, E, A>
229 where
230 Out: Send + Sync + 'static + Clone,
231 {
232 Effect {
233 inner: Arc::new(move |env: EnvRef<R>, ctx: Ctx| {
234 let effect = self.clone();
235 let policy = policy.clone();
242
243 Box::pin(async move {
244 let mut driver = policy.driver();
246
247 loop {
248 let result = (effect.inner)(env.clone(), ctx.clone()).await;
250 match result {
251 Exit::Success(a) => return Exit::Success(a),
252 Exit::Failure(cause) => {
253 match cause {
254 Cause::Fail(e) => {
255 if let Some((_, delay)) = driver.next(e.clone()) {
257 if !delay.is_zero() {
258 ctx.clock.sleep(delay).await;
259 }
260 continue;
261 } else {
262 return Exit::Failure(Cause::Fail(e));
263 }
264 }
265 _ => return Exit::Failure(cause),
266 }
267 }
268 }
269 }
270 })
271 }),
272 }
273 }
274}