Skip to main content

scheduler/
guarded_runner.rs

1use crate::error::ExecutionGuardError;
2use crate::observer::{NoopObserver, SchedulerEvent, SchedulerObserver};
3use crate::{ExecutionGuard, ExecutionGuardAcquire, ExecutionGuardRenewal, ExecutionSlot};
4use std::future::Future;
5use std::sync::Arc;
6use tokio::time::{Instant, interval_at};
7
/// Outcome of asking a [`GuardedRunner`] to run work for an execution slot.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GuardedRunResult<T> {
    /// The guard was acquired and the work ran; carries the work's output.
    Completed(T),
    /// The guard was not acquired, so the work was never started.
    Contended,
}
13
14#[derive(Clone)]
15pub struct GuardedSession<G> {
16    guard: Arc<G>,
17    observer: Arc<dyn SchedulerObserver>,
18    lease: crate::ExecutionLease,
19    catch_up: bool,
20    trigger_count: u32,
21}
22
23#[derive(Clone)]
24pub struct GuardedRunner<G> {
25    guard: Arc<G>,
26    observer: Arc<dyn SchedulerObserver>,
27}
28
29impl<G> GuardedRunner<G>
30where
31    G: ExecutionGuard + Send + Sync + 'static,
32{
33    pub fn new(guard: G) -> Self {
34        Self::with_observer(guard, NoopObserver)
35    }
36
37    pub fn with_observer<O>(guard: G, observer: O) -> Self
38    where
39        O: SchedulerObserver,
40    {
41        Self {
42            guard: Arc::new(guard),
43            observer: Arc::new(observer),
44        }
45    }
46
47    pub async fn run<F, Fut, T>(
48        &self,
49        slot: ExecutionSlot,
50        run: F,
51    ) -> Result<GuardedRunResult<T>, ExecutionGuardError>
52    where
53        F: FnOnce() -> Fut,
54        Fut: Future<Output = T>,
55    {
56        self.run_with_metadata(slot, false, 0, run).await
57    }
58
59    pub async fn run_with_metadata<F, Fut, T>(
60        &self,
61        slot: ExecutionSlot,
62        catch_up: bool,
63        trigger_count: u32,
64        run: F,
65    ) -> Result<GuardedRunResult<T>, ExecutionGuardError>
66    where
67        F: FnOnce() -> Fut,
68        Fut: Future<Output = T>,
69    {
70        let Some(session) = self
71            .acquire_with_metadata(slot, catch_up, trigger_count)
72            .await?
73        else {
74            return Ok(GuardedRunResult::Contended);
75        };
76
77        Ok(GuardedRunResult::Completed(session.run(run).await))
78    }
79
80    pub async fn acquire(
81        &self,
82        slot: ExecutionSlot,
83    ) -> Result<Option<GuardedSession<G>>, ExecutionGuardError> {
84        self.acquire_with_metadata(slot, false, 0).await
85    }
86
87    async fn acquire_with_metadata(
88        &self,
89        slot: ExecutionSlot,
90        catch_up: bool,
91        trigger_count: u32,
92    ) -> Result<Option<GuardedSession<G>>, ExecutionGuardError> {
93        let acquired = self.guard.acquire(slot.clone()).await.map_err(|error| {
94            let kind = G::classify_error(&error);
95            ExecutionGuardError::new(error, kind)
96        })?;
97
98        let ExecutionGuardAcquire::Acquired(lease) = acquired else {
99            self.observer
100                .on_event(&SchedulerEvent::ExecutionGuardContended {
101                    job_id: slot.job_id,
102                    resource_id: slot.resource_id,
103                    scope: slot.scope,
104                    scheduled_at: slot.scheduled_at,
105                    catch_up,
106                    trigger_count,
107                });
108            return Ok(None);
109        };
110
111        self.observer
112            .on_event(&SchedulerEvent::ExecutionGuardAcquired {
113                job_id: lease.job_id.clone(),
114                resource_id: lease.resource_id.clone(),
115                scope: lease.scope,
116                lease_key: lease.lease_key.clone(),
117                scheduled_at: lease.scheduled_at,
118                catch_up,
119                trigger_count,
120            });
121
122        Ok(Some(GuardedSession {
123            guard: self.guard.clone(),
124            observer: self.observer.clone(),
125            lease,
126            catch_up,
127            trigger_count,
128        }))
129    }
130}
131
132impl<G> GuardedSession<G>
133where
134    G: ExecutionGuard + Send + Sync + 'static,
135{
136    pub async fn run<F, Fut, T>(self, run: F) -> T
137    where
138        F: FnOnce() -> Fut,
139        Fut: Future<Output = T>,
140    {
141        let mut renewal_count = 0u32;
142        let mut failed_renewal_count = 0u32;
143        let mut lost_reported = false;
144        let task = run();
145        tokio::pin!(task);
146        let mut renewal = self
147            .guard
148            .renew_interval(&self.lease)
149            .map(|duration| interval_at(Instant::now() + duration, duration));
150
151        let output = loop {
152            if let Some(ticker) = renewal.as_mut() {
153                let mut disable_renewal = false;
154                let outcome = tokio::select! {
155                    result = &mut task => Some(result),
156                    _ = ticker.tick() => {
157                        match self.guard.renew(&self.lease).await {
158                            Ok(ExecutionGuardRenewal::Renewed) => {
159                                renewal_count += 1;
160                                self.observer.on_event(&SchedulerEvent::ExecutionGuardRenewed {
161                                    job_id: self.lease.job_id.clone(),
162                                    resource_id: self.lease.resource_id.clone(),
163                                    scope: self.lease.scope,
164                                    lease_key: self.lease.lease_key.clone(),
165                                    scheduled_at: self.lease.scheduled_at,
166                                    catch_up: self.catch_up,
167                                    trigger_count: self.trigger_count,
168                                    renewal_count,
169                                });
170                            }
171                            Ok(ExecutionGuardRenewal::Lost) => {
172                                if !lost_reported {
173                                    self.observer.on_event(&SchedulerEvent::ExecutionGuardLost {
174                                        job_id: self.lease.job_id.clone(),
175                                        resource_id: self.lease.resource_id.clone(),
176                                        scope: self.lease.scope,
177                                        lease_key: self.lease.lease_key.clone(),
178                                        scheduled_at: self.lease.scheduled_at,
179                                        catch_up: self.catch_up,
180                                        trigger_count: self.trigger_count,
181                                        renewal_count,
182                                        failed_renewal_count,
183                                    });
184                                    lost_reported = true;
185                                }
186                                disable_renewal = true;
187                            }
188                            Err(error) => {
189                                failed_renewal_count += 1;
190                                self.observer.on_event(&SchedulerEvent::ExecutionGuardRenewFailed {
191                                    job_id: self.lease.job_id.clone(),
192                                    resource_id: self.lease.resource_id.clone(),
193                                    scope: self.lease.scope,
194                                    lease_key: self.lease.lease_key.clone(),
195                                    scheduled_at: self.lease.scheduled_at,
196                                    catch_up: self.catch_up,
197                                    trigger_count: self.trigger_count,
198                                    renewal_count,
199                                    failed_renewal_count,
200                                    error: error.to_string(),
201                                });
202                                if !lost_reported {
203                                    self.observer.on_event(&SchedulerEvent::ExecutionGuardLost {
204                                        job_id: self.lease.job_id.clone(),
205                                        resource_id: self.lease.resource_id.clone(),
206                                        scope: self.lease.scope,
207                                        lease_key: self.lease.lease_key.clone(),
208                                        scheduled_at: self.lease.scheduled_at,
209                                        catch_up: self.catch_up,
210                                        trigger_count: self.trigger_count,
211                                        renewal_count,
212                                        failed_renewal_count,
213                                    });
214                                    lost_reported = true;
215                                }
216                                disable_renewal = true;
217                            }
218                        }
219                        None
220                    }
221                };
222
223                if disable_renewal {
224                    renewal = None;
225                }
226
227                if let Some(result) = outcome {
228                    break result;
229                }
230            } else {
231                break task.await;
232            }
233        };
234
235        if let Err(error) = self.guard.release(&self.lease).await {
236            self.observer
237                .on_event(&SchedulerEvent::ExecutionGuardReleaseFailed {
238                    job_id: self.lease.job_id.clone(),
239                    resource_id: self.lease.resource_id.clone(),
240                    scope: self.lease.scope,
241                    lease_key: self.lease.lease_key.clone(),
242                    scheduled_at: self.lease.scheduled_at,
243                    catch_up: self.catch_up,
244                    trigger_count: self.trigger_count,
245                    error: error.to_string(),
246                });
247        } else {
248            self.observer
249                .on_event(&SchedulerEvent::ExecutionGuardReleased {
250                    job_id: self.lease.job_id.clone(),
251                    resource_id: self.lease.resource_id.clone(),
252                    scope: self.lease.scope,
253                    lease_key: self.lease.lease_key.clone(),
254                    scheduled_at: self.lease.scheduled_at,
255                    catch_up: self.catch_up,
256                    trigger_count: self.trigger_count,
257                });
258        }
259
260        output
261    }
262}