use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use chrono::offset::LocalResult;
use chrono::{
DateTime, Datelike, Local, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc, Weekday,
};
use chrono_tz::Tz;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::common::message::{Message, SharedMessage};
use crate::common::types::{Notify, Severity};
use crate::config::{
NotifyActiveWindowDay, SystemNotifyActiveWindowConfig, SystemNotifySilenceConfig,
};
use crate::error::{Error, Result};
use crate::sink::Sink;
use crate::transform::value::ValueSource;
#[cfg(feature = "http-client")]
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
#[cfg(feature = "http-client")]
use reqwest::{Client, Method, Url};
#[cfg(feature = "notify")]
use lettre::message::Mailbox;
#[cfg(feature = "notify")]
use lettre::transport::smtp::authentication::Credentials;
#[cfg(feature = "notify")]
use lettre::{AsyncSmtpTransport, AsyncTransport, Message as EmailMessage, Tokio1Executor};
/// Serde default for the `timeout` fields of the HTTP-based providers: 30 seconds.
fn default_timeout() -> Duration {
    Duration::new(30, 0)
}
/// Serde default for `WebhookNotifyConfig::body`: send the full envelope.
fn default_webhook_body() -> WebhookBody {
    const DEFAULT_BODY: WebhookBody = WebhookBody::Full;
    DEFAULT_BODY
}
/// Serde default for `TelegramNotifyConfig::api_base_url`.
fn default_telegram_api_base() -> String {
    String::from("https://api.telegram.org")
}
/// Serde default for `SmtpConfig::security`: STARTTLS.
fn default_security() -> SmtpSecurity {
    const DEFAULT_SECURITY: SmtpSecurity = SmtpSecurity::StartTls;
    DEFAULT_SECURITY
}
fn severity_label(severity: Severity) -> &'static str {
match severity {
Severity::Debug => "debug",
Severity::Info => "info",
Severity::Warning => "warning",
Severity::Error => "error",
Severity::Critical => "critical",
}
}
fn parse_notify(msg: &Message) -> Option<Notify> {
serde_json::from_value::<Notify>(msg.payload.clone()).ok()
}
fn format_label_value(value: &Value) -> String {
match value {
Value::String(s) => s.clone(),
other => other.to_string(),
}
}
/// Formats a label map as `key: value` lines sorted by key.
///
/// Returns `None` for an empty map so callers can skip adding the text.
fn build_labels_text(labels: &serde_json::Map<String, Value>) -> Option<String> {
    if labels.is_empty() {
        return None;
    }

    let mut entries: Vec<(&String, &Value)> = labels.iter().collect();
    entries.sort_by(|left, right| left.0.cmp(right.0));

    let mut lines = Vec::with_capacity(entries.len());
    for (name, value) in entries {
        lines.push(format!("{}: {}", name, format_label_value(value)));
    }
    Some(lines.join("\n"))
}
/// Prepares a message for template rendering by adding a `labels_text` field.
///
/// The message is borrowed unchanged when the payload is not an object, when
/// it already has `labels_text`, or when it has no non-empty `labels` object.
/// Otherwise a clone is returned whose payload carries `labels_text` built
/// from `labels`.
fn message_for_template<'a>(msg: &'a Message) -> Cow<'a, Message> {
    let Value::Object(payload) = &msg.payload else {
        return Cow::Borrowed(msg);
    };
    if payload.contains_key("labels_text") {
        return Cow::Borrowed(msg);
    }

    let rendered = match payload.get("labels") {
        Some(Value::Object(labels)) => build_labels_text(labels),
        _ => None,
    };
    let Some(text) = rendered else {
        return Cow::Borrowed(msg);
    };

    let mut enriched = msg.clone();
    if let Value::Object(fields) = &mut enriched.payload {
        fields.insert("labels_text".to_string(), Value::String(text));
    }
    Cow::Owned(enriched)
}
/// Builds the fallback notification text used when no message template is
/// configured (or the template yields nothing).
///
/// For a `Notify` payload the text is `[severity] name: message`, followed by
/// ` | labels: k=v, ...` (sorted by key) when labels are present. Any other
/// payload is rendered as the raw string or its JSON text.
fn default_message_text(msg: &Message) -> String {
    let Some(notify) = parse_notify(msg) else {
        return match &msg.payload {
            Value::String(raw) => raw.clone(),
            other => other.to_string(),
        };
    };

    let mut text = format!(
        "[{}] {}: {}",
        severity_label(notify.severity),
        notify.name,
        notify.message
    );
    if notify.labels.is_empty() {
        return text;
    }

    let mut pairs: Vec<_> = notify.labels.iter().collect();
    pairs.sort_by(|left, right| left.0.cmp(right.0));
    let rendered: Vec<String> = pairs
        .into_iter()
        .map(|(key, value)| format!("{}={}", key, value))
        .collect();
    text.push_str(&format!(" | labels: {}", rendered.join(", ")));
    text
}
/// Builds the fallback email subject: `[severity] name` for a `Notify`
/// payload, otherwise a fixed generic subject.
fn default_subject(msg: &Message) -> String {
    match parse_notify(msg) {
        Some(notify) => format!("[{}] {}", severity_label(notify.severity), notify.name),
        None => "Pipeflow notification".to_string(),
    }
}
/// Builds the fallback de-duplication key for silencing.
///
/// For a `Notify` payload the key is `severity|name|message|k=v,...` with
/// labels sorted by key. Any other payload uses the raw string or its JSON text.
fn default_silence_key(msg: &Message) -> String {
    let Some(notify) = parse_notify(msg) else {
        return match &msg.payload {
            Value::String(raw) => raw.clone(),
            other => other.to_string(),
        };
    };

    let mut pairs: Vec<_> = notify.labels.iter().collect();
    pairs.sort_by(|left, right| left.0.cmp(right.0));
    let label_part = pairs
        .into_iter()
        .map(|(key, value)| format!("{}={}", key, value))
        .collect::<Vec<_>>()
        .join(",");

    format!(
        "{}|{}|{}|{}",
        severity_label(notify.severity),
        notify.name,
        notify.message,
        label_part
    )
}
/// Resolves the silence key for `msg`.
///
/// Uses `template` when given, falling back to [`default_silence_key`] when
/// there is no template or the resolved key is blank.
fn resolve_silence_key(template: Option<&ValueSource>, msg: &Message) -> String {
    let candidate = match template {
        Some(source) => source.resolve_to_string(msg, || default_silence_key(msg)),
        None => default_silence_key(msg),
    };

    if candidate.trim().is_empty() {
        default_silence_key(msg)
    } else {
        candidate
    }
}
fn compile_template(value: Option<String>, label: &str) -> Result<Option<ValueSource>> {
ValueSource::compile_optional(value.as_deref())
.map_err(|e| Error::config(format!("notify sink invalid {}: {}", label, e)))
}
/// Configuration of a notify sink, selecting one delivery provider.
///
/// Serialized with an internal `provider` tag whose value is the lowercase
/// variant name (`email`, `webhook`, `telegram`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "provider", rename_all = "lowercase")]
pub enum NotifySinkConfig {
/// Deliver notifications by email over SMTP.
Email(EmailNotifyConfig),
/// Deliver notifications as JSON to an HTTP endpoint.
Webhook(WebhookNotifyConfig),
/// Deliver notifications through a Telegram bot.
Telegram(TelegramNotifyConfig),
}
/// Accessors for the settings shared by every provider configuration, so the
/// sink constructor can build silence/active-window state generically.
trait NotifyConfigCommon {
/// Minimum severity a message must have to be sent, if configured.
fn min_severity(&self) -> Option<Severity>;
/// Sink-level active-window settings, if configured.
fn active_window(&self) -> Option<&ActiveWindowConfigDetail>;
/// Sink-level silence settings, if configured.
fn silence(&self) -> Option<&SilenceConfigDetail>;
}
/// Shape of the JSON body sent by the webhook provider
/// (serialized as `payload` / `full`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WebhookBody {
/// Send only the message payload.
Payload,
/// Send an object with `meta`, `payload` and the rendered `text`.
Full,
}
/// Settings for the webhook provider.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookNotifyConfig {
/// Target URL; must be non-empty and parse as a URL.
pub url: String,
/// HTTP method (default `POST`); only POST, PUT and PATCH are accepted.
#[serde(default = "default_method")]
pub method: String,
/// Extra request headers.
#[serde(default)]
pub headers: HashMap<String, String>,
/// Request timeout in humantime format (default 30s).
#[serde(default = "default_timeout", with = "humantime_serde")]
pub timeout: Duration,
/// Body shape (default `full`).
#[serde(default = "default_webhook_body")]
pub body: WebhookBody,
/// Optional template for the `text` field of the `full` body.
#[serde(default)]
pub message: Option<String>,
/// Messages below this severity are dropped.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub min_severity: Option<Severity>,
/// Sink-level active-window settings; unset fields fall back to the system settings.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active_window: Option<ActiveWindowConfigDetail>,
/// Sink-level silence settings; unset fields fall back to the system settings.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub silence: Option<SilenceConfigDetail>,
}
/// Serde default for `WebhookNotifyConfig::method`.
fn default_method() -> String {
    String::from("POST")
}
/// Settings for the Telegram provider.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelegramNotifyConfig {
/// Bot token; must be non-blank. It becomes part of the request URL.
pub bot_token: String,
/// Target chat identifier; must be non-blank.
pub chat_id: String,
/// Base URL of the Bot API (default `https://api.telegram.org`).
#[serde(default = "default_telegram_api_base")]
pub api_base_url: String,
/// Optional `parse_mode` value forwarded in the request.
#[serde(default)]
pub parse_mode: Option<String>,
/// Optional `disable_web_page_preview` flag forwarded in the request.
#[serde(default)]
pub disable_web_page_preview: Option<bool>,
/// Request timeout in humantime format (default 30s).
#[serde(default = "default_timeout", with = "humantime_serde")]
pub timeout: Duration,
/// Optional template for the message text.
#[serde(default)]
pub message: Option<String>,
/// Messages below this severity are dropped.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub min_severity: Option<Severity>,
/// Sink-level active-window settings; unset fields fall back to the system settings.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active_window: Option<ActiveWindowConfigDetail>,
/// Sink-level silence settings; unset fields fall back to the system settings.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub silence: Option<SilenceConfigDetail>,
}
/// Settings for the email provider.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailNotifyConfig {
/// SMTP server connection settings.
pub smtp: SmtpConfig,
/// Sender address; must parse as a mailbox.
pub from: String,
/// Recipient addresses; at least one is required and each must parse as a mailbox.
pub to: Vec<String>,
/// Optional template for the subject line.
#[serde(default)]
pub subject: Option<String>,
/// Optional template for the message body.
#[serde(default)]
pub message: Option<String>,
/// Messages below this severity are dropped.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub min_severity: Option<Severity>,
/// Sink-level active-window settings; unset fields fall back to the system settings.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active_window: Option<ActiveWindowConfigDetail>,
/// Sink-level silence settings; unset fields fall back to the system settings.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub silence: Option<SilenceConfigDetail>,
}
/// Exposes the shared notify settings of the webhook configuration.
impl NotifyConfigCommon for WebhookNotifyConfig {
    fn min_severity(&self) -> Option<Severity> {
        self.min_severity
    }

