1use crate::model::RunStatus;
2use crate::store::StoreOperation;
3use chrono::{DateTime, Utc};
4use log::{debug, info, warn};
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq)]
7pub enum StateLoadSource {
8 New,
9 Restored,
10 Repaired,
11}
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
14pub enum SchedulerStopReason {
15 Cancelled,
16 Shutdown,
17 Terminal,
18 ChannelClosed,
19}
20
21#[derive(Debug, Clone, PartialEq, Eq)]
22pub enum SchedulerEvent {
23 StateLoaded {
24 job_id: String,
25 trigger_count: u32,
26 next_run_at: Option<DateTime<Utc>>,
27 source: StateLoadSource,
28 },
29 StateRepaired {
30 job_id: String,
31 trigger_count: u32,
32 previous_next_run_at: Option<DateTime<Utc>>,
33 repaired_next_run_at: Option<DateTime<Utc>>,
34 },
35 TriggerEmitted {
36 job_id: String,
37 scheduled_at: DateTime<Utc>,
38 catch_up: bool,
39 trigger_count: u32,
40 },
41 RunCompleted {
42 job_id: String,
43 scheduled_at: DateTime<Utc>,
44 catch_up: bool,
45 trigger_count: u32,
46 status: RunStatus,
47 error: Option<String>,
48 },
49 ExecutionGuardContended {
50 job_id: String,
51 scheduled_at: DateTime<Utc>,
52 catch_up: bool,
53 trigger_count: u32,
54 },
55 ExecutionGuardLost {
56 job_id: String,
57 scheduled_at: DateTime<Utc>,
58 catch_up: bool,
59 trigger_count: u32,
60 },
61 ExecutionGuardReleaseFailed {
62 job_id: String,
63 scheduled_at: DateTime<Utc>,
64 catch_up: bool,
65 trigger_count: u32,
66 error: String,
67 },
68 StoreDegraded {
69 job_id: String,
70 operation: StoreOperation,
71 error: String,
72 },
73 TerminalStateDeleted {
74 job_id: String,
75 trigger_count: u32,
76 },
77 SchedulerStopped {
78 job_id: String,
79 trigger_count: u32,
80 reason: SchedulerStopReason,
81 },
82}
83
84pub trait SchedulerObserver: Send + Sync + 'static {
85 fn on_event(&self, event: &SchedulerEvent);
86}
87
88#[derive(Debug, Clone, Copy, Default)]
89pub struct NoopObserver;
90
91impl SchedulerObserver for NoopObserver {
92 fn on_event(&self, _event: &SchedulerEvent) {}
93}
94
95#[derive(Debug, Clone, Copy, Default)]
96pub struct LogObserver;
97
98impl SchedulerObserver for LogObserver {
99 fn on_event(&self, event: &SchedulerEvent) {
100 match event {
101 SchedulerEvent::StateLoaded {
102 job_id,
103 trigger_count,
104 next_run_at,
105 source,
106 } => info!(
107 "scheduler state loaded job_id={} source={:?} trigger_count={} next_run_at={:?}",
108 job_id, source, trigger_count, next_run_at
109 ),
110 SchedulerEvent::StateRepaired {
111 job_id,
112 trigger_count,
113 previous_next_run_at,
114 repaired_next_run_at,
115 } => warn!(
116 "scheduler repaired state job_id={} trigger_count={} previous_next_run_at={:?} repaired_next_run_at={:?}",
117 job_id, trigger_count, previous_next_run_at, repaired_next_run_at
118 ),
119 SchedulerEvent::TriggerEmitted {
120 job_id,
121 scheduled_at,
122 catch_up,
123 trigger_count,
124 } => debug!(
125 "scheduler trigger emitted job_id={} scheduled_at={} catch_up={} trigger_count={}",
126 job_id, scheduled_at, catch_up, trigger_count
127 ),
128 SchedulerEvent::RunCompleted {
129 job_id,
130 scheduled_at,
131 catch_up,
132 trigger_count,
133 status,
134 error,
135 } => debug!(
136 "scheduler run completed job_id={} scheduled_at={} catch_up={} trigger_count={} status={:?} error={:?}",
137 job_id, scheduled_at, catch_up, trigger_count, status, error
138 ),
139 SchedulerEvent::ExecutionGuardContended {
140 job_id,
141 scheduled_at,
142 catch_up,
143 trigger_count,
144 } => debug!(
145 "scheduler execution guard contended job_id={} scheduled_at={} catch_up={} trigger_count={}",
146 job_id, scheduled_at, catch_up, trigger_count
147 ),
148 SchedulerEvent::ExecutionGuardLost {
149 job_id,
150 scheduled_at,
151 catch_up,
152 trigger_count,
153 } => warn!(
154 "scheduler execution guard lost job_id={} scheduled_at={} catch_up={} trigger_count={}",
155 job_id, scheduled_at, catch_up, trigger_count
156 ),
157 SchedulerEvent::ExecutionGuardReleaseFailed {
158 job_id,
159 scheduled_at,
160 catch_up,
161 trigger_count,
162 error,
163 } => warn!(
164 "scheduler execution guard release failed job_id={} scheduled_at={} catch_up={} trigger_count={} error={}",
165 job_id, scheduled_at, catch_up, trigger_count, error
166 ),
167 SchedulerEvent::StoreDegraded {
168 job_id,
169 operation,
170 error,
171 } => warn!(
172 "scheduler store degraded job_id={} operation={:?} error={}",
173 job_id, operation, error
174 ),
175 SchedulerEvent::TerminalStateDeleted {
176 job_id,
177 trigger_count,
178 } => info!(
179 "scheduler deleted terminal state job_id={} trigger_count={}",
180 job_id, trigger_count
181 ),
182 SchedulerEvent::SchedulerStopped {
183 job_id,
184 trigger_count,
185 reason,
186 } => info!(
187 "scheduler stopped job_id={} trigger_count={} reason={:?}",
188 job_id, trigger_count, reason
189 ),
190 }
191 }
192}