//! `scheduler/observer.rs`: scheduler event types and the observers that receive them.

1use crate::execution_guard::ExecutionGuardScope;
2use crate::model::{RunSkipReason, RunStatus};
3use crate::store::StoreOperation;
4use chrono::{DateTime, Utc};
5use log::{debug, info, warn};
6
/// How a job's scheduler state was obtained when it was loaded.
///
/// Carried by `SchedulerEvent::StateLoaded`. The exact criteria for each
/// variant live in the code that emits the event; the notes below are
/// inferred from the names.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StateLoadSource {
    /// Presumably: no prior state existed, so fresh state was created.
    New,
    /// Presumably: previously persisted state was read back as-is.
    Restored,
    /// Presumably: persisted state was found but had to be corrected.
    Repaired,
}
13
/// Why a job's scheduler stopped running.
///
/// Carried by `SchedulerEvent::SchedulerStopped`. The notes below are
/// inferred from the variant names; confirm against the emitting code.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SchedulerStopReason {
    /// Presumably: the job was cancelled.
    Cancelled,
    /// Presumably: the scheduler as a whole is shutting down.
    Shutdown,
    /// Presumably: the job reached a state with no further runs.
    Terminal,
    /// Presumably: a channel the scheduler listens on was closed.
    ChannelClosed,
}
21
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub enum SchedulerEvent {
24    StateLoaded {
25        job_id: String,
26        trigger_count: u32,
27        next_run_at: Option<DateTime<Utc>>,
28        source: StateLoadSource,
29    },
30    StateRepaired {
31        job_id: String,
32        trigger_count: u32,
33        previous_next_run_at: Option<DateTime<Utc>>,
34        repaired_next_run_at: Option<DateTime<Utc>>,
35    },
36    TriggerEmitted {
37        job_id: String,
38        scheduled_at: DateTime<Utc>,
39        catch_up: bool,
40        trigger_count: u32,
41    },
42    RunSkipped {
43        job_id: String,
44        scheduled_at: DateTime<Utc>,
45        catch_up: bool,
46        trigger_count: u32,
47        reason: RunSkipReason,
48    },
49    RunCompleted {
50        job_id: String,
51        scheduled_at: DateTime<Utc>,
52        catch_up: bool,
53        trigger_count: u32,
54        status: RunStatus,
55        error: Option<String>,
56    },
57    ExecutionGuardAcquired {
58        job_id: String,
59        resource_id: String,
60        scope: ExecutionGuardScope,
61        lease_key: String,
62        scheduled_at: Option<DateTime<Utc>>,
63        catch_up: bool,
64        trigger_count: u32,
65    },
66    ExecutionGuardContended {
67        job_id: String,
68        resource_id: String,
69        scope: ExecutionGuardScope,
70        scheduled_at: Option<DateTime<Utc>>,
71        catch_up: bool,
72        trigger_count: u32,
73    },
74    ExecutionGuardRenewed {
75        job_id: String,
76        resource_id: String,
77        scope: ExecutionGuardScope,
78        lease_key: String,
79        scheduled_at: Option<DateTime<Utc>>,
80        catch_up: bool,
81        trigger_count: u32,
82        renewal_count: u32,
83    },
84    ExecutionGuardRenewFailed {
85        job_id: String,
86        resource_id: String,
87        scope: ExecutionGuardScope,
88        lease_key: String,
89        scheduled_at: Option<DateTime<Utc>>,
90        catch_up: bool,
91        trigger_count: u32,
92        renewal_count: u32,
93        failed_renewal_count: u32,
94        error: String,
95    },
96    ExecutionGuardLost {
97        job_id: String,
98        resource_id: String,
99        scope: ExecutionGuardScope,
100        lease_key: String,
101        scheduled_at: Option<DateTime<Utc>>,
102        catch_up: bool,
103        trigger_count: u32,
104        renewal_count: u32,
105        failed_renewal_count: u32,
106    },
107    ExecutionGuardReleased {
108        job_id: String,
109        resource_id: String,
110        scope: ExecutionGuardScope,
111        lease_key: String,
112        scheduled_at: Option<DateTime<Utc>>,
113        catch_up: bool,
114        trigger_count: u32,
115    },
116    ExecutionGuardReleaseFailed {
117        job_id: String,
118        resource_id: String,
119        scope: ExecutionGuardScope,
120        lease_key: String,
121        scheduled_at: Option<DateTime<Utc>>,
122        catch_up: bool,
123        trigger_count: u32,
124        error: String,
125    },
126    StoreDegraded {
127        job_id: String,
128        operation: StoreOperation,
129        error: String,
130    },
131    TerminalStateDeleted {
132        job_id: String,
133        trigger_count: u32,
134    },
135    SchedulerStopped {
136        job_id: String,
137        trigger_count: u32,
138        reason: SchedulerStopReason,
139    },
140}
141
142pub trait SchedulerObserver: Send + Sync + 'static {
143    fn on_event(&self, event: &SchedulerEvent);
144}
145
146#[derive(Debug, Clone, Copy, Default)]
147pub struct NoopObserver;
148
149impl SchedulerObserver for NoopObserver {
150    fn on_event(&self, _event: &SchedulerEvent) {}
151}
152
153#[derive(Debug, Clone, Copy, Default)]
154pub struct LogObserver;
155
156impl SchedulerObserver for LogObserver {
157    fn on_event(&self, event: &SchedulerEvent) {
158        match event {
159            SchedulerEvent::StateLoaded {
160                job_id,
161                trigger_count,
162                next_run_at,
163                source,
164            } => info!(
165                "scheduler state loaded job_id={} source={:?} trigger_count={} next_run_at={:?}",
166                job_id, source, trigger_count, next_run_at
167            ),
168            SchedulerEvent::StateRepaired {
169                job_id,
170                trigger_count,
171                previous_next_run_at,
172                repaired_next_run_at,
173            } => warn!(
174                "scheduler repaired state job_id={} trigger_count={} previous_next_run_at={:?} repaired_next_run_at={:?}",
175                job_id, trigger_count, previous_next_run_at, repaired_next_run_at
176            ),
177            SchedulerEvent::TriggerEmitted {
178                job_id,
179                scheduled_at,
180                catch_up,
181                trigger_count,
182            } => debug!(
183                "scheduler trigger emitted job_id={} scheduled_at={} catch_up={} trigger_count={}",
184                job_id, scheduled_at, catch_up, trigger_count
185            ),
186            SchedulerEvent::RunSkipped {
187                job_id,
188                scheduled_at,
189                catch_up,
190                trigger_count,
191                reason,
192            } => debug!(
193                "scheduler run skipped job_id={} scheduled_at={} catch_up={} trigger_count={} reason={}",
194                job_id, scheduled_at, catch_up, trigger_count, reason
195            ),
196            SchedulerEvent::RunCompleted {
197                job_id,
198                scheduled_at,
199                catch_up,
200                trigger_count,
201                status,
202                error,
203            } => debug!(
204                "scheduler run completed job_id={} scheduled_at={} catch_up={} trigger_count={} status={:?} error={:?}",
205                job_id, scheduled_at, catch_up, trigger_count, status, error
206            ),
207            SchedulerEvent::ExecutionGuardAcquired {
208                job_id,
209                resource_id,
210                scope,
211                lease_key,
212                scheduled_at,
213                catch_up,
214                trigger_count,
215            } => debug!(
216                "scheduler execution guard acquired job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={}",
217                job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count
218            ),
219            SchedulerEvent::ExecutionGuardContended {
220                job_id,
221                resource_id,
222                scope,
223                scheduled_at,
224                catch_up,
225                trigger_count,
226            } => debug!(
227                "scheduler execution guard contended job_id={} resource_id={} scope={:?} scheduled_at={:?} catch_up={} trigger_count={}",
228                job_id, resource_id, scope, scheduled_at, catch_up, trigger_count
229            ),
230            SchedulerEvent::ExecutionGuardRenewed {
231                job_id,
232                resource_id,
233                scope,
234                lease_key,
235                scheduled_at,
236                catch_up,
237                trigger_count,
238                renewal_count,
239            } => debug!(
240                "scheduler execution guard renewed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={}",
241                job_id,
242                resource_id,
243                scope,
244                lease_key,
245                scheduled_at,
246                catch_up,
247                trigger_count,
248                renewal_count
249            ),
250            SchedulerEvent::ExecutionGuardRenewFailed {
251                job_id,
252                resource_id,
253                scope,
254                lease_key,
255                scheduled_at,
256                catch_up,
257                trigger_count,
258                renewal_count,
259                failed_renewal_count,
260                error,
261            } => warn!(
262                "scheduler execution guard renew failed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={} failed_renewal_count={} error={}",
263                job_id,
264                resource_id,
265                scope,
266                lease_key,
267                scheduled_at,
268                catch_up,
269                trigger_count,
270                renewal_count,
271                failed_renewal_count,
272                error
273            ),
274            SchedulerEvent::ExecutionGuardLost {
275                job_id,
276                resource_id,
277                scope,
278                lease_key,
279                scheduled_at,
280                catch_up,
281                trigger_count,
282                renewal_count,
283                failed_renewal_count,
284            } => warn!(
285                "scheduler execution guard lost job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={} failed_renewal_count={}",
286                job_id,
287                resource_id,
288                scope,
289                lease_key,
290                scheduled_at,
291                catch_up,
292                trigger_count,
293                renewal_count,
294                failed_renewal_count
295            ),
296            SchedulerEvent::ExecutionGuardReleased {
297                job_id,
298                resource_id,
299                scope,
300                lease_key,
301                scheduled_at,
302                catch_up,
303                trigger_count,
304            } => debug!(
305                "scheduler execution guard released job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={}",
306                job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count
307            ),
308            SchedulerEvent::ExecutionGuardReleaseFailed {
309                job_id,
310                resource_id,
311                scope,
312                lease_key,
313                scheduled_at,
314                catch_up,
315                trigger_count,
316                error,
317            } => warn!(
318                "scheduler execution guard release failed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} error={}",
319                job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count, error
320            ),
321            SchedulerEvent::StoreDegraded {
322                job_id,
323                operation,
324                error,
325            } => warn!(
326                "scheduler store degraded job_id={} operation={:?} error={}",
327                job_id, operation, error
328            ),
329            SchedulerEvent::TerminalStateDeleted {
330                job_id,
331                trigger_count,
332            } => info!(
333                "scheduler deleted terminal state job_id={} trigger_count={}",
334                job_id, trigger_count
335            ),
336            SchedulerEvent::SchedulerStopped {
337                job_id,
338                trigger_count,
339                reason,
340            } => info!(
341                "scheduler stopped job_id={} trigger_count={} reason={:?}",
342                job_id, trigger_count, reason
343            ),
344        }
345    }
346}