orca_core/
notifications.rs1use serde::Serialize;
4use tracing::{info, warn};
5
6use crate::config::ObservabilityConfig;
7
8#[derive(Debug, Clone)]
10pub enum NotificationChannel {
11 Webhook { url: String },
13 Email {
15 smtp_host: String,
16 smtp_port: u16,
17 from: String,
18 to: String,
19 },
20}
21
22#[derive(Debug, Clone)]
24pub struct Notifier {
25 channels: Vec<NotificationChannel>,
26 client: reqwest::Client,
27}
28
29#[derive(Serialize)]
30struct WebhookPayload {
31 text: String,
32}
33
34impl Notifier {
35 pub fn new(channels: Vec<NotificationChannel>) -> Self {
37 Self {
38 channels,
39 client: reqwest::Client::new(),
40 }
41 }
42
43 pub fn from_config(config: &ObservabilityConfig) -> Self {
47 let mut channels = Vec::new();
48
49 if let Some(ref alerts) = config.alerts {
50 if let Some(ref url) = alerts.webhook {
51 channels.push(NotificationChannel::Webhook { url: url.clone() });
52 }
53 if let Some(ref email) = alerts.email {
54 channels.push(NotificationChannel::Email {
55 smtp_host: "localhost".into(),
56 smtp_port: 587,
57 from: "orca@localhost".into(),
58 to: email.clone(),
59 });
60 }
61 }
62
63 Self::new(channels)
64 }
65
66 pub async fn send(&self, title: &str, message: &str, severity: &str) {
71 for channel in &self.channels {
72 match channel {
73 NotificationChannel::Webhook { url } => {
74 self.send_webhook(url, title, message, severity).await;
75 }
76 NotificationChannel::Email {
77 smtp_host,
78 smtp_port,
79 from,
80 to,
81 } => {
82 self.send_email(smtp_host, *smtp_port, from, to, title, message, severity)
83 .await;
84 }
85 }
86 }
87 }
88
89 async fn send_webhook(&self, url: &str, title: &str, message: &str, severity: &str) {
90 let payload = WebhookPayload {
91 text: format!("[{severity}] {title}: {message}"),
92 };
93
94 match self.client.post(url).json(&payload).send().await {
95 Ok(resp) => {
96 if resp.status().is_success() {
97 info!(url = %url, "notification sent via webhook");
98 } else {
99 warn!(
100 url = %url,
101 status = %resp.status(),
102 "webhook returned non-success status"
103 );
104 }
105 }
106 Err(e) => {
107 warn!(url = %url, error = %e, "failed to send webhook notification");
108 }
109 }
110 }
111
112 #[allow(clippy::too_many_arguments)]
115 async fn send_email(
116 &self,
117 smtp_host: &str,
118 smtp_port: u16,
119 from: &str,
120 to: &str,
121 title: &str,
122 message: &str,
123 severity: &str,
124 ) {
125 let body = format!(
126 "Subject: [orca/{severity}] {title}\r\nFrom: {from}\r\nTo: {to}\r\n\r\n{message}\r\n"
127 );
128
129 if Self::try_sendmail(to, &body) {
131 info!(to = %to, "email sent via sendmail");
132 return;
133 }
134
135 match Self::try_raw_smtp(smtp_host, smtp_port, from, to, &body).await {
137 Ok(()) => info!(to = %to, "email sent via SMTP to {smtp_host}:{smtp_port}"),
138 Err(e) => warn!(to = %to, error = %e, "failed to send email"),
139 }
140 }
141
142 fn try_sendmail(to: &str, body: &str) -> bool {
144 use std::io::Write;
145 let Ok(mut child) = std::process::Command::new("sendmail")
146 .arg("-t")
147 .arg(to)
148 .stdin(std::process::Stdio::piped())
149 .stdout(std::process::Stdio::null())
150 .stderr(std::process::Stdio::null())
151 .spawn()
152 else {
153 return false;
154 };
155 if let Some(ref mut stdin) = child.stdin {
156 let _ = stdin.write_all(body.as_bytes());
157 }
158 child.wait().is_ok_and(|s| s.success())
159 }
160
161 async fn try_raw_smtp(
163 host: &str,
164 port: u16,
165 from: &str,
166 to: &str,
167 body: &str,
168 ) -> anyhow::Result<()> {
169 use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
170 use tokio::net::TcpStream;
171
172 let stream = TcpStream::connect((host, port)).await?;
173 let (reader, mut writer) = stream.into_split();
174 let mut reader = BufReader::new(reader);
175 let mut line = String::new();
176
177 reader.read_line(&mut line).await?;
179 line.clear();
180
181 writer.write_all(b"HELO orca\r\n").await?;
182 reader.read_line(&mut line).await?;
183 line.clear();
184
185 writer
186 .write_all(format!("MAIL FROM:<{from}>\r\n").as_bytes())
187 .await?;
188 reader.read_line(&mut line).await?;
189 line.clear();
190
191 writer
192 .write_all(format!("RCPT TO:<{to}>\r\n").as_bytes())
193 .await?;
194 reader.read_line(&mut line).await?;
195 line.clear();
196
197 writer.write_all(b"DATA\r\n").await?;
198 reader.read_line(&mut line).await?;
199 line.clear();
200
201 writer.write_all(body.as_bytes()).await?;
202 writer.write_all(b"\r\n.\r\n").await?;
203 reader.read_line(&mut line).await?;
204 line.clear();
205
206 writer.write_all(b"QUIT\r\n").await?;
207
208 Ok(())
209 }
210
211 pub fn channel_count(&self) -> usize {
213 self.channels.len()
214 }
215}
216
217#[cfg(test)]
218mod tests {
219 use super::*;
220
221 #[test]
222 fn from_config_with_webhook() {
223 use crate::config::{AlertChannelConfig, ObservabilityConfig};
224
225 let config = ObservabilityConfig {
226 otlp_endpoint: None,
227 alerts: Some(AlertChannelConfig {
228 webhook: Some("https://hooks.slack.com/test".into()),
229 email: Some("ops@example.com".into()),
230 }),
231 };
232
233 let notifier = Notifier::from_config(&config);
234 assert_eq!(notifier.channel_count(), 2);
235 }
236
237 #[test]
238 fn from_config_no_alerts() {
239 let config = ObservabilityConfig {
240 otlp_endpoint: None,
241 alerts: None,
242 };
243
244 let notifier = Notifier::from_config(&config);
245 assert_eq!(notifier.channel_count(), 0);
246 }
247
248 #[test]
249 fn empty_notifier() {
250 let notifier = Notifier::new(vec![]);
251 assert_eq!(notifier.channel_count(), 0);
252 }
253}