Skip to main content

scheduler/
observer.rs

1use crate::model::RunStatus;
2use crate::store::StoreOperation;
3use chrono::{DateTime, Utc};
4use log::{debug, info, warn};
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq)]
7pub enum StateLoadSource {
8    New,
9    Restored,
10    Repaired,
11}
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
14pub enum SchedulerStopReason {
15    Cancelled,
16    Shutdown,
17    Terminal,
18    ChannelClosed,
19}
20
21#[derive(Debug, Clone, PartialEq, Eq)]
22pub enum SchedulerEvent {
23    StateLoaded {
24        job_id: String,
25        trigger_count: u32,
26        next_run_at: Option<DateTime<Utc>>,
27        source: StateLoadSource,
28    },
29    StateRepaired {
30        job_id: String,
31        trigger_count: u32,
32        previous_next_run_at: Option<DateTime<Utc>>,
33        repaired_next_run_at: Option<DateTime<Utc>>,
34    },
35    TriggerEmitted {
36        job_id: String,
37        scheduled_at: DateTime<Utc>,
38        catch_up: bool,
39        trigger_count: u32,
40    },
41    RunCompleted {
42        job_id: String,
43        scheduled_at: DateTime<Utc>,
44        catch_up: bool,
45        trigger_count: u32,
46        status: RunStatus,
47        error: Option<String>,
48    },
49    StoreDegraded {
50        job_id: String,
51        operation: StoreOperation,
52        error: String,
53    },
54    TerminalStateDeleted {
55        job_id: String,
56        trigger_count: u32,
57    },
58    SchedulerStopped {
59        job_id: String,
60        trigger_count: u32,
61        reason: SchedulerStopReason,
62    },
63}
64
65pub trait SchedulerObserver: Send + Sync + 'static {
66    fn on_event(&self, event: &SchedulerEvent);
67}
68
69#[derive(Debug, Clone, Copy, Default)]
70pub struct NoopObserver;
71
72impl SchedulerObserver for NoopObserver {
73    fn on_event(&self, _event: &SchedulerEvent) {}
74}
75
76#[derive(Debug, Clone, Copy, Default)]
77pub struct LogObserver;
78
79impl SchedulerObserver for LogObserver {
80    fn on_event(&self, event: &SchedulerEvent) {
81        match event {
82            SchedulerEvent::StateLoaded {
83                job_id,
84                trigger_count,
85                next_run_at,
86                source,
87            } => info!(
88                "scheduler state loaded job_id={} source={:?} trigger_count={} next_run_at={:?}",
89                job_id, source, trigger_count, next_run_at
90            ),
91            SchedulerEvent::StateRepaired {
92                job_id,
93                trigger_count,
94                previous_next_run_at,
95                repaired_next_run_at,
96            } => warn!(
97                "scheduler repaired state job_id={} trigger_count={} previous_next_run_at={:?} repaired_next_run_at={:?}",
98                job_id, trigger_count, previous_next_run_at, repaired_next_run_at
99            ),
100            SchedulerEvent::TriggerEmitted {
101                job_id,
102                scheduled_at,
103                catch_up,
104                trigger_count,
105            } => debug!(
106                "scheduler trigger emitted job_id={} scheduled_at={} catch_up={} trigger_count={}",
107                job_id, scheduled_at, catch_up, trigger_count
108            ),
109            SchedulerEvent::RunCompleted {
110                job_id,
111                scheduled_at,
112                catch_up,
113                trigger_count,
114                status,
115                error,
116            } => debug!(
117                "scheduler run completed job_id={} scheduled_at={} catch_up={} trigger_count={} status={:?} error={:?}",
118                job_id, scheduled_at, catch_up, trigger_count, status, error
119            ),
120            SchedulerEvent::StoreDegraded {
121                job_id,
122                operation,
123                error,
124            } => warn!(
125                "scheduler store degraded job_id={} operation={:?} error={}",
126                job_id, operation, error
127            ),
128            SchedulerEvent::TerminalStateDeleted {
129                job_id,
130                trigger_count,
131            } => info!(
132                "scheduler deleted terminal state job_id={} trigger_count={}",
133                job_id, trigger_count
134            ),
135            SchedulerEvent::SchedulerStopped {
136                job_id,
137                trigger_count,
138                reason,
139            } => info!(
140                "scheduler stopped job_id={} trigger_count={} reason={:?}",
141                job_id, trigger_count, reason
142            ),
143        }
144    }
145}