Skip to main content

scheduler/
observer.rs

1use crate::execution_guard::ExecutionGuardScope;
2use crate::model::RunStatus;
3use crate::store::StoreOperation;
4use chrono::{DateTime, Utc};
5use log::{debug, info, warn};
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8pub enum StateLoadSource {
9    New,
10    Restored,
11    Repaired,
12}
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15pub enum SchedulerStopReason {
16    Cancelled,
17    Shutdown,
18    Terminal,
19    ChannelClosed,
20}
21
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub enum SchedulerEvent {
24    StateLoaded {
25        job_id: String,
26        trigger_count: u32,
27        next_run_at: Option<DateTime<Utc>>,
28        source: StateLoadSource,
29    },
30    StateRepaired {
31        job_id: String,
32        trigger_count: u32,
33        previous_next_run_at: Option<DateTime<Utc>>,
34        repaired_next_run_at: Option<DateTime<Utc>>,
35    },
36    TriggerEmitted {
37        job_id: String,
38        scheduled_at: DateTime<Utc>,
39        catch_up: bool,
40        trigger_count: u32,
41    },
42    RunCompleted {
43        job_id: String,
44        scheduled_at: DateTime<Utc>,
45        catch_up: bool,
46        trigger_count: u32,
47        status: RunStatus,
48        error: Option<String>,
49    },
50    ExecutionGuardAcquired {
51        job_id: String,
52        resource_id: String,
53        scope: ExecutionGuardScope,
54        lease_key: String,
55        scheduled_at: Option<DateTime<Utc>>,
56        catch_up: bool,
57        trigger_count: u32,
58    },
59    ExecutionGuardContended {
60        job_id: String,
61        resource_id: String,
62        scope: ExecutionGuardScope,
63        scheduled_at: Option<DateTime<Utc>>,
64        catch_up: bool,
65        trigger_count: u32,
66    },
67    ExecutionGuardRenewed {
68        job_id: String,
69        resource_id: String,
70        scope: ExecutionGuardScope,
71        lease_key: String,
72        scheduled_at: Option<DateTime<Utc>>,
73        catch_up: bool,
74        trigger_count: u32,
75        renewal_count: u32,
76    },
77    ExecutionGuardRenewFailed {
78        job_id: String,
79        resource_id: String,
80        scope: ExecutionGuardScope,
81        lease_key: String,
82        scheduled_at: Option<DateTime<Utc>>,
83        catch_up: bool,
84        trigger_count: u32,
85        renewal_count: u32,
86        failed_renewal_count: u32,
87        error: String,
88    },
89    ExecutionGuardLost {
90        job_id: String,
91        resource_id: String,
92        scope: ExecutionGuardScope,
93        lease_key: String,
94        scheduled_at: Option<DateTime<Utc>>,
95        catch_up: bool,
96        trigger_count: u32,
97        renewal_count: u32,
98        failed_renewal_count: u32,
99    },
100    ExecutionGuardReleased {
101        job_id: String,
102        resource_id: String,
103        scope: ExecutionGuardScope,
104        lease_key: String,
105        scheduled_at: Option<DateTime<Utc>>,
106        catch_up: bool,
107        trigger_count: u32,
108    },
109    ExecutionGuardReleaseFailed {
110        job_id: String,
111        resource_id: String,
112        scope: ExecutionGuardScope,
113        lease_key: String,
114        scheduled_at: Option<DateTime<Utc>>,
115        catch_up: bool,
116        trigger_count: u32,
117        error: String,
118    },
119    StoreDegraded {
120        job_id: String,
121        operation: StoreOperation,
122        error: String,
123    },
124    TerminalStateDeleted {
125        job_id: String,
126        trigger_count: u32,
127    },
128    SchedulerStopped {
129        job_id: String,
130        trigger_count: u32,
131        reason: SchedulerStopReason,
132    },
133}
134
135pub trait SchedulerObserver: Send + Sync + 'static {
136    fn on_event(&self, event: &SchedulerEvent);
137}
138
139#[derive(Debug, Clone, Copy, Default)]
140pub struct NoopObserver;
141
142impl SchedulerObserver for NoopObserver {
143    fn on_event(&self, _event: &SchedulerEvent) {}
144}
145
146#[derive(Debug, Clone, Copy, Default)]
147pub struct LogObserver;
148
149impl SchedulerObserver for LogObserver {
150    fn on_event(&self, event: &SchedulerEvent) {
151        match event {
152            SchedulerEvent::StateLoaded {
153                job_id,
154                trigger_count,
155                next_run_at,
156                source,
157            } => info!(
158                "scheduler state loaded job_id={} source={:?} trigger_count={} next_run_at={:?}",
159                job_id, source, trigger_count, next_run_at
160            ),
161            SchedulerEvent::StateRepaired {
162                job_id,
163                trigger_count,
164                previous_next_run_at,
165                repaired_next_run_at,
166            } => warn!(
167                "scheduler repaired state job_id={} trigger_count={} previous_next_run_at={:?} repaired_next_run_at={:?}",
168                job_id, trigger_count, previous_next_run_at, repaired_next_run_at
169            ),
170            SchedulerEvent::TriggerEmitted {
171                job_id,
172                scheduled_at,
173                catch_up,
174                trigger_count,
175            } => debug!(
176                "scheduler trigger emitted job_id={} scheduled_at={} catch_up={} trigger_count={}",
177                job_id, scheduled_at, catch_up, trigger_count
178            ),
179            SchedulerEvent::RunCompleted {
180                job_id,
181                scheduled_at,
182                catch_up,
183                trigger_count,
184                status,
185                error,
186            } => debug!(
187                "scheduler run completed job_id={} scheduled_at={} catch_up={} trigger_count={} status={:?} error={:?}",
188                job_id, scheduled_at, catch_up, trigger_count, status, error
189            ),
190            SchedulerEvent::ExecutionGuardAcquired {
191                job_id,
192                resource_id,
193                scope,
194                lease_key,
195                scheduled_at,
196                catch_up,
197                trigger_count,
198            } => debug!(
199                "scheduler execution guard acquired job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={}",
200                job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count
201            ),
202            SchedulerEvent::ExecutionGuardContended {
203                job_id,
204                resource_id,
205                scope,
206                scheduled_at,
207                catch_up,
208                trigger_count,
209            } => debug!(
210                "scheduler execution guard contended job_id={} resource_id={} scope={:?} scheduled_at={:?} catch_up={} trigger_count={}",
211                job_id, resource_id, scope, scheduled_at, catch_up, trigger_count
212            ),
213            SchedulerEvent::ExecutionGuardRenewed {
214                job_id,
215                resource_id,
216                scope,
217                lease_key,
218                scheduled_at,
219                catch_up,
220                trigger_count,
221                renewal_count,
222            } => debug!(
223                "scheduler execution guard renewed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={}",
224                job_id,
225                resource_id,
226                scope,
227                lease_key,
228                scheduled_at,
229                catch_up,
230                trigger_count,
231                renewal_count
232            ),
233            SchedulerEvent::ExecutionGuardRenewFailed {
234                job_id,
235                resource_id,
236                scope,
237                lease_key,
238                scheduled_at,
239                catch_up,
240                trigger_count,
241                renewal_count,
242                failed_renewal_count,
243                error,
244            } => warn!(
245                "scheduler execution guard renew failed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={} failed_renewal_count={} error={}",
246                job_id,
247                resource_id,
248                scope,
249                lease_key,
250                scheduled_at,
251                catch_up,
252                trigger_count,
253                renewal_count,
254                failed_renewal_count,
255                error
256            ),
257            SchedulerEvent::ExecutionGuardLost {
258                job_id,
259                resource_id,
260                scope,
261                lease_key,
262                scheduled_at,
263                catch_up,
264                trigger_count,
265                renewal_count,
266                failed_renewal_count,
267            } => warn!(
268                "scheduler execution guard lost job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} renewal_count={} failed_renewal_count={}",
269                job_id,
270                resource_id,
271                scope,
272                lease_key,
273                scheduled_at,
274                catch_up,
275                trigger_count,
276                renewal_count,
277                failed_renewal_count
278            ),
279            SchedulerEvent::ExecutionGuardReleased {
280                job_id,
281                resource_id,
282                scope,
283                lease_key,
284                scheduled_at,
285                catch_up,
286                trigger_count,
287            } => debug!(
288                "scheduler execution guard released job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={}",
289                job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count
290            ),
291            SchedulerEvent::ExecutionGuardReleaseFailed {
292                job_id,
293                resource_id,
294                scope,
295                lease_key,
296                scheduled_at,
297                catch_up,
298                trigger_count,
299                error,
300            } => warn!(
301                "scheduler execution guard release failed job_id={} resource_id={} scope={:?} lease_key={} scheduled_at={:?} catch_up={} trigger_count={} error={}",
302                job_id, resource_id, scope, lease_key, scheduled_at, catch_up, trigger_count, error
303            ),
304            SchedulerEvent::StoreDegraded {
305                job_id,
306                operation,
307                error,
308            } => warn!(
309                "scheduler store degraded job_id={} operation={:?} error={}",
310                job_id, operation, error
311            ),
312            SchedulerEvent::TerminalStateDeleted {
313                job_id,
314                trigger_count,
315            } => info!(
316                "scheduler deleted terminal state job_id={} trigger_count={}",
317                job_id, trigger_count
318            ),
319            SchedulerEvent::SchedulerStopped {
320                job_id,
321                trigger_count,
322                reason,
323            } => info!(
324                "scheduler stopped job_id={} trigger_count={} reason={:?}",
325                job_id, trigger_count, reason
326            ),
327        }
328    }
329}