use std::{
    collections::HashMap,
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};
2
3use jiff::Timestamp;
4use miette::Result;
5use tokio::{
6 sync::RwLock,
7 task::JoinHandle,
8 time::{interval, sleep},
9};
10use tracing::{debug, error, info, warn};
11
12use crate::{
13 EmailConfig, LogError,
14 alert::{AlertDefinition, InternalContext, TicketSource},
15 events::{EventContext, EventManager, EventType},
16 glob_resolver::{GlobResolver, ResolvedPaths},
17 loader::{LoadedAlerts, load_alerts_from_paths},
18 metrics,
19 targets::ResolvedTarget,
20};
21
/// Runtime state for a single scheduled alert: the parsed definition plus
/// mutable bookkeeping about its trigger/send history.
#[derive(Debug, Clone)]
pub struct AlertState {
    /// The alert definition loaded from its file.
    pub definition: AlertDefinition,
    /// Notification targets resolved from the definition.
    pub resolved_targets: Vec<ResolvedTarget>,
    /// When the alert first entered the triggered state; `None` while clear.
    pub triggered_at: Option<Timestamp>,
    /// When a notification was last sent for this alert.
    pub last_sent_at: Option<Timestamp>,
    /// Serialized source output from the last send, used by the
    /// `when_changed` change-detection logic.
    pub last_output: Option<String>,
    /// If set, execution is skipped until this timestamp passes.
    pub paused_until: Option<Timestamp>,
}
31
32impl AlertState {
33 pub fn new(definition: AlertDefinition, resolved_targets: Vec<ResolvedTarget>) -> Self {
34 Self {
35 definition,
36 resolved_targets,
37 triggered_at: None,
38 last_sent_at: None,
39 last_output: None,
40 paused_until: None,
41 }
42 }
43
44 pub fn preserve_state_from(&mut self, old_state: &AlertState) {
45 self.triggered_at = old_state.triggered_at;
46 self.last_sent_at = old_state.last_sent_at;
47 self.last_output = old_state.last_output.clone();
48 self.paused_until = old_state.paused_until;
49 }
50}
51
52pub struct Scheduler {
53 glob_resolver: GlobResolver,
54 resolved_paths: Arc<RwLock<ResolvedPaths>>,
55 ctx: Arc<InternalContext>,
56 email: Option<EmailConfig>,
57 dry_run: bool,
58 alerts: Arc<RwLock<HashMap<PathBuf, Arc<RwLock<AlertState>>>>>,
59 tasks: Arc<RwLock<HashMap<PathBuf, JoinHandle<()>>>>,
60 event_manager: Arc<RwLock<Option<EventManager>>>,
61 external_targets:
62 Arc<RwLock<std::collections::HashMap<String, Vec<crate::targets::ExternalTarget>>>>,
63}
64
65impl Scheduler {
66 pub fn new(
67 alert_globs: Vec<String>,
68 ctx: Arc<InternalContext>,
69 email: Option<EmailConfig>,
70 dry_run: bool,
71 ) -> Self {
72 let glob_resolver = GlobResolver::new(alert_globs);
73 Self {
74 glob_resolver,
75 resolved_paths: Arc::new(RwLock::new(ResolvedPaths {
76 dirs: Vec::new(),
77 files: Vec::new(),
78 })),
79 ctx,
80 email,
81 dry_run,
82 alerts: Arc::new(RwLock::new(HashMap::new())),
83 tasks: Arc::new(RwLock::new(HashMap::new())),
84 event_manager: Arc::new(RwLock::new(None)),
85 external_targets: Arc::new(RwLock::new(HashMap::new())),
86 }
87 }
88
89 pub fn get_event_manager(&self) -> Arc<RwLock<Option<EventManager>>> {
90 self.event_manager.clone()
91 }
92
93 pub async fn get_loaded_alerts(&self) -> Vec<PathBuf> {
94 let alerts = self.alerts.read().await;
95 let mut files: Vec<PathBuf> = alerts.keys().cloned().collect();
96 files.sort();
97 files
98 }
99
100 pub async fn get_alert_states(&self) -> HashMap<PathBuf, AlertState> {
101 let alerts = self.alerts.read().await;
102 let mut states = HashMap::new();
103 for (path, state_lock) in alerts.iter() {
104 let state = state_lock.read().await;
105 states.insert(path.clone(), state.clone());
106 }
107 states
108 }
109
110 pub async fn pause_alert(&self, path: &PathBuf, until: Timestamp) -> Result<bool> {
111 let alerts = self.alerts.read().await;
112 if let Some(alert_state) = alerts.get(path) {
113 let mut state = alert_state.write().await;
114 state.paused_until = Some(until);
115 info!(?path, until = %until, "paused alert");
116 Ok(true)
117 } else {
118 Ok(false)
119 }
120 }
121
122 pub async fn get_external_targets(
123 &self,
124 ) -> std::collections::HashMap<String, Vec<crate::targets::ExternalTarget>> {
125 self.external_targets.read().await.clone()
126 }
127
    /// Resolves the alert globs, loads all alert definitions, and spawns a
    /// periodic task for every regular (non-event) alert.
    ///
    /// Event-sourced alerts are handed to a fresh [`EventManager`], and each
    /// definition that failed to load triggers a `definition-error` event.
    /// Runtime history (trigger/send state) of previously loaded alerts with
    /// the same path is preserved across the reload.
    pub async fn load_and_schedule_alerts(&self) -> Result<()> {
        info!("resolving glob patterns and loading alerts");

        let resolved = self.glob_resolver.resolve()?;
        debug!(
            dirs = resolved.dirs.len(),
            files = resolved.files.len(),
            "resolved paths from globs"
        );

        let LoadedAlerts {
            alerts,
            external_targets,
            definition_errors,
        } = load_alerts_from_paths(&resolved)?;

        // Remember what the globs resolved to so
        // `check_and_reload_if_paths_changed` can detect additions/removals.
        *self.resolved_paths.write().await = resolved;

        // Event-sourced alerts are owned by the EventManager; everything
        // else gets its own periodic task below.
        let (event_alerts, regular_alerts): (Vec<_>, Vec<_>) = alerts
            .into_iter()
            .partition(|(alert, _)| matches!(alert.source, TicketSource::Event { .. }));

        *self.external_targets.write().await = external_targets.clone();

        let event_manager = EventManager::new(event_alerts, &external_targets);
        *self.event_manager.write().await = Some(event_manager.clone());

        if !definition_errors.is_empty() {
            info!(
                count = definition_errors.len(),
                "triggering definition-error events for failed alerts"
            );
        }
        // Surface every failed definition through the event system; trigger
        // errors are logged but do not abort the (re)load.
        for def_err in definition_errors {
            info!(
                file = %def_err.file.display(),
                "triggering definition-error event"
            );
            let event_context = EventContext::DefinitionError {
                alert_file: def_err.file.display().to_string(),
                error_message: def_err.error.clone(),
            };
            if let Err(err) = event_manager
                .trigger_event(
                    EventType::DefinitionError,
                    &self.ctx,
                    self.email.as_ref(),
                    self.dry_run,
                    event_context,
                )
                .await
            {
                error!(
                    "failed to trigger definition-error event: {}",
                    LogError(&err)
                );
            }
        }

        if regular_alerts.is_empty() {
            warn!("no regular alerts found");
            return Ok(());
        }

        info!(count = regular_alerts.len(), "scheduling regular alerts");

        let old_alerts = self.alerts.read().await.clone();

        let mut new_alerts = HashMap::new();
        let mut tasks = HashMap::new();

        for (definition, resolved_targets) in regular_alerts {
            let file = definition.file.clone();

            let mut new_state = AlertState::new(definition.clone(), resolved_targets.clone());

            // Keep trigger/send history across reloads of the same file so a
            // reload does not re-fire already-sent alerts.
            if let Some(old_alert_lock) = old_alerts.get(&file) {
                let old_state = old_alert_lock.read().await;
                new_state.preserve_state_from(&old_state);
            }

            let state_lock = Arc::new(RwLock::new(new_state));
            let task = self.spawn_alert_task(state_lock.clone());

            new_alerts.insert(file.clone(), state_lock);
            tasks.insert(file, task);
        }

        // NOTE(review): old task handles are overwritten here without being
        // aborted; `reload_alerts` aborts them before calling this, which
        // appears to be the intended reload path — confirm no other caller
        // relies on this method to stop stale tasks.
        *self.alerts.write().await = new_alerts;
        *self.tasks.write().await = tasks;

        metrics::set_alerts_loaded(self.alerts.read().await.len());

        Ok(())
    }
233
    /// Loads all alerts and executes each regular one exactly once,
    /// sequentially, without scheduling any background tasks.
    ///
    /// Intended for one-shot runs. Definition errors still trigger
    /// `definition-error` events; per-alert execution errors are logged and
    /// the remaining alerts still run.
    pub async fn execute_all_alerts_once(&self) -> Result<()> {
        info!("executing all alerts once");

        let resolved = self.glob_resolver.resolve()?;
        let LoadedAlerts {
            alerts,
            external_targets,
            definition_errors,
        } = load_alerts_from_paths(&resolved)?;

        // Event-sourced alerts go to the EventManager; only regular alerts
        // are executed directly below.
        let (event_alerts, regular_alerts): (Vec<_>, Vec<_>) = alerts
            .into_iter()
            .partition(|(alert, _)| matches!(alert.source, TicketSource::Event { .. }));

        *self.external_targets.write().await = external_targets.clone();

        let event_manager = EventManager::new(event_alerts, &external_targets);
        *self.event_manager.write().await = Some(event_manager.clone());

        if !definition_errors.is_empty() {
            info!(
                count = definition_errors.len(),
                "triggering definition-error events for failed alerts"
            );
        }
        // Surface failed definitions through the event system; trigger
        // errors are logged but do not abort the run.
        for def_err in definition_errors {
            info!(
                file = %def_err.file.display(),
                "triggering definition-error event"
            );
            let event_context = EventContext::DefinitionError {
                alert_file: def_err.file.display().to_string(),
                error_message: def_err.error.clone(),
            };
            if let Err(err) = event_manager
                .trigger_event(
                    EventType::DefinitionError,
                    &self.ctx,
                    self.email.as_ref(),
                    self.dry_run,
                    event_context,
                )
                .await
            {
                error!(
                    "failed to trigger definition-error event: {}",
                    LogError(&err)
                );
            }
        }

        if regular_alerts.is_empty() {
            warn!("no regular alerts found");
            return Ok(());
        }

        info!(count = regular_alerts.len(), "executing alerts");

        // Execute sequentially; an individual failure is logged and the
        // loop continues with the next alert.
        for (alert, resolved_targets) in regular_alerts {
            let ctx = self.ctx.clone();
            let email = self.email.clone();
            let dry_run = self.dry_run;
            let file = alert.file.clone();

            debug!(?file, "executing alert");
            if let Err(err) = alert
                .execute(ctx, email.as_ref(), dry_run, &resolved_targets)
                .await
            {
                error!(?file, "error executing alert: {}", LogError(&err));
            }
        }

        Ok(())
    }
313
314 pub async fn check_and_reload_if_paths_changed(&self) -> Result<()> {
315 debug!("checking if resolved paths have changed");
316
317 let new_resolved = self.glob_resolver.resolve()?;
318 let old_resolved = self.resolved_paths.read().await;
319
320 if new_resolved.differs_from(&old_resolved) {
321 drop(old_resolved); info!("resolved paths have changed, reloading alerts");
323 self.reload_alerts().await?;
324 }
325
326 Ok(())
327 }
328
    /// Returns every path (directories and files) from the last glob
    /// resolution, as owned `PathBuf`s.
    pub async fn get_resolved_paths(&self) -> Vec<PathBuf> {
        let resolved = self.resolved_paths.read().await;
        resolved
            .all_paths()
            .iter()
            .map(|p| p.to_path_buf())
            .collect()
    }
337
338 pub async fn reload_alerts(&self) -> Result<()> {
339 info!("reloading alerts");
340
341 {
343 let mut tasks = self.tasks.write().await;
344 for (path, handle) in tasks.drain() {
345 debug!(?path, "cancelling alert task");
346 handle.abort();
347 }
348 }
349
350 self.load_and_schedule_alerts().await
352 }
353
354 fn spawn_alert_task(&self, alert_state: Arc<RwLock<AlertState>>) -> JoinHandle<()> {
355 fn serialize_context_for_comparison(
356 context: &tera::Context,
357 when_changed: &crate::alert::WhenChanged,
358 ) -> String {
359 use crate::alert::WhenChanged;
360
361 let rows = match context.get("rows") {
363 Some(value) => value,
364 None => return String::new(),
365 };
366
367 let rows_array = match rows.as_array() {
369 Some(arr) => arr,
370 None => return serde_json::to_string(rows).unwrap_or_default(),
371 };
372
373 match when_changed {
374 WhenChanged::Boolean(true) => {
375 serde_json::to_string(rows).unwrap_or_default()
377 }
378 WhenChanged::Boolean(false) => {
379 String::new()
381 }
382 WhenChanged::Detailed(config) => {
383 let filtered_rows: Vec<serde_json::Map<String, serde_json::Value>> = rows_array
385 .iter()
386 .filter_map(|row| {
387 let obj = row.as_object()?;
388 let mut filtered = serde_json::Map::new();
389
390 for (key, value) in obj {
391 let include = if !config.only.is_empty() {
392 config.only.contains(key)
394 } else if !config.except.is_empty() {
395 !config.except.contains(key)
397 } else {
398 true
400 };
401
402 if include {
403 filtered.insert(key.clone(), value.clone());
404 }
405 }
406
407 Some(filtered)
408 })
409 .collect();
410
411 serde_json::to_string(&filtered_rows).unwrap_or_default()
412 }
413 }
414 }
415
416 let ctx = self.ctx.clone();
417 let email = self.email.clone();
418 let dry_run = self.dry_run;
419 let event_manager = self.event_manager.clone();
420
421 tokio::spawn(async move {
422 let (file, interval_duration) = {
424 let state = alert_state.read().await;
425 (
426 state.definition.file.clone(),
427 state.definition.interval_duration,
428 )
429 };
430 debug!(?file, ?interval_duration, "starting alert task");
431
432 let jitter = Duration::from_millis(rand::random::<u64>() % 5000);
434 sleep(jitter).await;
435
436 let mut ticker = interval(interval_duration);
437 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
438
439 loop {
440 ticker.tick().await;
441
442 let is_paused = {
444 let state = alert_state.read().await;
445 if let Some(until) = state.paused_until {
446 let now = Timestamp::now();
447 now < until
448 } else {
449 false
450 }
451 };
452
453 if is_paused {
454 debug!(?file, "alert is paused, skipping execution");
455 continue;
456 }
457
458 debug!(?file, "executing alert");
459
460 let (alert, resolved_targets, was_triggered, always_send, when_changed) = {
462 let state = alert_state.read().await;
463 (
464 state.definition.clone(),
465 state.resolved_targets.clone(),
466 state.triggered_at.is_some(),
467 state.definition.always_send.clone(),
468 state.definition.when_changed.clone(),
469 )
470 };
471
472 let mut tera_ctx = crate::templates::build_context(&alert, chrono::Utc::now());
474 let now = chrono::Utc::now();
475 let not_before = now - alert.interval_duration;
476
477 let is_triggering = match alert
478 .read_sources(&ctx.pg_pool, not_before, &mut tera_ctx, was_triggered)
479 .await
480 {
481 Ok(flow) => flow.is_continue(),
482 Err(err) => {
483 error!(?file, "error reading sources: {}", LogError(&err));
484 metrics::inc_alerts_failed();
485
486 if let Some(ref event_mgr) = *event_manager.read().await {
488 let event_context = EventContext::SourceError {
489 alert_file: file.display().to_string(),
490 error_message: format!("{err:?}"),
491 };
492 if let Err(event_err) = event_mgr
493 .trigger_event(
494 EventType::SourceError,
495 &ctx,
496 email.as_ref(),
497 dry_run,
498 event_context,
499 )
500 .await
501 {
502 error!(
503 "failed to trigger source_error event: {}",
504 LogError(&event_err)
505 );
506 }
507 }
508 continue;
509 }
510 };
511
512 if is_triggering {
513 let mut state = alert_state.write().await;
515
516 let mut should_send = match &always_send {
517 crate::alert::AlwaysSend::Boolean(true) => true,
518 crate::alert::AlwaysSend::Boolean(false) => !was_triggered,
519 crate::alert::AlwaysSend::Timed(config) => {
520 match state.last_sent_at {
522 Some(last_sent_time) => {
523 let now = Timestamp::now();
524 let elapsed = now.duration_since(last_sent_time);
525 if let Ok(after_duration) =
526 jiff::SignedDuration::try_from(config.after_duration)
527 {
528 elapsed >= after_duration
529 } else {
530 false
531 }
532 }
533 None => true, }
535 }
536 };
537
538 if should_send
540 && !matches!(when_changed, crate::alert::WhenChanged::Boolean(false))
541 {
542 let current_output =
543 serialize_context_for_comparison(&tera_ctx, &when_changed);
544
545 let output_changed = match &state.last_output {
546 Some(prev_output) => prev_output != ¤t_output,
547 None => true, };
549
550 if output_changed {
551 debug!(?file, "output changed, will send");
552 state.last_output = Some(current_output);
553 } else {
554 debug!(?file, "output unchanged, skipping");
555 should_send = false;
556 }
557 }
558
559 if should_send {
560 debug!(?file, "alert triggered, sending notifications");
561
562 for target in &resolved_targets {
564 if let Err(err) = target
565 .send(&alert, &mut tera_ctx, email.as_ref(), dry_run)
566 .await
567 {
568 error!("sending: {}", LogError(&err));
569 }
570 }
571
572 metrics::inc_alerts_sent();
573
574 state.last_sent_at = Some(Timestamp::now());
576 } else {
577 debug!(?file, "alert still triggered, not sending (already sent)");
578 }
579
580 if !was_triggered {
582 state.triggered_at = Some(Timestamp::now());
583 }
584 } else {
585 if was_triggered {
587 info!(?file, "alert cleared");
588 let mut state = alert_state.write().await;
589 state.triggered_at = None;
590 state.last_sent_at = None;
591
592 if !matches!(when_changed, crate::alert::WhenChanged::Boolean(false)) {
594 state.last_output = None;
595 }
596 }
597 }
598 }
599 })
600 }
601
602 pub async fn shutdown(&self) {
603 info!("shutting down scheduler");
604 let mut tasks = self.tasks.write().await;
605 for (path, handle) in tasks.drain() {
606 debug!(?path, "cancelling alert task");
607 handle.abort();
608 }
609 }
610}