use serde::Serialize;
use tracing::{info, warn};
use crate::config::ObservabilityConfig;
#[derive(Debug, Clone)]
pub enum NotificationChannel {
Webhook { url: String },
Email {
smtp_host: String,
smtp_port: u16,
from: String,
to: String,
},
}
#[derive(Debug, Clone)]
pub struct Notifier {
channels: Vec<NotificationChannel>,
client: reqwest::Client,
}
#[derive(Serialize)]
struct WebhookPayload {
text: String,
}
impl Notifier {
pub fn new(channels: Vec<NotificationChannel>) -> Self {
Self {
channels,
client: reqwest::Client::new(),
}
}
pub fn from_config(config: &ObservabilityConfig) -> Self {
let mut channels = Vec::new();
if let Some(ref alerts) = config.alerts {
if let Some(ref url) = alerts.webhook {
channels.push(NotificationChannel::Webhook { url: url.clone() });
}
if let Some(ref email) = alerts.email {
channels.push(NotificationChannel::Email {
smtp_host: "localhost".into(),
smtp_port: 587,
from: "orca@localhost".into(),
to: email.clone(),
});
}
}
Self::new(channels)
}
pub async fn send(&self, title: &str, message: &str, severity: &str) {
for channel in &self.channels {
match channel {
NotificationChannel::Webhook { url } => {
self.send_webhook(url, title, message, severity).await;
}
NotificationChannel::Email {
smtp_host,
smtp_port,
from,
to,
} => {
self.send_email(smtp_host, *smtp_port, from, to, title, message, severity)
.await;
}
}
}
}
async fn send_webhook(&self, url: &str, title: &str, message: &str, severity: &str) {
let payload = WebhookPayload {
text: format!("[{severity}] {title}: {message}"),
};
match self.client.post(url).json(&payload).send().await {
Ok(resp) => {
if resp.status().is_success() {
info!(url = %url, "notification sent via webhook");
} else {
warn!(
url = %url,
status = %resp.status(),
"webhook returned non-success status"
);
}
}
Err(e) => {
warn!(url = %url, error = %e, "failed to send webhook notification");
}
}
}
#[allow(clippy::too_many_arguments)]
async fn send_email(
&self,
smtp_host: &str,
smtp_port: u16,
from: &str,
to: &str,
title: &str,
message: &str,
severity: &str,
) {
let body = format!(
"Subject: [orca/{severity}] {title}\r\nFrom: {from}\r\nTo: {to}\r\n\r\n{message}\r\n"
);
if Self::try_sendmail(to, &body) {
info!(to = %to, "email sent via sendmail");
return;
}
match Self::try_raw_smtp(smtp_host, smtp_port, from, to, &body).await {
Ok(()) => info!(to = %to, "email sent via SMTP to {smtp_host}:{smtp_port}"),
Err(e) => warn!(to = %to, error = %e, "failed to send email"),
}
}
fn try_sendmail(to: &str, body: &str) -> bool {
use std::io::Write;
let Ok(mut child) = std::process::Command::new("sendmail")
.arg("-t")
.arg(to)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
else {
return false;
};
if let Some(ref mut stdin) = child.stdin {
let _ = stdin.write_all(body.as_bytes());
}
child.wait().is_ok_and(|s| s.success())
}
async fn try_raw_smtp(
host: &str,
port: u16,
from: &str,
to: &str,
body: &str,
) -> anyhow::Result<()> {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
let stream = TcpStream::connect((host, port)).await?;
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
reader.read_line(&mut line).await?;
line.clear();
writer.write_all(b"HELO orca\r\n").await?;
reader.read_line(&mut line).await?;
line.clear();
writer
.write_all(format!("MAIL FROM:<{from}>\r\n").as_bytes())
.await?;
reader.read_line(&mut line).await?;
line.clear();
writer
.write_all(format!("RCPT TO:<{to}>\r\n").as_bytes())
.await?;
reader.read_line(&mut line).await?;
line.clear();
writer.write_all(b"DATA\r\n").await?;
reader.read_line(&mut line).await?;
line.clear();
writer.write_all(body.as_bytes()).await?;
writer.write_all(b"\r\n.\r\n").await?;
reader.read_line(&mut line).await?;
line.clear();
writer.write_all(b"QUIT\r\n").await?;
Ok(())
}
pub fn channel_count(&self) -> usize {
self.channels.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn from_config_with_webhook() {
use crate::config::{AlertChannelConfig, ObservabilityConfig};
let config = ObservabilityConfig {
otlp_endpoint: None,
alerts: Some(AlertChannelConfig {
webhook: Some("https://hooks.slack.com/test".into()),
email: Some("ops@example.com".into()),
}),
};
let notifier = Notifier::from_config(&config);
assert_eq!(notifier.channel_count(), 2);
}
#[test]
fn from_config_no_alerts() {
let config = ObservabilityConfig {
otlp_endpoint: None,
alerts: None,
};
let notifier = Notifier::from_config(&config);
assert_eq!(notifier.channel_count(), 0);
}
#[test]
fn empty_notifier() {
let notifier = Notifier::new(vec![]);
assert_eq!(notifier.channel_count(), 0);
}
}