1use crate::model::RunStatus;
2use crate::store::StoreOperation;
3use chrono::{DateTime, Utc};
4use log::{debug, info, warn};
5
/// Tag carried by [`SchedulerEvent::StateLoaded`] in its `source` field.
///
/// NOTE(review): the exact meaning of each variant is decided by the scheduler
/// code that emits the event; the per-variant notes below are inferred from the
/// names and should be confirmed there.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StateLoadSource {
    /// Presumably: no prior state existed and a fresh one was created.
    New,
    /// Presumably: previously stored state was loaded unchanged.
    Restored,
    /// Presumably: stored state was loaded and had to be corrected.
    Repaired,
}
12
/// Tag carried by [`SchedulerEvent::SchedulerStopped`] in its `reason` field.
///
/// NOTE(review): the per-variant notes are inferred from the variant names;
/// confirm against the scheduler loop that emits the event.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SchedulerStopReason {
    /// Presumably: the job was cancelled.
    Cancelled,
    /// Presumably: the host process/runtime is shutting down.
    Shutdown,
    /// Presumably: the job reached a terminal state and will not run again.
    Terminal,
    /// Presumably: a channel the scheduler depends on was closed.
    ChannelClosed,
}
20
21#[derive(Debug, Clone, PartialEq, Eq)]
22pub enum SchedulerEvent {
23 StateLoaded {
24 job_id: String,
25 trigger_count: u32,
26 next_run_at: Option<DateTime<Utc>>,
27 source: StateLoadSource,
28 },
29 StateRepaired {
30 job_id: String,
31 trigger_count: u32,
32 previous_next_run_at: Option<DateTime<Utc>>,
33 repaired_next_run_at: Option<DateTime<Utc>>,
34 },
35 TriggerEmitted {
36 job_id: String,
37 scheduled_at: DateTime<Utc>,
38 catch_up: bool,
39 trigger_count: u32,
40 },
41 RunCompleted {
42 job_id: String,
43 scheduled_at: DateTime<Utc>,
44 catch_up: bool,
45 trigger_count: u32,
46 status: RunStatus,
47 error: Option<String>,
48 },
49 StoreDegraded {
50 job_id: String,
51 operation: StoreOperation,
52 error: String,
53 },
54 TerminalStateDeleted {
55 job_id: String,
56 trigger_count: u32,
57 },
58 SchedulerStopped {
59 job_id: String,
60 trigger_count: u32,
61 reason: SchedulerStopReason,
62 },
63}
64
65pub trait SchedulerObserver: Send + Sync + 'static {
66 fn on_event(&self, event: &SchedulerEvent);
67}
68
/// Zero-sized observer type; its `SchedulerObserver` implementation discards
/// every event.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopObserver;
71
72impl SchedulerObserver for NoopObserver {
73 fn on_event(&self, _event: &SchedulerEvent) {}
74}
75
/// Zero-sized observer type; its `SchedulerObserver` implementation writes
/// each event through the `log` macros.
#[derive(Clone, Copy, Debug, Default)]
pub struct LogObserver;
78
79impl SchedulerObserver for LogObserver {
80 fn on_event(&self, event: &SchedulerEvent) {
81 match event {
82 SchedulerEvent::StateLoaded {
83 job_id,
84 trigger_count,
85 next_run_at,
86 source,
87 } => info!(
88 "scheduler state loaded job_id={} source={:?} trigger_count={} next_run_at={:?}",
89 job_id, source, trigger_count, next_run_at
90 ),
91 SchedulerEvent::StateRepaired {
92 job_id,
93 trigger_count,
94 previous_next_run_at,
95 repaired_next_run_at,
96 } => warn!(
97 "scheduler repaired state job_id={} trigger_count={} previous_next_run_at={:?} repaired_next_run_at={:?}",
98 job_id, trigger_count, previous_next_run_at, repaired_next_run_at
99 ),
100 SchedulerEvent::TriggerEmitted {
101 job_id,
102 scheduled_at,
103 catch_up,
104 trigger_count,
105 } => debug!(
106 "scheduler trigger emitted job_id={} scheduled_at={} catch_up={} trigger_count={}",
107 job_id, scheduled_at, catch_up, trigger_count
108 ),
109 SchedulerEvent::RunCompleted {
110 job_id,
111 scheduled_at,
112 catch_up,
113 trigger_count,
114 status,
115 error,
116 } => debug!(
117 "scheduler run completed job_id={} scheduled_at={} catch_up={} trigger_count={} status={:?} error={:?}",
118 job_id, scheduled_at, catch_up, trigger_count, status, error
119 ),
120 SchedulerEvent::StoreDegraded {
121 job_id,
122 operation,
123 error,
124 } => warn!(
125 "scheduler store degraded job_id={} operation={:?} error={}",
126 job_id, operation, error
127 ),
128 SchedulerEvent::TerminalStateDeleted {
129 job_id,
130 trigger_count,
131 } => info!(
132 "scheduler deleted terminal state job_id={} trigger_count={}",
133 job_id, trigger_count
134 ),
135 SchedulerEvent::SchedulerStopped {
136 job_id,
137 trigger_count,
138 reason,
139 } => info!(
140 "scheduler stopped job_id={} trigger_count={} reason={:?}",
141 job_id, trigger_count, reason
142 ),
143 }
144 }
145}