Skip to main content

scheduler/
observer.rs

1use crate::execution_guard::ExecutionGuardScope;
2use crate::model::{RunSkipReason, RunStatus};
3use crate::store::StoreOperation;
4use chrono::{DateTime, Utc};
5use log::{debug, info, warn};
6
/// Tag carried in the `source` field of [`SchedulerEvent::StateLoaded`].
///
/// NOTE(review): the per-variant meanings below are inferred from the variant
/// names only; the code that chooses between them is not in this file.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StateLoadSource {
    /// Presumably: no earlier state existed and a fresh one was created.
    New,
    /// Presumably: previously stored state was loaded unchanged.
    Restored,
    /// Presumably: stored state was loaded and then corrected.
    Repaired,
}
13
/// Tag carried in the `reason` field of [`SchedulerEvent::SchedulerStopped`].
///
/// NOTE(review): the per-variant meanings below are inferred from the variant
/// names only; the code that chooses between them is not in this file.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SchedulerStopReason {
    /// Presumably: the scheduler was cancelled by its owner.
    Cancelled,
    /// Presumably: the scheduler stopped as part of a shutdown.
    Shutdown,
    /// Presumably: the schedule reached a terminal state (nothing left to run).
    Terminal,
    /// Presumably: a channel the scheduler depends on was closed.
    ChannelClosed,
}
21
/// Tag carried in the `scope` field of [`SchedulerEvent::SchedulerPaused`] and
/// [`SchedulerEvent::SchedulerResumed`].
///
/// NOTE(review): what distinguishes a local pause from a shared one is decided
/// outside this file; confirm the semantics with the scheduler implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PauseScope {
    Local,
    Shared,
}
27
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub enum SchedulerEvent {
30    StateLoaded {
31        job_id: String,
32        trigger_count: u32,
33        next_run_at: Option<DateTime<Utc>>,
34        source: StateLoadSource,
35    },
36    StateRepaired {
37        job_id: String,
38        trigger_count: u32,
39        previous_next_run_at: Option<DateTime<Utc>>,
40        repaired_next_run_at: Option<DateTime<Utc>>,
41    },
42    TriggerEmitted {
43        job_id: String,
44        scheduled_at: DateTime<Utc>,
45        catch_up: bool,
46        trigger_count: u32,
47    },
48    RunSkipped {
49        job_id: String,
50        scheduled_at: DateTime<Utc>,
51        catch_up: bool,
52        trigger_count: u32,
53        reason: RunSkipReason,
54    },
55    RunCompleted {
56        job_id: String,
57        scheduled_at: DateTime<Utc>,
58        catch_up: bool,
59        trigger_count: u32,
60        status: RunStatus,
61        error: Option<String>,
62    },
63    ExecutionGuardAcquired {
64        job_id: String,
65        resource_id: String,
66        scope: ExecutionGuardScope,
67        lease_key: String,
68        scheduled_at: Option<DateTime<Utc>>,
69        catch_up: bool,
70        trigger_count: u32,
71    },
72    ExecutionGuardContended {
73        job_id: String,
74        resource_id: String,
75        scope: ExecutionGuardScope,
76        scheduled_at: Option<DateTime<Utc>>,
77        catch_up: bool,
78        trigger_count: u32,
79    },
80    ExecutionGuardRenewed {
81        job_id: String,
82        resource_id: String,
83        scope: ExecutionGuardScope,
84        lease_key: String,
85        scheduled_at: Option<DateTime<Utc>>,
86        catch_up: bool,
87        trigger_count: u32,
88        renewal_count: u32,
89    },
90    ExecutionGuardRenewFailed {
91        job_id: String,
92        resource_id: String,
93        scope: ExecutionGuardScope,
94        lease_key: String,
95        scheduled_at: Option<DateTime<Utc>>,
96        catch_up: bool,
97        trigger_count: u32,
98        renewal_count: u32,
99        failed_renewal_count: u32,
100        error: String,
101    },
102    ExecutionGuardLost {
103        job_id: String,
104        resource_id: String,
105        scope: ExecutionGuardScope,
106        lease_key: String,
107        scheduled_at: Option<DateTime<Utc>>,
108        catch_up: bool,
109        trigger_count: u32,
110        renewal_count: u32,
111        failed_renewal_count: u32,
112    },
113    ExecutionGuardReleased {
114        job_id: String,
115        resource_id: String,
116        scope: ExecutionGuardScope,
117        lease_key: String,
118        scheduled_at: Option<DateTime<Utc>>,
119        catch_up: bool,
120        trigger_count: u32,
121    },
122    ExecutionGuardReleaseFailed {
123        job_id: String,
124        resource_id: String,
125        scope: ExecutionGuardScope,
126        lease_key: String,
127        scheduled_at: Option<DateTime<Utc>>,
128        catch_up: bool,
129        trigger_count: u32,
130        error: String,
131    },
132    StoreDegraded {
133        job_id: String,
134        operation: StoreOperation,
135        error: String,
136    },
137    TerminalStateDeleted {
138        job_id: String,
139        trigger_count: u32,
140    },
141    SchedulerPaused {
142        job_id: String,
143        trigger_count: u32,
144        scope: PauseScope,
145    },
146    SchedulerResumed {
147        job_id: String,
148        trigger_count: u32,
149        scope: PauseScope,
150    },
151    SchedulerStopped {
152        job_id: String,
153        trigger_count: u32,
154        reason: SchedulerStopReason,
155    },
156}
157
158pub trait SchedulerObserver: Send + Sync + 'static {
159    fn on_event(&self, event: &SchedulerEvent);
160}
161
/// Zero-sized observer type; its `SchedulerObserver` implementation ignores
/// every event.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopObserver;
164
165impl SchedulerObserver for NoopObserver {
166    fn on_event(&self, _event: &SchedulerEvent) {}
167}
168
/// Zero-sized observer type; its `SchedulerObserver` implementation writes
/// each event through the `log` macros.
#[derive(Clone, Copy, Debug, Default)]
pub struct LogObserver;
171
172impl SchedulerObserver for LogObserver {
173    fn on_event(&self, event: &SchedulerEvent) {
174        match event {
175            SchedulerEvent::StateLoaded {
176                job_id,
177                trigger_count,
178                next_run_at,
179                source,
180            } => info!(
181                "scheduler state loaded job_id={} source={:?} trigger_count={} next_run_at={:?}",
182                job_id, source, trigger_count, next_run_at
183            ),
184            SchedulerEvent::StateRepaired {
185                job_id,
186                trigger_count,
187                previous_next_run_at,
188                repaired_next_run_at,
189            } => warn!(
190                "scheduler repaired state job_id={} trigger_count={} previous_next_run_at={:?} repaired_next_run_at={:?}",
191                job_id, trigger_count, previous_next_run_at, repaired_next_run_at
192            ),
193            SchedulerEvent::TriggerEmitted {
194                job_id,
195                scheduled_at,
196                catch_up,
197                trigger_count,
198            } => debug!(
199                "scheduler trigger emitted job_id={} scheduled_at={} catch_up={} trigger_count={}",
200                job_id, scheduled_at, catch_up, trigger_count
201            ),
202            SchedulerEvent::RunSkipped {
203                job_id,
204                scheduled_at,
205                catch_up,
206                trigger_count,
207                reason,
208            } => debug!(
209                "scheduler run skipped job_id={} scheduled_at={} catch_up={} trigger_count={} reason={}",
210                job_id, scheduled_at, catch_up, trigger_count, reason
211            ),
212            SchedulerEvent::RunCompleted {
213                job_id,
214                scheduled_at,
215                catch_up,
216                trigger_count,
217                status,
218                error,
219            } => debug!(
220                "scheduler run completed job_id={} scheduled_at={} catch_up={} trigger_count={} status={:?} error={:?}",
221                job_id, scheduled_at, catch_up, trigger_count, status, error
222            ),
223            SchedulerEvent::ExecutionGuardAcquired {
224                job_id,
225                resource_id,
226                scope,
227                lease_key,
228                scheduled_at,
229                catch_up,
230                trigger_count,
231            } => debug!(
232                "scheduler execution guard acquired job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={}",
233                job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count
234            ),
235            SchedulerEvent::ExecutionGuardContended {
236                job_id,
237                resource_id,
238                scope,
239                scheduled_at,
240                catch_up,
241                trigger_count,
242            } => debug!(
243                "scheduler execution guard contended job_id={} resource_id={} scope={:?} scheduled_at={:?} catch_up={} trigger_count={}",
244                job_id, resource_id, scope, scheduled_at, catch_up, trigger_count
245            ),
246            SchedulerEvent::ExecutionGuardRenewed {
247                job_id,
248                resource_id,
249                scope,
250                lease_key,
251                scheduled_at,
252                catch_up,
253                trigger_count,
254                renewal_count,
255            } => debug!(
256                "scheduler execution guard renewed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={}",
257                job_id,
258                resource_id,
259                scope,
260                lease_key,
261                scheduled_at,
262                catch_up,
263                trigger_count,
264                renewal_count
265            ),
266            SchedulerEvent::ExecutionGuardRenewFailed {
267                job_id,
268                resource_id,
269                scope,
270                lease_key,
271                scheduled_at,
272                catch_up,
273                trigger_count,
274                renewal_count,
275                failed_renewal_count,
276                error,
277            } => warn!(
278                "scheduler execution guard renew failed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={} failed_renewal_count={} error={}",
279                job_id,
280                resource_id,
281                scope,
282                lease_key,
283                scheduled_at,
284                catch_up,
285                trigger_count,
286                renewal_count,
287                failed_renewal_count,
288                error
289            ),
290            SchedulerEvent::ExecutionGuardLost {
291                job_id,
292                resource_id,
293                scope,
294                lease_key,
295                scheduled_at,
296                catch_up,
297                trigger_count,
298                renewal_count,
299                failed_renewal_count,
300            } => warn!(
301                "scheduler execution guard lost job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={} failed_renewal_count={}",
302                job_id,
303                resource_id,
304                scope,
305                lease_key,
306                scheduled_at,
307                catch_up,
308                trigger_count,
309                renewal_count,
310                failed_renewal_count
311            ),
312            SchedulerEvent::ExecutionGuardReleased {
313                job_id,
314                resource_id,
315                scope,
316                lease_key,
317                scheduled_at,
318                catch_up,
319                trigger_count,
320            } => debug!(
321                "scheduler execution guard released job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={}",
322                job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count
323            ),
324            SchedulerEvent::ExecutionGuardReleaseFailed {
325                job_id,
326                resource_id,
327                scope,
328                lease_key,
329                scheduled_at,
330                catch_up,
331                trigger_count,
332                error,
333            } => warn!(
334                "scheduler execution guard release failed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} error={}",
335                job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count, error
336            ),
337            SchedulerEvent::StoreDegraded {
338                job_id,
339                operation,
340                error,
341            } => warn!(
342                "scheduler store degraded job_id={} operation={:?} error={}",
343                job_id, operation, error
344            ),
345            SchedulerEvent::TerminalStateDeleted {
346                job_id,
347                trigger_count,
348            } => info!(
349                "scheduler deleted terminal state job_id={} trigger_count={}",
350                job_id, trigger_count
351            ),
352            SchedulerEvent::SchedulerPaused {
353                job_id,
354                trigger_count,
355                scope,
356            } => info!(
357                "scheduler paused job_id={} trigger_count={} scope={:?}",
358                job_id, trigger_count, scope
359            ),
360            SchedulerEvent::SchedulerResumed {
361                job_id,
362                trigger_count,
363                scope,
364            } => info!(
365                "scheduler resumed job_id={} trigger_count={} scope={:?}",
366                job_id, trigger_count, scope
367            ),
368            SchedulerEvent::SchedulerStopped {
369                job_id,
370                trigger_count,
371                reason,
372            } => info!(
373                "scheduler stopped job_id={} trigger_count={} reason={:?}",
374                job_id, trigger_count, reason
375            ),
376        }
377    }
378}