//! Scheduler observer events and built-in observers (`scheduler/observer.rs`).

use crate::model::RunStatus;
use crate::store::StoreOperation;
use chrono::{DateTime, Utc};
use log::{debug, info, warn};

/// Where the state reported by [`SchedulerEvent::StateLoaded`] came from.
///
/// The value is carried in the `source` field of that event and is logged
/// with its `Debug` representation by [`LogObserver`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StateLoadSource {
    /// Presumably: no stored state existed and a fresh one was created.
    /// NOTE(review): confirm against the code that emits `StateLoaded`.
    New,
    /// Presumably: previously stored state was loaded unchanged.
    /// NOTE(review): confirm against the code that emits `StateLoaded`.
    Restored,
    /// Presumably: stored state was loaded and then corrected; see
    /// [`SchedulerEvent::StateRepaired`] for the before/after run times.
    /// NOTE(review): confirm against the code that emits `StateLoaded`.
    Repaired,
}

/// Why a scheduler stopped, as reported by [`SchedulerEvent::SchedulerStopped`].
///
/// The variant meanings below are inferred from their names only; the code
/// that selects a reason is not part of this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulerStopReason {
    /// Presumably: the job was cancelled. NOTE(review): confirm with the emitter.
    Cancelled,
    /// Presumably: the scheduler was asked to shut down. NOTE(review): confirm.
    Shutdown,
    /// Presumably: the job reached a terminal state and will not run again.
    /// NOTE(review): confirm.
    Terminal,
    /// Presumably: a channel the scheduler depends on was closed.
    /// NOTE(review): confirm.
    ChannelClosed,
}

21#[derive(Debug, Clone, PartialEq, Eq)]
22pub enum SchedulerEvent {
23    StateLoaded {
24        job_id: String,
25        trigger_count: u32,
26        next_run_at: Option<DateTime<Utc>>,
27        source: StateLoadSource,
28    },
29    StateRepaired {
30        job_id: String,
31        trigger_count: u32,
32        previous_next_run_at: Option<DateTime<Utc>>,
33        repaired_next_run_at: Option<DateTime<Utc>>,
34    },
35    TriggerEmitted {
36        job_id: String,
37        scheduled_at: DateTime<Utc>,
38        catch_up: bool,
39        trigger_count: u32,
40    },
41    RunCompleted {
42        job_id: String,
43        scheduled_at: DateTime<Utc>,
44        catch_up: bool,
45        trigger_count: u32,
46        status: RunStatus,
47        error: Option<String>,
48    },
49    ExecutionGuardContended {
50        job_id: String,
51        scheduled_at: DateTime<Utc>,
52        catch_up: bool,
53        trigger_count: u32,
54    },
55    ExecutionGuardLost {
56        job_id: String,
57        scheduled_at: DateTime<Utc>,
58        catch_up: bool,
59        trigger_count: u32,
60    },
61    ExecutionGuardReleaseFailed {
62        job_id: String,
63        scheduled_at: DateTime<Utc>,
64        catch_up: bool,
65        trigger_count: u32,
66        error: String,
67    },
68    StoreDegraded {
69        job_id: String,
70        operation: StoreOperation,
71        error: String,
72    },
73    TerminalStateDeleted {
74        job_id: String,
75        trigger_count: u32,
76    },
77    SchedulerStopped {
78        job_id: String,
79        trigger_count: u32,
80        reason: SchedulerStopReason,
81    },
82}
83
84pub trait SchedulerObserver: Send + Sync + 'static {
85    fn on_event(&self, event: &SchedulerEvent);
86}
87
/// Observer that ignores every event.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopObserver;

91impl SchedulerObserver for NoopObserver {
92    fn on_event(&self, _event: &SchedulerEvent) {}
93}
94
/// Observer that writes every event to the `log` facade as a single
/// `key=value` line.
#[derive(Debug, Clone, Copy, Default)]
pub struct LogObserver;

98impl SchedulerObserver for LogObserver {
99    fn on_event(&self, event: &SchedulerEvent) {
100        match event {
101            SchedulerEvent::StateLoaded {
102                job_id,
103                trigger_count,
104                next_run_at,
105                source,
106            } => info!(
107                "scheduler state loaded job_id={} source={:?} trigger_count={} next_run_at={:?}",
108                job_id, source, trigger_count, next_run_at
109            ),
110            SchedulerEvent::StateRepaired {
111                job_id,
112                trigger_count,
113                previous_next_run_at,
114                repaired_next_run_at,
115            } => warn!(
116                "scheduler repaired state job_id={} trigger_count={} previous_next_run_at={:?} repaired_next_run_at={:?}",
117                job_id, trigger_count, previous_next_run_at, repaired_next_run_at
118            ),
119            SchedulerEvent::TriggerEmitted {
120                job_id,
121                scheduled_at,
122                catch_up,
123                trigger_count,
124            } => debug!(
125                "scheduler trigger emitted job_id={} scheduled_at={} catch_up={} trigger_count={}",
126                job_id, scheduled_at, catch_up, trigger_count
127            ),
128            SchedulerEvent::RunCompleted {
129                job_id,
130                scheduled_at,
131                catch_up,
132                trigger_count,
133                status,
134                error,
135            } => debug!(
136                "scheduler run completed job_id={} scheduled_at={} catch_up={} trigger_count={} status={:?} error={:?}",
137                job_id, scheduled_at, catch_up, trigger_count, status, error
138            ),
139            SchedulerEvent::ExecutionGuardContended {
140                job_id,
141                scheduled_at,
142                catch_up,
143                trigger_count,
144            } => debug!(
145                "scheduler execution guard contended job_id={} scheduled_at={} catch_up={} trigger_count={}",
146                job_id, scheduled_at, catch_up, trigger_count
147            ),
148            SchedulerEvent::ExecutionGuardLost {
149                job_id,
150                scheduled_at,
151                catch_up,
152                trigger_count,
153            } => warn!(
154                "scheduler execution guard lost job_id={} scheduled_at={} catch_up={} trigger_count={}",
155                job_id, scheduled_at, catch_up, trigger_count
156            ),
157            SchedulerEvent::ExecutionGuardReleaseFailed {
158                job_id,
159                scheduled_at,
160                catch_up,
161                trigger_count,
162                error,
163            } => warn!(
164                "scheduler execution guard release failed job_id={} scheduled_at={} catch_up={} trigger_count={} error={}",
165                job_id, scheduled_at, catch_up, trigger_count, error
166            ),
167            SchedulerEvent::StoreDegraded {
168                job_id,
169                operation,
170                error,
171            } => warn!(
172                "scheduler store degraded job_id={} operation={:?} error={}",
173                job_id, operation, error
174            ),
175            SchedulerEvent::TerminalStateDeleted {
176                job_id,
177                trigger_count,
178            } => info!(
179                "scheduler deleted terminal state job_id={} trigger_count={}",
180                job_id, trigger_count
181            ),
182            SchedulerEvent::SchedulerStopped {
183                job_id,
184                trigger_count,
185                reason,
186            } => info!(
187                "scheduler stopped job_id={} trigger_count={} reason={:?}",
188                job_id, trigger_count, reason
189            ),
190        }
191    }
192}