// bestool_alertd/scheduler.rs

use std::{
	collections::HashMap,
	path::{Path, PathBuf},
	sync::Arc,
	time::Duration,
};

use jiff::Timestamp;
use miette::Result;
use tokio::{
	sync::RwLock,
	task::JoinHandle,
	time::{interval, sleep},
};
use tracing::{debug, error, info, warn};

use crate::{
	EmailConfig, LogError,
	alert::{AlertDefinition, InternalContext, TicketSource},
	events::{EventContext, EventManager, EventType},
	glob_resolver::{GlobResolver, ResolvedPaths},
	loader::{LoadedAlerts, load_alerts_from_paths},
	metrics,
	targets::ResolvedTarget,
};
21
/// Runtime state for a single scheduled alert, shared between the scheduler
/// and the per-alert background task.
#[derive(Debug, Clone)]
pub struct AlertState {
	/// The parsed alert definition this state belongs to.
	pub definition: AlertDefinition,
	/// Notification targets resolved for this alert.
	pub resolved_targets: Vec<ResolvedTarget>,
	/// When the alert first entered the triggering state; `None` while clear.
	pub triggered_at: Option<Timestamp>,
	/// When notifications were last sent for this alert, if ever.
	pub last_sent_at: Option<Timestamp>,
	/// Serialized row output from the last evaluation, used for the
	/// when-changed comparison; `None` until first used.
	pub last_output: Option<String>,
	/// If set, execution of the alert is skipped until this timestamp passes.
	pub paused_until: Option<Timestamp>,
}
31
32impl AlertState {
33	pub fn new(definition: AlertDefinition, resolved_targets: Vec<ResolvedTarget>) -> Self {
34		Self {
35			definition,
36			resolved_targets,
37			triggered_at: None,
38			last_sent_at: None,
39			last_output: None,
40			paused_until: None,
41		}
42	}
43
44	pub fn preserve_state_from(&mut self, old_state: &AlertState) {
45		self.triggered_at = old_state.triggered_at;
46		self.last_sent_at = old_state.last_sent_at;
47		self.last_output = old_state.last_output.clone();
48		self.paused_until = old_state.paused_until;
49	}
50}
51
/// Loads alert definitions from glob-resolved paths and runs each regular
/// (interval-based) alert on its own periodic tokio task, while routing
/// event-sourced alerts through an [`EventManager`].
pub struct Scheduler {
	// Resolves the configured glob patterns into concrete dirs/files.
	glob_resolver: GlobResolver,
	// Paths produced by the last glob resolution; used to detect changes.
	resolved_paths: Arc<RwLock<ResolvedPaths>>,
	// Shared internal context (includes the Postgres pool used when reading
	// alert sources) passed through to alert execution.
	ctx: Arc<InternalContext>,
	// Optional email configuration forwarded to targets and event triggers.
	email: Option<EmailConfig>,
	// Dry-run flag forwarded to alert execution and event triggering.
	dry_run: bool,
	// Per-file alert state, shared with the spawned per-alert tasks.
	alerts: Arc<RwLock<HashMap<PathBuf, Arc<RwLock<AlertState>>>>>,
	// Join handles for the per-alert tasks, keyed by alert file.
	tasks: Arc<RwLock<HashMap<PathBuf, JoinHandle<()>>>>,
	// Manager for event-sourced alerts; None until alerts are first loaded.
	event_manager: Arc<RwLock<Option<EventManager>>>,
	// External notification targets loaded from alert files, keyed by name.
	external_targets:
		Arc<RwLock<std::collections::HashMap<String, Vec<crate::targets::ExternalTarget>>>>,
}
64
65impl Scheduler {
66	pub fn new(
67		alert_globs: Vec<String>,
68		ctx: Arc<InternalContext>,
69		email: Option<EmailConfig>,
70		dry_run: bool,
71	) -> Self {
72		let glob_resolver = GlobResolver::new(alert_globs);
73		Self {
74			glob_resolver,
75			resolved_paths: Arc::new(RwLock::new(ResolvedPaths {
76				dirs: Vec::new(),
77				files: Vec::new(),
78			})),
79			ctx,
80			email,
81			dry_run,
82			alerts: Arc::new(RwLock::new(HashMap::new())),
83			tasks: Arc::new(RwLock::new(HashMap::new())),
84			event_manager: Arc::new(RwLock::new(None)),
85			external_targets: Arc::new(RwLock::new(HashMap::new())),
86		}
87	}
88
89	pub fn get_event_manager(&self) -> Arc<RwLock<Option<EventManager>>> {
90		self.event_manager.clone()
91	}
92
93	pub async fn get_loaded_alerts(&self) -> Vec<PathBuf> {
94		let alerts = self.alerts.read().await;
95		let mut files: Vec<PathBuf> = alerts.keys().cloned().collect();
96		files.sort();
97		files
98	}
99
100	pub async fn get_alert_states(&self) -> HashMap<PathBuf, AlertState> {
101		let alerts = self.alerts.read().await;
102		let mut states = HashMap::new();
103		for (path, state_lock) in alerts.iter() {
104			let state = state_lock.read().await;
105			states.insert(path.clone(), state.clone());
106		}
107		states
108	}
109
110	pub async fn pause_alert(&self, path: &PathBuf, until: Timestamp) -> Result<bool> {
111		let alerts = self.alerts.read().await;
112		if let Some(alert_state) = alerts.get(path) {
113			let mut state = alert_state.write().await;
114			state.paused_until = Some(until);
115			info!(?path, until = %until, "paused alert");
116			Ok(true)
117		} else {
118			Ok(false)
119		}
120	}
121
122	pub async fn get_external_targets(
123		&self,
124	) -> std::collections::HashMap<String, Vec<crate::targets::ExternalTarget>> {
125		self.external_targets.read().await.clone()
126	}
127
128	pub async fn load_and_schedule_alerts(&self) -> Result<()> {
129		info!("resolving glob patterns and loading alerts");
130
131		let resolved = self.glob_resolver.resolve()?;
132		debug!(
133			dirs = resolved.dirs.len(),
134			files = resolved.files.len(),
135			"resolved paths from globs"
136		);
137
138		let LoadedAlerts {
139			alerts,
140			external_targets,
141			definition_errors,
142		} = load_alerts_from_paths(&resolved)?;
143
144		// Update resolved paths
145		*self.resolved_paths.write().await = resolved;
146
147		// Separate event alerts from regular alerts
148		let (event_alerts, regular_alerts): (Vec<_>, Vec<_>) = alerts
149			.into_iter()
150			.partition(|(alert, _)| matches!(alert.source, TicketSource::Event { .. }));
151
152		// Store external targets
153		*self.external_targets.write().await = external_targets.clone();
154
155		// Create event manager with event alerts and external targets
156		let event_manager = EventManager::new(event_alerts, &external_targets);
157		*self.event_manager.write().await = Some(event_manager.clone());
158
159		// Send definition error events for any failed alert loads
160		if !definition_errors.is_empty() {
161			info!(
162				count = definition_errors.len(),
163				"triggering definition-error events for failed alerts"
164			);
165		}
166		for def_err in definition_errors {
167			info!(
168				file = %def_err.file.display(),
169				"triggering definition-error event"
170			);
171			let event_context = EventContext::DefinitionError {
172				alert_file: def_err.file.display().to_string(),
173				error_message: def_err.error.clone(),
174			};
175			if let Err(err) = event_manager
176				.trigger_event(
177					EventType::DefinitionError,
178					&self.ctx,
179					self.email.as_ref(),
180					self.dry_run,
181					event_context,
182				)
183				.await
184			{
185				error!(
186					"failed to trigger definition-error event: {}",
187					LogError(&err)
188				);
189			}
190		}
191
192		if regular_alerts.is_empty() {
193			warn!("no regular alerts found");
194			return Ok(());
195		}
196
197		info!(count = regular_alerts.len(), "scheduling regular alerts");
198
199		// Get old alerts to preserve state
200		let old_alerts = self.alerts.read().await.clone();
201
202		let mut new_alerts = HashMap::new();
203		let mut tasks = HashMap::new();
204
205		for (definition, resolved_targets) in regular_alerts {
206			let file = definition.file.clone();
207
208			// Create new alert state
209			let mut new_state = AlertState::new(definition.clone(), resolved_targets.clone());
210
211			// Preserve state from old alert if it exists
212			if let Some(old_alert_lock) = old_alerts.get(&file) {
213				let old_state = old_alert_lock.read().await;
214				new_state.preserve_state_from(&old_state);
215			}
216
217			let state_lock = Arc::new(RwLock::new(new_state));
218			let task = self.spawn_alert_task(state_lock.clone());
219
220			new_alerts.insert(file.clone(), state_lock);
221			tasks.insert(file, task);
222		}
223
224		// Update alerts and tasks atomically
225		*self.alerts.write().await = new_alerts;
226		*self.tasks.write().await = tasks;
227
228		// Update metrics with count of loaded alerts
229		metrics::set_alerts_loaded(self.alerts.read().await.len());
230
231		Ok(())
232	}
233
234	pub async fn execute_all_alerts_once(&self) -> Result<()> {
235		info!("executing all alerts once");
236
237		let resolved = self.glob_resolver.resolve()?;
238		let LoadedAlerts {
239			alerts,
240			external_targets,
241			definition_errors,
242		} = load_alerts_from_paths(&resolved)?;
243
244		// Separate event alerts from regular alerts
245		let (event_alerts, regular_alerts): (Vec<_>, Vec<_>) = alerts
246			.into_iter()
247			.partition(|(alert, _)| matches!(alert.source, TicketSource::Event { .. }));
248
249		// Store external targets
250		*self.external_targets.write().await = external_targets.clone();
251
252		// Update event manager
253		let event_manager = EventManager::new(event_alerts, &external_targets);
254		*self.event_manager.write().await = Some(event_manager.clone());
255
256		// Send definition error events for any failed alert loads
257		if !definition_errors.is_empty() {
258			info!(
259				count = definition_errors.len(),
260				"triggering definition-error events for failed alerts"
261			);
262		}
263		for def_err in definition_errors {
264			info!(
265				file = %def_err.file.display(),
266				"triggering definition-error event"
267			);
268			let event_context = EventContext::DefinitionError {
269				alert_file: def_err.file.display().to_string(),
270				error_message: def_err.error.clone(),
271			};
272			if let Err(err) = event_manager
273				.trigger_event(
274					EventType::DefinitionError,
275					&self.ctx,
276					self.email.as_ref(),
277					self.dry_run,
278					event_context,
279				)
280				.await
281			{
282				error!(
283					"failed to trigger definition-error event: {}",
284					LogError(&err)
285				);
286			}
287		}
288
289		if regular_alerts.is_empty() {
290			warn!("no regular alerts found");
291			return Ok(());
292		}
293
294		info!(count = regular_alerts.len(), "executing alerts");
295
296		for (alert, resolved_targets) in regular_alerts {
297			let ctx = self.ctx.clone();
298			let email = self.email.clone();
299			let dry_run = self.dry_run;
300			let file = alert.file.clone();
301
302			debug!(?file, "executing alert");
303			if let Err(err) = alert
304				.execute(ctx, email.as_ref(), dry_run, &resolved_targets)
305				.await
306			{
307				error!(?file, "error executing alert: {}", LogError(&err));
308			}
309		}
310
311		Ok(())
312	}
313
314	pub async fn check_and_reload_if_paths_changed(&self) -> Result<()> {
315		debug!("checking if resolved paths have changed");
316
317		let new_resolved = self.glob_resolver.resolve()?;
318		let old_resolved = self.resolved_paths.read().await;
319
320		if new_resolved.differs_from(&old_resolved) {
321			drop(old_resolved); // Release read lock before reloading
322			info!("resolved paths have changed, reloading alerts");
323			self.reload_alerts().await?;
324		}
325
326		Ok(())
327	}
328
329	pub async fn get_resolved_paths(&self) -> Vec<PathBuf> {
330		let resolved = self.resolved_paths.read().await;
331		resolved
332			.all_paths()
333			.iter()
334			.map(|p| p.to_path_buf())
335			.collect()
336	}
337
338	pub async fn reload_alerts(&self) -> Result<()> {
339		info!("reloading alerts");
340
341		// Cancel all existing tasks
342		{
343			let mut tasks = self.tasks.write().await;
344			for (path, handle) in tasks.drain() {
345				debug!(?path, "cancelling alert task");
346				handle.abort();
347			}
348		}
349
350		// Load and schedule new alerts
351		self.load_and_schedule_alerts().await
352	}
353
354	fn spawn_alert_task(&self, alert_state: Arc<RwLock<AlertState>>) -> JoinHandle<()> {
355		fn serialize_context_for_comparison(
356			context: &tera::Context,
357			when_changed: &crate::alert::WhenChanged,
358		) -> String {
359			use crate::alert::WhenChanged;
360
361			// Get the rows from the context
362			let rows = match context.get("rows") {
363				Some(value) => value,
364				None => return String::new(),
365			};
366
367			// Parse rows as array of objects
368			let rows_array = match rows.as_array() {
369				Some(arr) => arr,
370				None => return serde_json::to_string(rows).unwrap_or_default(),
371			};
372
373			match when_changed {
374				WhenChanged::Boolean(true) => {
375					// Simple mode: serialize everything
376					serde_json::to_string(rows).unwrap_or_default()
377				}
378				WhenChanged::Boolean(false) => {
379					// Not enabled
380					String::new()
381				}
382				WhenChanged::Detailed(config) => {
383					// Filter columns based on config
384					let filtered_rows: Vec<serde_json::Map<String, serde_json::Value>> = rows_array
385						.iter()
386						.filter_map(|row| {
387							let obj = row.as_object()?;
388							let mut filtered = serde_json::Map::new();
389
390							for (key, value) in obj {
391								let include = if !config.only.is_empty() {
392									// Only mode: include only specified columns
393									config.only.contains(key)
394								} else if !config.except.is_empty() {
395									// Except mode: include all except specified columns
396									!config.except.contains(key)
397								} else {
398									// No filters specified, include all
399									true
400								};
401
402								if include {
403									filtered.insert(key.clone(), value.clone());
404								}
405							}
406
407							Some(filtered)
408						})
409						.collect();
410
411					serde_json::to_string(&filtered_rows).unwrap_or_default()
412				}
413			}
414		}
415
416		let ctx = self.ctx.clone();
417		let email = self.email.clone();
418		let dry_run = self.dry_run;
419		let event_manager = self.event_manager.clone();
420
421		tokio::spawn(async move {
422			// Read initial values from state
423			let (file, interval_duration) = {
424				let state = alert_state.read().await;
425				(
426					state.definition.file.clone(),
427					state.definition.interval_duration,
428				)
429			};
430			debug!(?file, ?interval_duration, "starting alert task");
431
432			// Add a small random delay to prevent all alerts from firing at exactly the same time
433			let jitter = Duration::from_millis(rand::random::<u64>() % 5000);
434			sleep(jitter).await;
435
436			let mut ticker = interval(interval_duration);
437			ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
438
439			loop {
440				ticker.tick().await;
441
442				// Check if alert is paused
443				let is_paused = {
444					let state = alert_state.read().await;
445					if let Some(until) = state.paused_until {
446						let now = Timestamp::now();
447						now < until
448					} else {
449						false
450					}
451				};
452
453				if is_paused {
454					debug!(?file, "alert is paused, skipping execution");
455					continue;
456				}
457
458				debug!(?file, "executing alert");
459
460				// Get alert definition and state
461				let (alert, resolved_targets, was_triggered, always_send, when_changed) = {
462					let state = alert_state.read().await;
463					(
464						state.definition.clone(),
465						state.resolved_targets.clone(),
466						state.triggered_at.is_some(),
467						state.definition.always_send.clone(),
468						state.definition.when_changed.clone(),
469					)
470				};
471
472				// Check the triggering state
473				let mut tera_ctx = crate::templates::build_context(&alert, chrono::Utc::now());
474				let now = chrono::Utc::now();
475				let not_before = now - alert.interval_duration;
476
477				let is_triggering = match alert
478					.read_sources(&ctx.pg_pool, not_before, &mut tera_ctx, was_triggered)
479					.await
480				{
481					Ok(flow) => flow.is_continue(),
482					Err(err) => {
483						error!(?file, "error reading sources: {}", LogError(&err));
484						metrics::inc_alerts_failed();
485
486						// Trigger source_error event
487						if let Some(ref event_mgr) = *event_manager.read().await {
488							let event_context = EventContext::SourceError {
489								alert_file: file.display().to_string(),
490								error_message: format!("{err:?}"),
491							};
492							if let Err(event_err) = event_mgr
493								.trigger_event(
494									EventType::SourceError,
495									&ctx,
496									email.as_ref(),
497									dry_run,
498									event_context,
499								)
500								.await
501							{
502								error!(
503									"failed to trigger source_error event: {}",
504									LogError(&event_err)
505								);
506							}
507						}
508						continue;
509					}
510				};
511
512				if is_triggering {
513					// Alert is in triggering state
514					let mut state = alert_state.write().await;
515
516					let mut should_send = match &always_send {
517						crate::alert::AlwaysSend::Boolean(true) => true,
518						crate::alert::AlwaysSend::Boolean(false) => !was_triggered,
519						crate::alert::AlwaysSend::Timed(config) => {
520							// Check if enough time has passed since last send
521							match state.last_sent_at {
522								Some(last_sent_time) => {
523									let now = Timestamp::now();
524									let elapsed = now.duration_since(last_sent_time);
525									if let Ok(after_duration) =
526										jiff::SignedDuration::try_from(config.after_duration)
527									{
528										elapsed >= after_duration
529									} else {
530										false
531									}
532								}
533								None => true, // Never sent before, should send
534							}
535						}
536					};
537
538					// Check when-changed logic if configured
539					if should_send
540						&& !matches!(when_changed, crate::alert::WhenChanged::Boolean(false))
541					{
542						let current_output =
543							serialize_context_for_comparison(&tera_ctx, &when_changed);
544
545						let output_changed = match &state.last_output {
546							Some(prev_output) => prev_output != &current_output,
547							None => true, // First run, consider it changed
548						};
549
550						if output_changed {
551							debug!(?file, "output changed, will send");
552							state.last_output = Some(current_output);
553						} else {
554							debug!(?file, "output unchanged, skipping");
555							should_send = false;
556						}
557					}
558
559					if should_send {
560						debug!(?file, "alert triggered, sending notifications");
561
562						// Send to targets
563						for target in &resolved_targets {
564							if let Err(err) = target
565								.send(&alert, &mut tera_ctx, email.as_ref(), dry_run)
566								.await
567							{
568								error!("sending: {}", LogError(&err));
569							}
570						}
571
572						metrics::inc_alerts_sent();
573
574						// Update last sent timestamp
575						state.last_sent_at = Some(Timestamp::now());
576					} else {
577						debug!(?file, "alert still triggered, not sending (already sent)");
578					}
579
580					// Update the triggered timestamp even if we didn't send
581					if !was_triggered {
582						state.triggered_at = Some(Timestamp::now());
583					}
584				} else {
585					// Alert is not in triggering state
586					if was_triggered {
587						info!(?file, "alert cleared");
588						let mut state = alert_state.write().await;
589						state.triggered_at = None;
590						state.last_sent_at = None;
591
592						// Clear last output when alert clears
593						if !matches!(when_changed, crate::alert::WhenChanged::Boolean(false)) {
594							state.last_output = None;
595						}
596					}
597				}
598			}
599		})
600	}
601
602	pub async fn shutdown(&self) {
603		info!("shutting down scheduler");
604		let mut tasks = self.tasks.write().await;
605		for (path, handle) in tasks.drain() {
606			debug!(?path, "cancelling alert task");
607			handle.abort();
608		}
609	}
610}