    fn active_window(&self) -> Option<&ActiveWindowConfigDetail> {
        Option::as_ref(&self.active_window)
    }

    fn silence(&self) -> Option<&SilenceConfigDetail> {
        Option::as_ref(&self.silence)
    }
}
/// Exposes the shared notify settings of the Telegram configuration.
impl NotifyConfigCommon for TelegramNotifyConfig {
    fn min_severity(&self) -> Option<Severity> {
        self.min_severity
    }

    fn active_window(&self) -> Option<&ActiveWindowConfigDetail> {
        Option::as_ref(&self.active_window)
    }

    fn silence(&self) -> Option<&SilenceConfigDetail> {
        Option::as_ref(&self.silence)
    }
}
/// Exposes the shared notify settings of the email configuration.
impl NotifyConfigCommon for EmailNotifyConfig {
    fn min_severity(&self) -> Option<Severity> {
        self.min_severity
    }

    fn active_window(&self) -> Option<&ActiveWindowConfigDetail> {
        Option::as_ref(&self.active_window)
    }

    fn silence(&self) -> Option<&SilenceConfigDetail> {
        Option::as_ref(&self.silence)
    }
}
/// Sink-level silence (duplicate suppression) settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SilenceConfigDetail {
/// How long a repeated key is suppressed (humantime format). Falls back to
/// the system-level window when unset; one of the two is required.
#[serde(default, with = "humantime_serde::option")]
pub window: Option<Duration>,
/// Optional template producing the de-duplication key. Falls back to the
/// system-level key, then to a key built from the message itself.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub key: Option<String>,
}
/// Sink-level active-window settings. Every unset field falls back to the
/// corresponding system-level setting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActiveWindowConfigDetail {
/// Window start as `HH:MM`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub start: Option<String>,
/// Window end as `HH:MM`; must differ from `start`. An end earlier than
/// the start describes a window that crosses midnight.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub end: Option<String>,
/// Time zone name parsed by `chrono_tz`; the process-local zone is used
/// when neither sink nor system sets one.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub timezone: Option<String>,
/// Weekdays on which the window applies; must not be an empty list.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub days: Option<Vec<NotifyActiveWindowDay>>,
/// Messages at or above this severity are sent even outside the window.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub bypass_severity: Option<Severity>,
}
fn parse_active_window_time(label: &str, value: &str) -> Result<NaiveTime> {
let trimmed = value.trim();
if trimmed.is_empty() {
return Err(Error::config(format!("{} must be non-empty", label)));
}
NaiveTime::parse_from_str(trimmed, "%H:%M").map_err(|e| {
Error::config(format!(
"{} must be in HH:MM format (got '{}'): {}",
label, value, e
))
})
}
fn parse_active_window_timezone(
label: &str,
value: Option<String>,
) -> Result<ActiveWindowTimeZone> {
let Some(value) = value else {
return Ok(ActiveWindowTimeZone::Local);
};
let trimmed = value.trim();
if trimmed.is_empty() {
return Err(Error::config(format!("{} must be non-empty", label)));
}
let tz = trimmed
.parse::<Tz>()
.map_err(|e| Error::config(format!("{} has invalid value '{}': {}", label, trimmed, e)))?;
Ok(ActiveWindowTimeZone::Named(tz))
}
fn severity_rank(severity: Severity) -> u8 {
match severity {
Severity::Debug => 0,
Severity::Info => 1,
Severity::Warning => 2,
Severity::Error => 3,
Severity::Critical => 4,
}
}
/// SMTP connection settings for the email provider.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SmtpConfig {
/// SMTP server host; must be non-blank.
pub server: String,
/// Optional port override; the transport's default is used when unset.
#[serde(default)]
pub port: Option<u16>,
/// Optional auth user name; must be set together with `password`.
#[serde(default)]
pub username: Option<String>,
/// Optional auth password; must be set together with `username`.
#[serde(default)]
pub password: Option<String>,
/// Connection security mode (default `starttls`).
#[serde(default = "default_security")]
pub security: SmtpSecurity,
}
/// SMTP connection security mode (serialized as `starttls` / `none`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SmtpSecurity {
/// Build the transport with lettre's `starttls_relay`.
StartTls,
/// Build the transport with lettre's `builder_dangerous` (no TLS configured here).
None,
}
/// Validated, ready-to-use state of the webhook provider.
struct WebhookBackend {
// HTTP client built with the configured timeout.
client: Client,
// Parsed target URL.
url: Url,
// Validated HTTP method (POST, PUT or PATCH).
method: Method,
// Pre-validated request headers.
headers: HeaderMap,
// Body shape to send.
body: WebhookBody,
// Compiled text template, if configured.
message: Option<ValueSource>,
}
/// Validated, ready-to-use state of the Telegram provider.
struct TelegramBackend {
// HTTP client built with the configured timeout.
client: Client,
// Trimmed bot token; embedded in the request URL path.
bot_token: String,
// Trimmed target chat identifier.
chat_id: String,
// Parsed API base URL, stored with a trailing slash.
api_base_url: Url,
// Optional `parse_mode` forwarded to the API.
parse_mode: Option<String>,
// Optional `disable_web_page_preview` forwarded to the API.
disable_web_page_preview: Option<bool>,
// Compiled text template, if configured.
message: Option<ValueSource>,
}
/// Validated, ready-to-use state of the email provider.
struct EmailBackend {
// Async SMTP transport built from the SMTP settings.
mailer: AsyncSmtpTransport<Tokio1Executor>,
// Parsed sender mailbox.
from: Mailbox,
// Parsed recipient mailboxes (at least one).
to: Vec<Mailbox>,
// Compiled subject template, if configured.
subject: Option<ValueSource>,
// Compiled body template, if configured.
message: Option<ValueSource>,
}
/// Resolved silence settings of a sink.
struct SilenceState {
// Suppression window; validated to be greater than zero.
window: Duration,
// Directory holding one `<hash>.silence` record file per key.
data_dir: std::path::PathBuf,
// Compiled key template, if configured.
key_template: Option<ValueSource>,
}
/// Resolved active-window settings of a sink.
struct ActiveWindowState {
// Window start time of day.
start: NaiveTime,
// Window end time of day; earlier than `start` for a window crossing midnight.
end: NaiveTime,
// Zone in which `start`/`end` are interpreted.
timezone: ActiveWindowTimeZone,
// Allowed weekdays; `None` allows every day.
days: Option<HashSet<Weekday>>,
// Severity at or above which messages skip the window check.
bypass_severity: Option<Severity>,
}
/// Time zone used to evaluate an active window.
enum ActiveWindowTimeZone {
// The process-local time zone (`chrono::Local`).
Local,
// A named `chrono_tz` zone.
Named(Tz),
}
/// A point in time expressed both in the window's zone and in UTC.
struct ActiveWindowMoment {
// Calendar date in the window's zone.
date: NaiveDate,
// Time of day in the window's zone.
time: NaiveTime,
// Weekday in the window's zone.
weekday: Weekday,
// The same instant in UTC.
utc: DateTime<Utc>,
}
/// Outcome of the active-window check for one message.
enum ActiveWindowDecision {
// Dispatch immediately.
SendNow,
// Dispatch after waiting for the given duration.
Delay(Duration),
}
/// The provider-specific backend of a sink.
enum NotifyBackend {
// HTTP webhook delivery.
Webhook(WebhookBackend),
// Telegram bot delivery.
Telegram(TelegramBackend),
// SMTP email delivery.
Email(EmailBackend),
}
/// Shared state of a notify sink; held behind an `Arc` so delayed sends can
/// keep it alive inside a spawned task.
struct NotifySinkInner {
// Sink identifier, also used in log fields.
id: String,
// Delivery backend.
backend: NotifyBackend,
// Duplicate-suppression settings, if enabled.
silence: Option<SilenceState>,
// Active-window settings, if enabled.
active_window: Option<ActiveWindowState>,
// Minimum severity to send, if configured.
min_severity: Option<Severity>,
}
/// Sink that delivers messages as notifications via webhook, Telegram or
/// email, with optional severity filtering, active-window delay and
/// duplicate silencing.
pub struct NotifySink {
// Shared with spawned delayed-send tasks.
inner: std::sync::Arc<NotifySinkInner>,
}
impl NotifySink {
/// Builds a notify sink from its provider configuration.
///
/// The shared settings (silence, active window) are resolved first, with
/// `system_notify` supplying defaults, and then the provider backend is
/// validated and built.
///
/// # Errors
/// Returns a configuration error when any setting is invalid.
pub fn new(
    id: impl Into<String>,
    config: NotifySinkConfig,
    system_notify: &crate::config::SystemNotifyConfig,
) -> Result<Self> {
    // NOTE(review): the data directory is fixed here rather than configurable.
    const DATA_DIR: &str = "./data";

    // View the provider config through the shared-settings trait first.
    let common: &dyn NotifyConfigCommon = match &config {
        NotifySinkConfig::Webhook(cfg) => cfg,
        NotifySinkConfig::Telegram(cfg) => cfg,
        NotifySinkConfig::Email(cfg) => cfg,
    };
    let silence = Self::build_silence(
        common.silence().cloned(),
        system_notify.silence.as_ref(),
        DATA_DIR,
    )?;
    let active_window = Self::build_active_window(
        common.active_window().cloned(),
        system_notify.active_window.as_ref(),
    )?;
    let min_severity = common.min_severity();

    let backend = match config {
        NotifySinkConfig::Webhook(cfg) => NotifyBackend::Webhook(Self::build_webhook(cfg)?),
        NotifySinkConfig::Telegram(cfg) => NotifyBackend::Telegram(Self::build_telegram(cfg)?),
        NotifySinkConfig::Email(cfg) => NotifyBackend::Email(Self::build_email(cfg)?),
    };

    Ok(Self {
        inner: Arc::new(NotifySinkInner {
            id: id.into(),
            backend,
            silence,
            active_window,
            min_severity,
        }),
    })
}
fn build_silence(
config: Option<SilenceConfigDetail>,
system: Option<&SystemNotifySilenceConfig>,
data_dir: &str,
) -> Result<Option<SilenceState>> {
let system_window = system.and_then(|s| s.window);
let system_key = system.and_then(|s| s.key.clone());
let (window, key) = match config {
None => {
if system.is_none() {
return Ok(None);
}
(system_window, None)
}
Some(detail) => (detail.window, detail.key),
};
let window = window
.or(system_window)
.ok_or_else(|| Error::config("notify silence requires 'window' (sink or system)"))?;
if window.is_zero() {
return Err(Error::config(
"notify silence window must be greater than zero",
));
}
let key_template = compile_template(key.or(system_key), "silence_key")?;
let silence_dir = std::path::Path::new(data_dir).join("silence");
if let Err(e) = std::fs::create_dir_all(&silence_dir) {
return Err(Error::config(format!(
"Failed to create silence directory '{:?}': {}",
silence_dir, e
)));
}
Ok(Some(SilenceState {
window,
data_dir: silence_dir,
key_template,
}))
}
fn build_active_window(
config: Option<ActiveWindowConfigDetail>,
system: Option<&SystemNotifyActiveWindowConfig>,
) -> Result<Option<ActiveWindowState>> {
let system_start = system.and_then(|s| s.start.clone());
let system_end = system.and_then(|s| s.end.clone());
let system_timezone = system.and_then(|s| s.timezone.clone());
let system_days = system.and_then(|s| s.days.clone());
let system_bypass = system.and_then(|s| s.bypass_severity);
let (start, end, timezone, days, bypass_severity) = match config {
None => {
if system.is_none() {
return Ok(None);
}
(None, None, None, None, None)
}
Some(detail) => (
detail.start,
detail.end,
detail.timezone,
detail.days,
detail.bypass_severity,
),
};
let start = start
.or(system_start)
.ok_or_else(|| Error::config("notify active_window requires 'start'"))?;
let end = end
.or(system_end)
.ok_or_else(|| Error::config("notify active_window requires 'end'"))?;
let start_time = parse_active_window_time("notify active_window.start", &start)?;
let end_time = parse_active_window_time("notify active_window.end", &end)?;
if start_time == end_time {
return Err(Error::config(
"notify active_window start and end must not be equal",
));
}
let timezone = timezone.or(system_timezone);
let timezone = parse_active_window_timezone("notify active_window.timezone", timezone)?;
let days = days.or(system_days);
let days = days.map(|values| {
let set: HashSet<Weekday> = values.into_iter().map(|d| d.weekday()).collect();
set
});
if matches!(days.as_ref(), Some(set) if set.is_empty()) {
return Err(Error::config("notify active_window.days must not be empty"));
}
Ok(Some(ActiveWindowState {
start: start_time,
end: end_time,
timezone,
days,
bypass_severity: bypass_severity.or(system_bypass),
}))
}
/// Validates the webhook settings and builds the backend.
///
/// Requires a non-empty, parseable URL, a method of POST, PUT or PATCH
/// (case-insensitive) and valid header names and values.
fn build_webhook(cfg: WebhookNotifyConfig) -> Result<WebhookBackend> {
    if cfg.url.is_empty() {
        return Err(Error::config("notify webhook requires non-empty 'url'"));
    }

    let method_name = cfg.method.trim().to_uppercase();
    if !matches!(method_name.as_str(), "POST" | "PUT" | "PATCH") {
        return Err(Error::config(format!(
            "notify webhook has unsupported 'method': {} (supported: POST, PUT, PATCH)",
            method_name
        )));
    }

    let url = match Url::parse(&cfg.url) {
        Ok(url) => url,
        Err(e) => {
            return Err(Error::config(format!(
                "notify webhook has invalid 'url': {}",
                e
            )));
        }
    };
    let method = match Method::from_bytes(method_name.as_bytes()) {
        Ok(method) => method,
        Err(e) => {
            return Err(Error::config(format!(
                "notify webhook invalid method: {}",
                e
            )));
        }
    };

    let mut headers = HeaderMap::new();
    for (raw_name, raw_value) in &cfg.headers {
        let header_name = HeaderName::from_bytes(raw_name.as_bytes()).map_err(|e| {
            Error::config(format!(
                "notify webhook invalid header name '{}': {}",
                raw_name, e
            ))
        })?;
        let header_value = HeaderValue::from_str(raw_value).map_err(|e| {
            Error::config(format!(
                "notify webhook invalid header value for '{}': {}",
                raw_name, e
            ))
        })?;
        headers.insert(header_name, header_value);
    }

    let client = Client::builder().timeout(cfg.timeout).build()?;
    let message = compile_template(cfg.message, "webhook message")?;

    Ok(WebhookBackend {
        client,
        url,
        method,
        headers,
        body: cfg.body,
        message,
    })
}
/// Validates the Telegram settings and builds the backend.
///
/// `bot_token`, `chat_id` and `api_base_url` must be non-blank. All three are
/// trimmed before use, and the base URL is stored with a trailing slash so
/// that relative joins append to it.
fn build_telegram(cfg: TelegramNotifyConfig) -> Result<TelegramBackend> {
    let bot_token = cfg.bot_token.trim();
    if bot_token.is_empty() {
        return Err(Error::config(
            "notify telegram requires non-empty 'bot_token'",
        ));
    }
    let chat_id = cfg.chat_id.trim();
    if chat_id.is_empty() {
        return Err(Error::config(
            "notify telegram requires non-empty 'chat_id'",
        ));
    }
    // Trim before the trailing-slash check: otherwise a value such as
    // "https://host/ " would gain a second slash and a stray path segment,
    // and "https://host " would put a space into the host.
    let trimmed_base = cfg.api_base_url.trim();
    if trimmed_base.is_empty() {
        return Err(Error::config(
            "notify telegram requires non-empty 'api_base_url'",
        ));
    }
    let mut base_url = trimmed_base.to_string();
    if !base_url.ends_with('/') {
        base_url.push('/');
    }
    let api_base_url = Url::parse(&base_url).map_err(|e| {
        Error::config(format!("notify telegram has invalid 'api_base_url': {}", e))
    })?;

    let client = Client::builder().timeout(cfg.timeout).build()?;
    let message = compile_template(cfg.message, "telegram message")?;

    Ok(TelegramBackend {
        client,
        bot_token: bot_token.to_string(),
        chat_id: chat_id.to_string(),
        api_base_url,
        parse_mode: cfg.parse_mode,
        disable_web_page_preview: cfg.disable_web_page_preview,
        message,
    })
}
fn build_email(cfg: EmailNotifyConfig) -> Result<EmailBackend> {
if cfg.smtp.server.trim().is_empty() {
return Err(Error::config(
"notify email requires non-empty 'smtp.server'",
));
}
if cfg.to.is_empty() {
return Err(Error::config(
"notify email requires at least one recipient in 'to'",
));
}
if cfg.smtp.username.is_some() != cfg.smtp.password.is_some() {
return Err(Error::config(
"notify email requires both 'smtp.username' and 'smtp.password' when using auth",
));
}
let mut builder = match cfg.smtp.security {
SmtpSecurity::StartTls => AsyncSmtpTransport::<Tokio1Executor>::starttls_relay(
&cfg.smtp.server,
)
.map_err(|e| Error::config(format!("notify email invalid smtp server: {}", e)))?,
SmtpSecurity::None => {
AsyncSmtpTransport::<Tokio1Executor>::builder_dangerous(&cfg.smtp.server)
}
};
if let Some(port) = cfg.smtp.port {
builder = builder.port(port);
}
if let (Some(username), Some(password)) = (cfg.smtp.username, cfg.smtp.password) {
builder = builder.credentials(Credentials::new(username, password));
}
let mailer = builder.build();
let from: Mailbox = cfg
.from
.parse()
.map_err(|e| Error::config(format!("notify email invalid 'from' address: {}", e)))?;
let mut to = Vec::with_capacity(cfg.to.len());
for address in cfg.to {
let mailbox: Mailbox = address.parse().map_err(|e| {
Error::config(format!(
"notify email invalid recipient address '{}': {}",
address, e
))
})?;
to.push(mailbox);
}
let subject = compile_template(cfg.subject, "email subject")?;
let message = compile_template(cfg.message, "email message")?;
Ok(EmailBackend {
mailer,
from,
to,
subject,
message,
})
}
/// Sends `msg` to the configured webhook endpoint as a JSON request.
///
/// In `Payload` mode the body is the message payload. In `Full` mode it is an
/// object with `meta`, `payload` and the rendered `text`. The text is only
/// rendered in `Full` mode, since `Payload` mode never uses it.
///
/// # Errors
/// Returns an error when the request fails or the response status is not a
/// success status.
async fn send_webhook(backend: &WebhookBackend, msg: &Message) -> Result<()> {
    let body = match backend.body {
        WebhookBody::Payload => msg.payload.clone(),
        WebhookBody::Full => {
            // Rendering may clone the message to add `labels_text`, so it is
            // done only on the path that actually sends the text.
            let render_msg = message_for_template(msg);
            let text = backend
                .message
                .as_ref()
                .map(|s| s.resolve_to_string(render_msg.as_ref(), || default_message_text(msg)))
                .unwrap_or_else(|| default_message_text(msg));
            serde_json::json!({
                "meta": msg.meta,
                "payload": msg.payload,
                "text": text,
            })
        }
    };

    let response = backend
        .client
        .request(backend.method.clone(), backend.url.clone())
        .headers(backend.headers.clone())
        .json(&body)
        .send()
        .await?;
    if !response.status().is_success() {
        return Err(Error::sink(format!(
            "notify webhook request failed: {}",
            response.status()
        )));
    }
    Ok(())
}
/// Sends `msg` through the Telegram `sendMessage` endpoint.
///
/// The text comes from the configured template, or from the default message
/// text when no template is set. `parse_mode` and `disable_web_page_preview`
/// are included only when configured.
///
/// # Errors
/// Returns an error when the URL cannot be built, the request fails, or the
/// response status is not a success status. Transport errors have their URL
/// removed because the URL contains the bot token.
async fn send_telegram(backend: &TelegramBackend, msg: &Message) -> Result<()> {
    let render_msg = message_for_template(msg);
    let text = backend
        .message
        .as_ref()
        .map(|s| s.resolve_to_string(render_msg.as_ref(), || default_message_text(msg)))
        .unwrap_or_else(|| default_message_text(msg));

    let url = backend
        .api_base_url
        .join(&format!("./bot{}/sendMessage", backend.bot_token))
        .map_err(|e| Error::sink(format!("notify telegram invalid api base url: {}", e)))?;

    let mut payload = serde_json::json!({
        "chat_id": backend.chat_id,
        "text": text,
    });
    if let Some(parse_mode) = backend.parse_mode.as_ref() {
        payload["parse_mode"] = Value::String(parse_mode.clone());
    }
    if let Some(disable) = backend.disable_web_page_preview {
        payload["disable_web_page_preview"] = Value::Bool(disable);
    }

    // reqwest errors carry the request URL, which here embeds the secret bot
    // token; strip it so the token cannot end up in logs or error messages.
    let response = backend
        .client
        .post(url)
        .json(&payload)
        .send()
        .await
        .map_err(|e| e.without_url())?;
    if !response.status().is_success() {
        return Err(Error::sink(format!(
            "notify telegram request failed: {}",
            response.status()
        )));
    }
    Ok(())
}
async fn send_email(backend: &EmailBackend, msg: &Message) -> Result<()> {
let render_msg = message_for_template(msg);
let subject = backend
.subject
.as_ref()
.map(|s| s.resolve_to_string(render_msg.as_ref(), || default_subject(msg)))
.unwrap_or_else(|| default_subject(msg));
let body = backend
.message
.as_ref()
.map(|s| s.resolve_to_string(render_msg.as_ref(), || default_message_text(msg)))
.unwrap_or_else(|| default_message_text(msg));
let mut builder = EmailMessage::builder().from(backend.from.clone());
for recipient in &backend.to {
builder = builder.to(recipient.clone());
}
let email = builder
.subject(subject)
.body(body)
.map_err(|e| Error::sink(format!("notify email build failed: {}", e)))?;
backend
.mailer
.send(email)
.await
.map_err(|e| Error::sink(format!("notify email send failed: {}", e)))?;
Ok(())
}
}
impl ActiveWindowState {
fn now(&self) -> ActiveWindowMoment {
self.timezone.moment_now()
}
/// Reports whether `moment` falls inside the active window.
///
/// Both bounds are inclusive. For a window that crosses midnight, the part
/// after midnight belongs to the previous weekday for the `days` filter.
fn is_active(&self, moment: &ActiveWindowMoment) -> bool {
    let same_day_window = self.start < self.end;
    if same_day_window {
        return self.allows_day(moment.weekday)
            && moment.time >= self.start
            && moment.time <= self.end;
    }

    // Overnight window: decide which weekday "owns" the current time.
    let owning_day = if moment.time >= self.start {
        moment.weekday
    } else if moment.time <= self.end {
        previous_weekday(moment.weekday)
    } else {
        return false;
    };
    self.allows_day(owning_day)
}
/// Reports whether `msg` is severe enough to ignore the active window.
///
/// False when no bypass severity is configured or the payload is not a
/// `Notify` record.
fn should_bypass(&self, msg: &Message) -> bool {
    self.bypass_severity.is_some_and(|threshold| {
        parse_notify(msg)
            .is_some_and(|notify| severity_rank(notify.severity) >= severity_rank(threshold))
    })
}
fn next_start(&self, moment: &ActiveWindowMoment) -> Result<DateTime<Utc>> {
for offset in 0..=7i64 {
let date = moment
.date
.checked_add_signed(chrono::Duration::days(offset))
.ok_or_else(|| Error::sink("notify active window date overflow"))?;
if !self.allows_day(date.weekday()) {
continue;
}
let Some(target) = self.timezone.to_utc(date, self.start) else {
continue;
};
if target <= moment.utc {
continue;
}
return Ok(target);
}
Err(Error::sink(
"notify active window failed to find the next start time",
))
}
/// Reports whether `weekday` is allowed; every day is allowed when no
/// `days` filter is configured.
fn allows_day(&self, weekday: Weekday) -> bool {
    match &self.days {
        Some(allowed) => allowed.contains(&weekday),
        None => true,
    }
}
}
impl ActiveWindowTimeZone {
fn moment_now(&self) -> ActiveWindowMoment {
match self {
ActiveWindowTimeZone::Local => {
let now = Local::now();
ActiveWindowMoment {
date: now.date_naive(),
time: now.time(),
weekday: now.weekday(),
utc: now.with_timezone(&Utc),
}
}
ActiveWindowTimeZone::Named(tz) => {
let now = Utc::now().with_timezone(tz);
ActiveWindowMoment {
date: now.date_naive(),
time: now.time(),
weekday: now.weekday(),
utc: now.with_timezone(&Utc),
}
}
}
}
fn to_utc(&self, date: NaiveDate, time: NaiveTime) -> Option<DateTime<Utc>> {
let local = NaiveDateTime::new(date, time);
match self {
ActiveWindowTimeZone::Local => match Local.from_local_datetime(&local) {
LocalResult::Single(dt) => Some(dt.with_timezone(&Utc)),
LocalResult::Ambiguous(dt, _) => Some(dt.with_timezone(&Utc)),
LocalResult::None => None,
},
ActiveWindowTimeZone::Named(tz) => match tz.from_local_datetime(&local) {
LocalResult::Single(dt) => Some(dt.with_timezone(&Utc)),
LocalResult::Ambiguous(dt, _) => Some(dt.with_timezone(&Utc)),
LocalResult::None => None,
},
}
}
}
impl NotifySinkInner {
/// Decides whether `msg` is sent now or delayed until the window opens.
///
/// Sends immediately when no window is configured, when the window is
/// currently active, or when the message severity bypasses the window.
/// Otherwise returns the delay until the next window start.
fn active_window_decision(&self, msg: &Message) -> Result<ActiveWindowDecision> {
    let window = match &self.active_window {
        Some(window) => window,
        None => return Ok(ActiveWindowDecision::SendNow),
    };

    let now = window.now();
    if window.is_active(&now) || window.should_bypass(msg) {
        return Ok(ActiveWindowDecision::SendNow);
    }

    let wait = window
        .next_start(&now)?
        .signed_duration_since(now.utc)
        .to_std()
        .map_err(|e| {
            Error::sink(format!(
                "notify active window delay conversion failed: {}",
                e
            ))
        })?;

    Ok(if wait.is_zero() {
        ActiveWindowDecision::SendNow
    } else {
        ActiveWindowDecision::Delay(wait)
    })
}
/// Decides whether `msg` is suppressed as a recent duplicate.
///
/// Each silence key maps to a `<sha256(key)>.silence` file in the silence
/// directory, and the file's modification time marks the last accepted
/// send. A message is suppressed when that time lies within the window;
/// otherwise the record is written again and the message is allowed.
///
/// A record dated in the future (for example after the clock was stepped
/// back) is treated as expired and rewritten, so silencing resumes from the
/// current time instead of staying disabled until the clock catches up.
///
/// Failing to write the record is logged and does not block the send.
// NOTE(review): this uses blocking `std::fs` calls inside an async fn;
// presumably acceptable for tiny files, but worth confirming for busy sinks.
async fn should_silence(&self, msg: &Message) -> Result<bool> {
    use sha2::{Digest, Sha256};

    let Some(state) = &self.silence else {
        return Ok(false);
    };

    let key = resolve_silence_key(state.key_template.as_ref(), msg);
    let mut hasher = Sha256::new();
    hasher.update(key.as_bytes());
    let hash = hex::encode(hasher.finalize());
    let path = state.data_dir.join(format!("{}.silence", hash));

    // A missing or unreadable record simply means "not silenced".
    if let Ok(modified) = std::fs::metadata(&path).and_then(|meta| meta.modified()) {
        // `elapsed()` fails when the record is dated in the future; fall
        // through in that case so the record is refreshed below.
        if matches!(modified.elapsed(), Ok(elapsed) if elapsed < state.window) {
            return Ok(true);
        }
    }

    if let Err(e) = std::fs::write(&path, &key) {
        tracing::warn!(
            sink_id = %self.id,
            error = %e,
            path = ?path,
            "Failed to write silence record"
        );
    }
    Ok(false)
}
/// Sends `msg` through the configured backend unless it is silenced.
async fn dispatch(&self, msg: SharedMessage) -> Result<()> {
    let silenced = self.should_silence(&msg).await?;
    if silenced {
        return Ok(());
    }

    match &self.backend {
        NotifyBackend::Email(email) => NotifySink::send_email(email, &msg).await,
        NotifyBackend::Telegram(telegram) => NotifySink::send_telegram(telegram, &msg).await,
        NotifyBackend::Webhook(webhook) => NotifySink::send_webhook(webhook, &msg).await,
    }
}
}
fn previous_weekday(day: Weekday) -> Weekday {
day.pred()
}
#[async_trait]
impl Sink for NotifySink {
/// Returns the sink identifier.
fn id(&self) -> &str {
    self.inner.id.as_str()
}
/// Processes one message: severity filter, then active-window check, then
/// dispatch.
///
/// Messages below `min_severity` are dropped (payloads that are not `Notify`
/// records count as `Info`). Outside the active window the send is moved to a
/// spawned task that sleeps until the window opens. This call then returns
/// `Ok(())` immediately, and a failure of the delayed send is only logged.
async fn process(&self, msg: SharedMessage) -> Result<()> {
    let inner = &self.inner;

    if let Some(min_severity) = inner.min_severity {
        let msg_severity = match parse_notify(&msg) {
            Some(notify) => notify.severity,
            None => Severity::Info,
        };
        let below_threshold = severity_rank(msg_severity) < severity_rank(min_severity);
        if below_threshold {
            tracing::info!(
                sink_id = %inner.id,
                msg_severity = ?msg_severity,
                min_severity = ?min_severity,
                "Notification dropped: severity below threshold"
            );
            return Ok(());
        }
    }

    let delay = match inner.active_window_decision(&msg)? {
        ActiveWindowDecision::SendNow => return inner.dispatch(msg).await,
        ActiveWindowDecision::Delay(delay) => delay,
    };

    let task_inner = Arc::clone(inner);
    tokio::spawn(async move {
        tokio::time::sleep(delay).await;
        if let Err(err) = task_inner.dispatch(msg).await {
            tracing::error!(error = %err, "notify delayed send failed");
        }
    });
    Ok(())
}
}
// Unit tests for the notify sink helpers: default text/key rendering, silence
// configuration inheritance, Telegram URL joining, active-window evaluation
// and severity ordering.
#[cfg(test)]
mod tests {
use super::*;
use crate::common::message::Message;
use crate::config::{SystemNotifyConfig, SystemNotifySilenceConfig};
use chrono::{Datelike, NaiveDate, NaiveDateTime, NaiveTime, Utc, Weekday};
use chrono_tz::UTC;
use serde_json::json;
use std::collections::HashSet;
// A Notify payload renders its name, message and a labels section.
#[test]
fn test_default_message_text_from_notify() {
let notify =
Notify::new("cpu_high", Severity::Warning, "CPU > 90%").with_label("host", "api-1");
let msg = Message::new("source", serde_json::to_value(notify).unwrap());
let text = default_message_text(&msg);
assert!(text.contains("cpu_high"));
assert!(text.contains("CPU > 90%"));
assert!(text.contains("labels"));
}
// A non-Notify payload falls back to its JSON text.
#[test]
fn test_default_message_text_fallback() {
let msg = Message::new("source", json!({"k": "v"}));
let text = default_message_text(&msg);
assert!(text.contains("\"k\":\"v\""));
}
// The default silence key contains name, message and `key=value` labels.
#[test]
fn test_default_silence_key_from_notify() {
let notify =
Notify::new("disk_full", Severity::Error, "disk 95%").with_label("host", "api-1");
let msg = Message::new("source", serde_json::to_value(notify).unwrap());
let key = default_silence_key(&msg);
assert!(key.contains("disk_full"));
assert!(key.contains("disk 95%"));
assert!(key.contains("host=api-1"));
}
// With no sink-level silence config, window and key come from the system config.
// Note: build_silence creates the `./data/silence` directory as a side effect.
#[test]
fn test_silence_inherits_system_defaults() {
let system = SystemNotifyConfig {
silence: Some(SystemNotifySilenceConfig {
window: Some(Duration::from_secs(3600)),
key: Some("{{ $.name }}".to_string()),
}),
..Default::default()
};
let silence = NotifySink::build_silence(None, system.silence.as_ref(), "./data")
.unwrap()
.unwrap();
assert_eq!(silence.window, Duration::from_secs(3600));
let msg = Message::new("source", json!({"name": "alert"}));
let key = resolve_silence_key(silence.key_template.as_ref(), &msg);
assert_eq!(key, "alert");
}
// A sink-level key overrides the system key while the window is inherited.
#[test]
fn test_silence_override_key_uses_system_window() {
let system = SystemNotifyConfig {
silence: Some(SystemNotifySilenceConfig {
window: Some(Duration::from_secs(1800)),
key: Some("{{ $.name }}".to_string()),
}),
..Default::default()
};
let silence = NotifySink::build_silence(
Some(SilenceConfigDetail {
window: None,
key: Some("{{ $.message }}".to_string()),
}),
system.silence.as_ref(),
"./data",
)
.unwrap()
.unwrap();
assert_eq!(silence.window, Duration::from_secs(1800));
let msg = Message::new("source", json!({"name": "n1", "message": "m1"}));
let key = resolve_silence_key(silence.key_template.as_ref(), &msg);
assert_eq!(key, "m1");
}
// A sink-level silence config without a window fails when the system has none either.
#[test]
fn test_silence_requires_window_without_system_default() {
let system = SystemNotifyConfig::default();
let err = NotifySink::build_silence(
Some(SilenceConfigDetail {
window: None,
key: Some("{{ $.message }}".to_string()),
}),
system.silence.as_ref(),
"./data",
)
.err()
.unwrap();
assert!(err.to_string().contains("notify silence requires 'window'"));
}
// A key template is resolved against the message payload.
#[test]
fn test_resolve_silence_key_template() {
let notify = Notify::new("disk_full", Severity::Error, "disk 95%");
let msg = Message::new("source", serde_json::to_value(notify).unwrap());
let template = ValueSource::compile_optional(Some("{{ $.name }}"))
.unwrap()
.unwrap();
let key = resolve_silence_key(Some(&template), &msg);
assert_eq!(key, "disk_full");
}
// Joining the relative `./bot<token>/sendMessage` path keeps the colon in the
// token from being parsed as a URL scheme.
#[test]
fn test_telegram_url_construction() {
let base_url = Url::parse("https://api.telegram.org").unwrap();
let bot_token = "123456789:ABCDefGHIjkLmnOPQRstuVWxyz";
let joined = base_url
.join(&format!("./bot{}/sendMessage", bot_token))
.unwrap();
assert_eq!(
joined.as_str(),
"https://api.telegram.org/bot123456789:ABCDefGHIjkLmnOPQRstuVWxyz/sendMessage"
);
}
// Test helper: builds a moment for the given date and time in the UTC zone.
fn moment_at(date: NaiveDate, time: NaiveTime) -> ActiveWindowMoment {
let naive = NaiveDateTime::new(date, time);
let local = UTC.from_local_datetime(&naive).single().unwrap();
ActiveWindowMoment {
date,
time,
weekday: local.weekday(),
utc: local.with_timezone(&Utc),
}
}
// A same-day window is active inside its bounds and inactive after the end.
#[test]
fn test_active_window_same_day() {
let window = ActiveWindowState {
start: NaiveTime::from_hms_opt(8, 0, 0).unwrap(),
end: NaiveTime::from_hms_opt(22, 0, 0).unwrap(),
timezone: ActiveWindowTimeZone::Named(UTC),
days: None,
bypass_severity: None,
};
let date = NaiveDate::from_ymd_opt(2024, 1, 2).unwrap();
let inside = moment_at(date, NaiveTime::from_hms_opt(9, 0, 0).unwrap());
let outside = moment_at(date, NaiveTime::from_hms_opt(23, 0, 0).unwrap());
assert!(window.is_active(&inside));
assert!(!window.is_active(&outside));
}
// For an overnight window limited to Monday, the early hours of Tuesday still
// count as Monday's window (2024-01-01 is a Monday).
#[test]
fn test_active_window_cross_midnight_days() {
let mut days = HashSet::new();
days.insert(Weekday::Mon);
let window = ActiveWindowState {
start: NaiveTime::from_hms_opt(22, 0, 0).unwrap(),
end: NaiveTime::from_hms_opt(8, 0, 0).unwrap(),
timezone: ActiveWindowTimeZone::Named(UTC),
days: Some(days),
bypass_severity: None,
};
let monday = NaiveDate::from_ymd_opt(2024, 1, 1).unwrap();
let tuesday = NaiveDate::from_ymd_opt(2024, 1, 2).unwrap();
let mon_late = moment_at(monday, NaiveTime::from_hms_opt(23, 0, 0).unwrap());
let tue_early = moment_at(tuesday, NaiveTime::from_hms_opt(7, 0, 0).unwrap());
let tue_mid = moment_at(tuesday, NaiveTime::from_hms_opt(12, 0, 0).unwrap());
assert!(window.is_active(&mon_late));
assert!(window.is_active(&tue_early));
assert!(!window.is_active(&tue_mid));
}
// A message above the bypass severity skips the window check.
#[test]
fn test_active_window_bypass_severity() {
let window = ActiveWindowState {
start: NaiveTime::from_hms_opt(8, 0, 0).unwrap(),
end: NaiveTime::from_hms_opt(22, 0, 0).unwrap(),
timezone: ActiveWindowTimeZone::Named(UTC),
days: None,
bypass_severity: Some(Severity::Error),
};
let notify = Notify::new("test", Severity::Critical, "critical");
let msg = Message::new("source", serde_json::to_value(notify).unwrap());
assert!(window.should_bypass(&msg));
}
// Severity ranks increase from Info to Critical.
#[test]
fn test_severity_rank_ordering() {
assert!(severity_rank(Severity::Info) < severity_rank(Severity::Warning));
assert!(severity_rank(Severity::Warning) < severity_rank(Severity::Error));
assert!(severity_rank(Severity::Error) < severity_rank(Severity::Critical));
}
// Mirrors the min-severity comparison used when filtering messages.
#[test]
fn test_min_severity_filtering_logic() {
let min = Severity::Warning;
let msg = Severity::Info;
assert!(severity_rank(msg) < severity_rank(min));
let msg = Severity::Warning;
assert!(severity_rank(msg) >= severity_rank(min));
let msg = Severity::Error;
assert!(severity_rank(msg) >= severity_rank(min));
let msg = Severity::Critical;
assert!(severity_rank(msg) >= severity_rank(min));
}
}