1use crate::execution_guard::ExecutionGuardScope;
2use crate::model::RunStatus;
3use crate::store::StoreOperation;
4use chrono::{DateTime, Utc};
5use log::{debug, info, warn};
6
/// Origin tag carried by [`SchedulerEvent::StateLoaded`] in its `source` field.
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant names only; confirm against the code that emits the event.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StateLoadSource {
    /// Presumably: no previously stored state was used.
    New,
    /// Presumably: state was taken from the store unchanged.
    Restored,
    /// Presumably: stored state was adjusted before being used.
    Repaired,
}
13
/// Reason tag carried by [`SchedulerEvent::SchedulerStopped`] in its `reason` field.
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant names only; confirm against the scheduler loop that emits them.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SchedulerStopReason {
    /// Presumably: the job was cancelled.
    Cancelled,
    /// Presumably: the scheduler was asked to shut down.
    Shutdown,
    /// Presumably: the job reached a terminal state.
    Terminal,
    /// Presumably: a channel the scheduler depends on was closed.
    ChannelClosed,
}
21
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub enum SchedulerEvent {
24 StateLoaded {
25 job_id: String,
26 trigger_count: u32,
27 next_run_at: Option<DateTime<Utc>>,
28 source: StateLoadSource,
29 },
30 StateRepaired {
31 job_id: String,
32 trigger_count: u32,
33 previous_next_run_at: Option<DateTime<Utc>>,
34 repaired_next_run_at: Option<DateTime<Utc>>,
35 },
36 TriggerEmitted {
37 job_id: String,
38 scheduled_at: DateTime<Utc>,
39 catch_up: bool,
40 trigger_count: u32,
41 },
42 RunCompleted {
43 job_id: String,
44 scheduled_at: DateTime<Utc>,
45 catch_up: bool,
46 trigger_count: u32,
47 status: RunStatus,
48 error: Option<String>,
49 },
50 ExecutionGuardAcquired {
51 job_id: String,
52 resource_id: String,
53 scope: ExecutionGuardScope,
54 lease_key: String,
55 scheduled_at: Option<DateTime<Utc>>,
56 catch_up: bool,
57 trigger_count: u32,
58 },
59 ExecutionGuardContended {
60 job_id: String,
61 resource_id: String,
62 scope: ExecutionGuardScope,
63 scheduled_at: Option<DateTime<Utc>>,
64 catch_up: bool,
65 trigger_count: u32,
66 },
67 ExecutionGuardRenewed {
68 job_id: String,
69 resource_id: String,
70 scope: ExecutionGuardScope,
71 lease_key: String,
72 scheduled_at: Option<DateTime<Utc>>,
73 catch_up: bool,
74 trigger_count: u32,
75 renewal_count: u32,
76 },
77 ExecutionGuardRenewFailed {
78 job_id: String,
79 resource_id: String,
80 scope: ExecutionGuardScope,
81 lease_key: String,
82 scheduled_at: Option<DateTime<Utc>>,
83 catch_up: bool,
84 trigger_count: u32,
85 renewal_count: u32,
86 failed_renewal_count: u32,
87 error: String,
88 },
89 ExecutionGuardLost {
90 job_id: String,
91 resource_id: String,
92 scope: ExecutionGuardScope,
93 lease_key: String,
94 scheduled_at: Option<DateTime<Utc>>,
95 catch_up: bool,
96 trigger_count: u32,
97 renewal_count: u32,
98 failed_renewal_count: u32,
99 },
100 ExecutionGuardReleased {
101 job_id: String,
102 resource_id: String,
103 scope: ExecutionGuardScope,
104 lease_key: String,
105 scheduled_at: Option<DateTime<Utc>>,
106 catch_up: bool,
107 trigger_count: u32,
108 },
109 ExecutionGuardReleaseFailed {
110 job_id: String,
111 resource_id: String,
112 scope: ExecutionGuardScope,
113 lease_key: String,
114 scheduled_at: Option<DateTime<Utc>>,
115 catch_up: bool,
116 trigger_count: u32,
117 error: String,
118 },
119 StoreDegraded {
120 job_id: String,
121 operation: StoreOperation,
122 error: String,
123 },
124 TerminalStateDeleted {
125 job_id: String,
126 trigger_count: u32,
127 },
128 SchedulerStopped {
129 job_id: String,
130 trigger_count: u32,
131 reason: SchedulerStopReason,
132 },
133}
134
135pub trait SchedulerObserver: Send + Sync + 'static {
136 fn on_event(&self, event: &SchedulerEvent);
137}
138
/// Zero-sized observer whose [`SchedulerObserver`] implementation ignores every event.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopObserver;
141
142impl SchedulerObserver for NoopObserver {
143 fn on_event(&self, _event: &SchedulerEvent) {}
144}
145
/// Zero-sized observer whose [`SchedulerObserver`] implementation writes each
/// event through the `log` macros.
#[derive(Clone, Copy, Debug, Default)]
pub struct LogObserver;
148
149impl SchedulerObserver for LogObserver {
150 fn on_event(&self, event: &SchedulerEvent) {
151 match event {
152 SchedulerEvent::StateLoaded {
153 job_id,
154 trigger_count,
155 next_run_at,
156 source,
157 } => info!(
158 "scheduler state loaded job_id={} source={:?} trigger_count={} next_run_at={:?}",
159 job_id, source, trigger_count, next_run_at
160 ),
161 SchedulerEvent::StateRepaired {
162 job_id,
163 trigger_count,
164 previous_next_run_at,
165 repaired_next_run_at,
166 } => warn!(
167 "scheduler repaired state job_id={} trigger_count={} previous_next_run_at={:?} repaired_next_run_at={:?}",
168 job_id, trigger_count, previous_next_run_at, repaired_next_run_at
169 ),
170 SchedulerEvent::TriggerEmitted {
171 job_id,
172 scheduled_at,
173 catch_up,
174 trigger_count,
175 } => debug!(
176 "scheduler trigger emitted job_id={} scheduled_at={} catch_up={} trigger_count={}",
177 job_id, scheduled_at, catch_up, trigger_count
178 ),
179 SchedulerEvent::RunCompleted {
180 job_id,
181 scheduled_at,
182 catch_up,
183 trigger_count,
184 status,
185 error,
186 } => debug!(
187 "scheduler run completed job_id={} scheduled_at={} catch_up={} trigger_count={} status={:?} error={:?}",
188 job_id, scheduled_at, catch_up, trigger_count, status, error
189 ),
190 SchedulerEvent::ExecutionGuardAcquired {
191 job_id,
192 resource_id,
193 scope,
194 lease_key,
195 scheduled_at,
196 catch_up,
197 trigger_count,
198 } => debug!(
199 "scheduler execution guard acquired job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={}",
200 job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count
201 ),
202 SchedulerEvent::ExecutionGuardContended {
203 job_id,
204 resource_id,
205 scope,
206 scheduled_at,
207 catch_up,
208 trigger_count,
209 } => debug!(
210 "scheduler execution guard contended job_id={} resource_id={} scope={:?} scheduled_at={:?} catch_up={} trigger_count={}",
211 job_id, resource_id, scope, scheduled_at, catch_up, trigger_count
212 ),
213 SchedulerEvent::ExecutionGuardRenewed {
214 job_id,
215 resource_id,
216 scope,
217 lease_key,
218 scheduled_at,
219 catch_up,
220 trigger_count,
221 renewal_count,
222 } => debug!(
223 "scheduler execution guard renewed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={}",
224 job_id,
225 resource_id,
226 scope,
227 lease_key,
228 scheduled_at,
229 catch_up,
230 trigger_count,
231 renewal_count
232 ),
233 SchedulerEvent::ExecutionGuardRenewFailed {
234 job_id,
235 resource_id,
236 scope,
237 lease_key,
238 scheduled_at,
239 catch_up,
240 trigger_count,
241 renewal_count,
242 failed_renewal_count,
243 error,
244 } => warn!(
245 "scheduler execution guard renew failed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={} failed_renewal_count={} error={}",
246 job_id,
247 resource_id,
248 scope,
249 lease_key,
250 scheduled_at,
251 catch_up,
252 trigger_count,
253 renewal_count,
254 failed_renewal_count,
255 error
256 ),
257 SchedulerEvent::ExecutionGuardLost {
258 job_id,
259 resource_id,
260 scope,
261 lease_key,
262 scheduled_at,
263 catch_up,
264 trigger_count,
265 renewal_count,
266 failed_renewal_count,
267 } => warn!(
268 "scheduler execution guard lost job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={} failed_renewal_count={}",
269 job_id,
270 resource_id,
271 scope,
272 lease_key,
273 scheduled_at,
274 catch_up,
275 trigger_count,
276 renewal_count,
277 failed_renewal_count
278 ),
279 SchedulerEvent::ExecutionGuardReleased {
280 job_id,
281 resource_id,
282 scope,
283 lease_key,
284 scheduled_at,
285 catch_up,
286 trigger_count,
287 } => debug!(
288 "scheduler execution guard released job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={}",
289 job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count
290 ),
291 SchedulerEvent::ExecutionGuardReleaseFailed {
292 job_id,
293 resource_id,
294 scope,
295 lease_key,
296 scheduled_at,
297 catch_up,
298 trigger_count,
299 error,
300 } => warn!(
301 "scheduler execution guard release failed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} error={}",
302 job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count, error
303 ),
304 SchedulerEvent::StoreDegraded {
305 job_id,
306 operation,
307 error,
308 } => warn!(
309 "scheduler store degraded job_id={} operation={:?} error={}",
310 job_id, operation, error
311 ),
312 SchedulerEvent::TerminalStateDeleted {
313 job_id,
314 trigger_count,
315 } => info!(
316 "scheduler deleted terminal state job_id={} trigger_count={}",
317 job_id, trigger_count
318 ),
319 SchedulerEvent::SchedulerStopped {
320 job_id,
321 trigger_count,
322 reason,
323 } => info!(
324 "scheduler stopped job_id={} trigger_count={} reason={:?}",
325 job_id, trigger_count, reason
326 ),
327 }
328 }
329}