use crate::execution_guard::ExecutionGuardScope;
use crate::model::{RunSkipReason, RunStatus};
use crate::store::StoreOperation;
use chrono::{DateTime, Utc};
use log::{debug, info, warn};
6
/// Where the state reported by [`SchedulerEvent::StateLoaded`] came from.
///
/// The per-variant notes are inferred from the variant names; the code that
/// chooses a variant is not visible here, so confirm them against the loader.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StateLoadSource {
    /// Presumably: no stored state existed and a fresh one was created.
    New,
    /// Presumably: previously persisted state was loaded as-is.
    Restored,
    /// Presumably: persisted state was loaded and then corrected
    /// (compare [`SchedulerEvent::StateRepaired`]).
    Repaired,
}
13
/// Why a scheduler stopped, as carried by [`SchedulerEvent::SchedulerStopped`].
///
/// The per-variant notes are inferred from the variant names; the code that
/// chooses a variant is not visible here, so confirm them against the
/// scheduler loop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulerStopReason {
    /// Presumably: the job was cancelled.
    Cancelled,
    /// Presumably: the scheduler was asked to shut down.
    Shutdown,
    /// Presumably: the job reached a terminal state and will not run again.
    Terminal,
    /// Presumably: a channel the scheduler depends on was closed.
    ChannelClosed,
}
21
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub enum SchedulerEvent {
24 StateLoaded {
25 job_id: String,
26 trigger_count: u32,
27 next_run_at: Option<DateTime<Utc>>,
28 source: StateLoadSource,
29 },
30 StateRepaired {
31 job_id: String,
32 trigger_count: u32,
33 previous_next_run_at: Option<DateTime<Utc>>,
34 repaired_next_run_at: Option<DateTime<Utc>>,
35 },
36 TriggerEmitted {
37 job_id: String,
38 scheduled_at: DateTime<Utc>,
39 catch_up: bool,
40 trigger_count: u32,
41 },
42 RunSkipped {
43 job_id: String,
44 scheduled_at: DateTime<Utc>,
45 catch_up: bool,
46 trigger_count: u32,
47 reason: RunSkipReason,
48 },
49 RunCompleted {
50 job_id: String,
51 scheduled_at: DateTime<Utc>,
52 catch_up: bool,
53 trigger_count: u32,
54 status: RunStatus,
55 error: Option<String>,
56 },
57 ExecutionGuardAcquired {
58 job_id: String,
59 resource_id: String,
60 scope: ExecutionGuardScope,
61 lease_key: String,
62 scheduled_at: Option<DateTime<Utc>>,
63 catch_up: bool,
64 trigger_count: u32,
65 },
66 ExecutionGuardContended {
67 job_id: String,
68 resource_id: String,
69 scope: ExecutionGuardScope,
70 scheduled_at: Option<DateTime<Utc>>,
71 catch_up: bool,
72 trigger_count: u32,
73 },
74 ExecutionGuardRenewed {
75 job_id: String,
76 resource_id: String,
77 scope: ExecutionGuardScope,
78 lease_key: String,
79 scheduled_at: Option<DateTime<Utc>>,
80 catch_up: bool,
81 trigger_count: u32,
82 renewal_count: u32,
83 },
84 ExecutionGuardRenewFailed {
85 job_id: String,
86 resource_id: String,
87 scope: ExecutionGuardScope,
88 lease_key: String,
89 scheduled_at: Option<DateTime<Utc>>,
90 catch_up: bool,
91 trigger_count: u32,
92 renewal_count: u32,
93 failed_renewal_count: u32,
94 error: String,
95 },
96 ExecutionGuardLost {
97 job_id: String,
98 resource_id: String,
99 scope: ExecutionGuardScope,
100 lease_key: String,
101 scheduled_at: Option<DateTime<Utc>>,
102 catch_up: bool,
103 trigger_count: u32,
104 renewal_count: u32,
105 failed_renewal_count: u32,
106 },
107 ExecutionGuardReleased {
108 job_id: String,
109 resource_id: String,
110 scope: ExecutionGuardScope,
111 lease_key: String,
112 scheduled_at: Option<DateTime<Utc>>,
113 catch_up: bool,
114 trigger_count: u32,
115 },
116 ExecutionGuardReleaseFailed {
117 job_id: String,
118 resource_id: String,
119 scope: ExecutionGuardScope,
120 lease_key: String,
121 scheduled_at: Option<DateTime<Utc>>,
122 catch_up: bool,
123 trigger_count: u32,
124 error: String,
125 },
126 StoreDegraded {
127 job_id: String,
128 operation: StoreOperation,
129 error: String,
130 },
131 TerminalStateDeleted {
132 job_id: String,
133 trigger_count: u32,
134 },
135 SchedulerStopped {
136 job_id: String,
137 trigger_count: u32,
138 reason: SchedulerStopReason,
139 },
140}
141
142pub trait SchedulerObserver: Send + Sync + 'static {
143 fn on_event(&self, event: &SchedulerEvent);
144}
145
146#[derive(Debug, Clone, Copy, Default)]
147pub struct NoopObserver;
148
149impl SchedulerObserver for NoopObserver {
150 fn on_event(&self, _event: &SchedulerEvent) {}
151}
152
153#[derive(Debug, Clone, Copy, Default)]
154pub struct LogObserver;
155
156impl SchedulerObserver for LogObserver {
157 fn on_event(&self, event: &SchedulerEvent) {
158 match event {
159 SchedulerEvent::StateLoaded {
160 job_id,
161 trigger_count,
162 next_run_at,
163 source,
164 } => info!(
165 "scheduler state loaded job_id={} source={:?} trigger_count={} next_run_at={:?}",
166 job_id, source, trigger_count, next_run_at
167 ),
168 SchedulerEvent::StateRepaired {
169 job_id,
170 trigger_count,
171 previous_next_run_at,
172 repaired_next_run_at,
173 } => warn!(
174 "scheduler repaired state job_id={} trigger_count={} previous_next_run_at={:?} repaired_next_run_at={:?}",
175 job_id, trigger_count, previous_next_run_at, repaired_next_run_at
176 ),
177 SchedulerEvent::TriggerEmitted {
178 job_id,
179 scheduled_at,
180 catch_up,
181 trigger_count,
182 } => debug!(
183 "scheduler trigger emitted job_id={} scheduled_at={} catch_up={} trigger_count={}",
184 job_id, scheduled_at, catch_up, trigger_count
185 ),
186 SchedulerEvent::RunSkipped {
187 job_id,
188 scheduled_at,
189 catch_up,
190 trigger_count,
191 reason,
192 } => debug!(
193 "scheduler run skipped job_id={} scheduled_at={} catch_up={} trigger_count={} reason={}",
194 job_id, scheduled_at, catch_up, trigger_count, reason
195 ),
196 SchedulerEvent::RunCompleted {
197 job_id,
198 scheduled_at,
199 catch_up,
200 trigger_count,
201 status,
202 error,
203 } => debug!(
204 "scheduler run completed job_id={} scheduled_at={} catch_up={} trigger_count={} status={:?} error={:?}",
205 job_id, scheduled_at, catch_up, trigger_count, status, error
206 ),
207 SchedulerEvent::ExecutionGuardAcquired {
208 job_id,
209 resource_id,
210 scope,
211 lease_key,
212 scheduled_at,
213 catch_up,
214 trigger_count,
215 } => debug!(
216 "scheduler execution guard acquired job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={}",
217 job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count
218 ),
219 SchedulerEvent::ExecutionGuardContended {
220 job_id,
221 resource_id,
222 scope,
223 scheduled_at,
224 catch_up,
225 trigger_count,
226 } => debug!(
227 "scheduler execution guard contended job_id={} resource_id={} scope={:?} scheduled_at={:?} catch_up={} trigger_count={}",
228 job_id, resource_id, scope, scheduled_at, catch_up, trigger_count
229 ),
230 SchedulerEvent::ExecutionGuardRenewed {
231 job_id,
232 resource_id,
233 scope,
234 lease_key,
235 scheduled_at,
236 catch_up,
237 trigger_count,
238 renewal_count,
239 } => debug!(
240 "scheduler execution guard renewed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={}",
241 job_id,
242 resource_id,
243 scope,
244 lease_key,
245 scheduled_at,
246 catch_up,
247 trigger_count,
248 renewal_count
249 ),
250 SchedulerEvent::ExecutionGuardRenewFailed {
251 job_id,
252 resource_id,
253 scope,
254 lease_key,
255 scheduled_at,
256 catch_up,
257 trigger_count,
258 renewal_count,
259 failed_renewal_count,
260 error,
261 } => warn!(
262 "scheduler execution guard renew failed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={} failed_renewal_count={} error={}",
263 job_id,
264 resource_id,
265 scope,
266 lease_key,
267 scheduled_at,
268 catch_up,
269 trigger_count,
270 renewal_count,
271 failed_renewal_count,
272 error
273 ),
274 SchedulerEvent::ExecutionGuardLost {
275 job_id,
276 resource_id,
277 scope,
278 lease_key,
279 scheduled_at,
280 catch_up,
281 trigger_count,
282 renewal_count,
283 failed_renewal_count,
284 } => warn!(
285 "scheduler execution guard lost job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={} failed_renewal_count={}",
286 job_id,
287 resource_id,
288 scope,
289 lease_key,
290 scheduled_at,
291 catch_up,
292 trigger_count,
293 renewal_count,
294 failed_renewal_count
295 ),
296 SchedulerEvent::ExecutionGuardReleased {
297 job_id,
298 resource_id,
299 scope,
300 lease_key,
301 scheduled_at,
302 catch_up,
303 trigger_count,
304 } => debug!(
305 "scheduler execution guard released job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={}",
306 job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count
307 ),
308 SchedulerEvent::ExecutionGuardReleaseFailed {
309 job_id,
310 resource_id,
311 scope,
312 lease_key,
313 scheduled_at,
314 catch_up,
315 trigger_count,
316 error,
317 } => warn!(
318 "scheduler execution guard release failed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} error={}",
319 job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count, error
320 ),
321 SchedulerEvent::StoreDegraded {
322 job_id,
323 operation,
324 error,
325 } => warn!(
326 "scheduler store degraded job_id={} operation={:?} error={}",
327 job_id, operation, error
328 ),
329 SchedulerEvent::TerminalStateDeleted {
330 job_id,
331 trigger_count,
332 } => info!(
333 "scheduler deleted terminal state job_id={} trigger_count={}",
334 job_id, trigger_count
335 ),
336 SchedulerEvent::SchedulerStopped {
337 job_id,
338 trigger_count,
339 reason,
340 } => info!(
341 "scheduler stopped job_id={} trigger_count={} reason={:?}",
342 job_id, trigger_count, reason
343 ),
344 }
345 }
346}