1use crate::execution_guard::ExecutionGuardScope;
2use crate::model::{RunSkipReason, RunStatus};
3use crate::store::StoreOperation;
4use chrono::{DateTime, Utc};
5use log::{debug, info, warn};
6
/// Origin tag carried in the `source` field of [`SchedulerEvent::StateLoaded`].
///
/// NOTE(review): the exact condition that selects each variant is decided by
/// the code that emits the event, which is not visible here — confirm there.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StateLoadSource {
    /// Presumably state created from scratch — confirm at the emit site.
    New,
    /// Presumably state read back from a store — confirm at the emit site.
    Restored,
    /// Presumably state that was loaded and then corrected — confirm at the emit site.
    Repaired,
}
13
/// Reason tag carried in the `reason` field of [`SchedulerEvent::SchedulerStopped`].
///
/// NOTE(review): which runtime condition maps to which variant is determined
/// by the emitting code outside this view — confirm there.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SchedulerStopReason {
    Cancelled,
    Shutdown,
    Terminal,
    ChannelClosed,
}
21
/// Scope tag carried by [`SchedulerEvent::SchedulerPaused`] and
/// [`SchedulerEvent::SchedulerResumed`].
///
/// NOTE(review): what distinguishes a local pause from a shared one is defined
/// by the emitting code outside this view — confirm there.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PauseScope {
    Local,
    Shared,
}
27
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub enum SchedulerEvent {
30 StateLoaded {
31 job_id: String,
32 trigger_count: u32,
33 next_run_at: Option<DateTime<Utc>>,
34 source: StateLoadSource,
35 },
36 StateRepaired {
37 job_id: String,
38 trigger_count: u32,
39 previous_next_run_at: Option<DateTime<Utc>>,
40 repaired_next_run_at: Option<DateTime<Utc>>,
41 },
42 TriggerEmitted {
43 job_id: String,
44 scheduled_at: DateTime<Utc>,
45 catch_up: bool,
46 trigger_count: u32,
47 },
48 RunSkipped {
49 job_id: String,
50 scheduled_at: DateTime<Utc>,
51 catch_up: bool,
52 trigger_count: u32,
53 reason: RunSkipReason,
54 },
55 RunCompleted {
56 job_id: String,
57 scheduled_at: DateTime<Utc>,
58 catch_up: bool,
59 trigger_count: u32,
60 status: RunStatus,
61 error: Option<String>,
62 },
63 ExecutionGuardAcquired {
64 job_id: String,
65 resource_id: String,
66 scope: ExecutionGuardScope,
67 lease_key: String,
68 scheduled_at: Option<DateTime<Utc>>,
69 catch_up: bool,
70 trigger_count: u32,
71 },
72 ExecutionGuardContended {
73 job_id: String,
74 resource_id: String,
75 scope: ExecutionGuardScope,
76 scheduled_at: Option<DateTime<Utc>>,
77 catch_up: bool,
78 trigger_count: u32,
79 },
80 ExecutionGuardRenewed {
81 job_id: String,
82 resource_id: String,
83 scope: ExecutionGuardScope,
84 lease_key: String,
85 scheduled_at: Option<DateTime<Utc>>,
86 catch_up: bool,
87 trigger_count: u32,
88 renewal_count: u32,
89 },
90 ExecutionGuardRenewFailed {
91 job_id: String,
92 resource_id: String,
93 scope: ExecutionGuardScope,
94 lease_key: String,
95 scheduled_at: Option<DateTime<Utc>>,
96 catch_up: bool,
97 trigger_count: u32,
98 renewal_count: u32,
99 failed_renewal_count: u32,
100 error: String,
101 },
102 ExecutionGuardLost {
103 job_id: String,
104 resource_id: String,
105 scope: ExecutionGuardScope,
106 lease_key: String,
107 scheduled_at: Option<DateTime<Utc>>,
108 catch_up: bool,
109 trigger_count: u32,
110 renewal_count: u32,
111 failed_renewal_count: u32,
112 },
113 ExecutionGuardReleased {
114 job_id: String,
115 resource_id: String,
116 scope: ExecutionGuardScope,
117 lease_key: String,
118 scheduled_at: Option<DateTime<Utc>>,
119 catch_up: bool,
120 trigger_count: u32,
121 },
122 ExecutionGuardReleaseFailed {
123 job_id: String,
124 resource_id: String,
125 scope: ExecutionGuardScope,
126 lease_key: String,
127 scheduled_at: Option<DateTime<Utc>>,
128 catch_up: bool,
129 trigger_count: u32,
130 error: String,
131 },
132 StoreDegraded {
133 job_id: String,
134 operation: StoreOperation,
135 error: String,
136 },
137 TerminalStateDeleted {
138 job_id: String,
139 trigger_count: u32,
140 },
141 SchedulerPaused {
142 job_id: String,
143 trigger_count: u32,
144 scope: PauseScope,
145 },
146 SchedulerResumed {
147 job_id: String,
148 trigger_count: u32,
149 scope: PauseScope,
150 },
151 SchedulerStopped {
152 job_id: String,
153 trigger_count: u32,
154 reason: SchedulerStopReason,
155 },
156}
157
158pub trait SchedulerObserver: Send + Sync + 'static {
159 fn on_event(&self, event: &SchedulerEvent);
160}
161
/// Field-less observer type; its `SchedulerObserver` impl discards every event.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopObserver;
164
165impl SchedulerObserver for NoopObserver {
166 fn on_event(&self, _event: &SchedulerEvent) {}
167}
168
/// Field-less observer type; its `SchedulerObserver` impl writes each event
/// through the `log` crate macros.
#[derive(Clone, Copy, Debug, Default)]
pub struct LogObserver;
171
172impl SchedulerObserver for LogObserver {
173 fn on_event(&self, event: &SchedulerEvent) {
174 match event {
175 SchedulerEvent::StateLoaded {
176 job_id,
177 trigger_count,
178 next_run_at,
179 source,
180 } => info!(
181 "scheduler state loaded job_id={} source={:?} trigger_count={} next_run_at={:?}",
182 job_id, source, trigger_count, next_run_at
183 ),
184 SchedulerEvent::StateRepaired {
185 job_id,
186 trigger_count,
187 previous_next_run_at,
188 repaired_next_run_at,
189 } => warn!(
190 "scheduler repaired state job_id={} trigger_count={} previous_next_run_at={:?} repaired_next_run_at={:?}",
191 job_id, trigger_count, previous_next_run_at, repaired_next_run_at
192 ),
193 SchedulerEvent::TriggerEmitted {
194 job_id,
195 scheduled_at,
196 catch_up,
197 trigger_count,
198 } => debug!(
199 "scheduler trigger emitted job_id={} scheduled_at={} catch_up={} trigger_count={}",
200 job_id, scheduled_at, catch_up, trigger_count
201 ),
202 SchedulerEvent::RunSkipped {
203 job_id,
204 scheduled_at,
205 catch_up,
206 trigger_count,
207 reason,
208 } => debug!(
209 "scheduler run skipped job_id={} scheduled_at={} catch_up={} trigger_count={} reason={}",
210 job_id, scheduled_at, catch_up, trigger_count, reason
211 ),
212 SchedulerEvent::RunCompleted {
213 job_id,
214 scheduled_at,
215 catch_up,
216 trigger_count,
217 status,
218 error,
219 } => debug!(
220 "scheduler run completed job_id={} scheduled_at={} catch_up={} trigger_count={} status={:?} error={:?}",
221 job_id, scheduled_at, catch_up, trigger_count, status, error
222 ),
223 SchedulerEvent::ExecutionGuardAcquired {
224 job_id,
225 resource_id,
226 scope,
227 lease_key,
228 scheduled_at,
229 catch_up,
230 trigger_count,
231 } => debug!(
232 "scheduler execution guard acquired job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={}",
233 job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count
234 ),
235 SchedulerEvent::ExecutionGuardContended {
236 job_id,
237 resource_id,
238 scope,
239 scheduled_at,
240 catch_up,
241 trigger_count,
242 } => debug!(
243 "scheduler execution guard contended job_id={} resource_id={} scope={:?} scheduled_at={:?} catch_up={} trigger_count={}",
244 job_id, resource_id, scope, scheduled_at, catch_up, trigger_count
245 ),
246 SchedulerEvent::ExecutionGuardRenewed {
247 job_id,
248 resource_id,
249 scope,
250 lease_key,
251 scheduled_at,
252 catch_up,
253 trigger_count,
254 renewal_count,
255 } => debug!(
256 "scheduler execution guard renewed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={}",
257 job_id,
258 resource_id,
259 scope,
260 lease_key,
261 scheduled_at,
262 catch_up,
263 trigger_count,
264 renewal_count
265 ),
266 SchedulerEvent::ExecutionGuardRenewFailed {
267 job_id,
268 resource_id,
269 scope,
270 lease_key,
271 scheduled_at,
272 catch_up,
273 trigger_count,
274 renewal_count,
275 failed_renewal_count,
276 error,
277 } => warn!(
278 "scheduler execution guard renew failed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={} failed_renewal_count={} error={}",
279 job_id,
280 resource_id,
281 scope,
282 lease_key,
283 scheduled_at,
284 catch_up,
285 trigger_count,
286 renewal_count,
287 failed_renewal_count,
288 error
289 ),
290 SchedulerEvent::ExecutionGuardLost {
291 job_id,
292 resource_id,
293 scope,
294 lease_key,
295 scheduled_at,
296 catch_up,
297 trigger_count,
298 renewal_count,
299 failed_renewal_count,
300 } => warn!(
301 "scheduler execution guard lost job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={} failed_renewal_count={}",
302 job_id,
303 resource_id,
304 scope,
305 lease_key,
306 scheduled_at,
307 catch_up,
308 trigger_count,
309 renewal_count,
310 failed_renewal_count
311 ),
312 SchedulerEvent::ExecutionGuardReleased {
313 job_id,
314 resource_id,
315 scope,
316 lease_key,
317 scheduled_at,
318 catch_up,
319 trigger_count,
320 } => debug!(
321 "scheduler execution guard released job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={}",
322 job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count
323 ),
324 SchedulerEvent::ExecutionGuardReleaseFailed {
325 job_id,
326 resource_id,
327 scope,
328 lease_key,
329 scheduled_at,
330 catch_up,
331 trigger_count,
332 error,
333 } => warn!(
334 "scheduler execution guard release failed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} error={}",
335 job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count, error
336 ),
337 SchedulerEvent::StoreDegraded {
338 job_id,
339 operation,
340 error,
341 } => warn!(
342 "scheduler store degraded job_id={} operation={:?} error={}",
343 job_id, operation, error
344 ),
345 SchedulerEvent::TerminalStateDeleted {
346 job_id,
347 trigger_count,
348 } => info!(
349 "scheduler deleted terminal state job_id={} trigger_count={}",
350 job_id, trigger_count
351 ),
352 SchedulerEvent::SchedulerPaused {
353 job_id,
354 trigger_count,
355 scope,
356 } => info!(
357 "scheduler paused job_id={} trigger_count={} scope={:?}",
358 job_id, trigger_count, scope
359 ),
360 SchedulerEvent::SchedulerResumed {
361 job_id,
362 trigger_count,
363 scope,
364 } => info!(
365 "scheduler resumed job_id={} trigger_count={} scope={:?}",
366 job_id, trigger_count, scope
367 ),
368 SchedulerEvent::SchedulerStopped {
369 job_id,
370 trigger_count,
371 reason,
372 } => info!(
373 "scheduler stopped job_id={} trigger_count={} reason={:?}",
374 job_id, trigger_count, reason
375 ),
376 }
377 }
378}