Skip to main content

orca_core/
notifications.rs

1//! Notification dispatch to Slack, Discord, and email channels.
2
3use serde::Serialize;
4use tracing::{info, warn};
5
6use crate::config::ObservabilityConfig;
7
/// A destination that notifications can be delivered to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NotificationChannel {
    /// Slack/Discord-compatible webhook that receives a JSON body with a `text` field.
    Webhook {
        /// URL the JSON payload is POSTed to.
        url: String,
    },
    /// Email notification, handed to the local `sendmail` binary when that works
    /// and otherwise submitted to an SMTP relay.
    Email {
        /// Host of the SMTP relay used when `sendmail` is unavailable or fails.
        smtp_host: String,
        /// TCP port of the SMTP relay.
        smtp_port: u16,
        /// Sender address used for the `From:` header and the SMTP envelope.
        from: String,
        /// Recipient address used for the `To:` header and the SMTP envelope.
        to: String,
    },
}
21
22/// Dispatches notifications to all configured channels.
23#[derive(Debug, Clone)]
24pub struct Notifier {
25    channels: Vec<NotificationChannel>,
26    client: reqwest::Client,
27}
28
29#[derive(Serialize)]
30struct WebhookPayload {
31    text: String,
32}
33
34impl Notifier {
35    /// Create a notifier with the given channels.
36    pub fn new(channels: Vec<NotificationChannel>) -> Self {
37        Self {
38            channels,
39            client: reqwest::Client::new(),
40        }
41    }
42
43    /// Build a notifier from cluster observability config.
44    ///
45    /// Reads `observability.alerts.webhook` and `observability.alerts.email` fields.
46    pub fn from_config(config: &ObservabilityConfig) -> Self {
47        let mut channels = Vec::new();
48
49        if let Some(ref alerts) = config.alerts {
50            if let Some(ref url) = alerts.webhook {
51                channels.push(NotificationChannel::Webhook { url: url.clone() });
52            }
53            if let Some(ref email) = alerts.email {
54                channels.push(NotificationChannel::Email {
55                    smtp_host: "localhost".into(),
56                    smtp_port: 587,
57                    from: "orca@localhost".into(),
58                    to: email.clone(),
59                });
60            }
61        }
62
63        Self::new(channels)
64    }
65
66    /// Send a notification to all configured channels.
67    ///
68    /// `severity` is informational (e.g. "info", "warning", "critical").
69    /// Failures on individual channels are logged but do not abort the remaining sends.
70    pub async fn send(&self, title: &str, message: &str, severity: &str) {
71        for channel in &self.channels {
72            match channel {
73                NotificationChannel::Webhook { url } => {
74                    self.send_webhook(url, title, message, severity).await;
75                }
76                NotificationChannel::Email {
77                    smtp_host,
78                    smtp_port,
79                    from,
80                    to,
81                } => {
82                    self.send_email(smtp_host, *smtp_port, from, to, title, message, severity)
83                        .await;
84                }
85            }
86        }
87    }
88
89    async fn send_webhook(&self, url: &str, title: &str, message: &str, severity: &str) {
90        let payload = WebhookPayload {
91            text: format!("[{severity}] {title}: {message}"),
92        };
93
94        match self.client.post(url).json(&payload).send().await {
95            Ok(resp) => {
96                if resp.status().is_success() {
97                    info!(url = %url, "notification sent via webhook");
98                } else {
99                    warn!(
100                        url = %url,
101                        status = %resp.status(),
102                        "webhook returned non-success status"
103                    );
104                }
105            }
106            Err(e) => {
107                warn!(url = %url, error = %e, "failed to send webhook notification");
108            }
109        }
110    }
111
112    /// Send email notification via the `sendmail` command if available,
113    /// falling back to raw SMTP over TCP for internal relays.
114    #[allow(clippy::too_many_arguments)]
115    async fn send_email(
116        &self,
117        smtp_host: &str,
118        smtp_port: u16,
119        from: &str,
120        to: &str,
121        title: &str,
122        message: &str,
123        severity: &str,
124    ) {
125        let body = format!(
126            "Subject: [orca/{severity}] {title}\r\nFrom: {from}\r\nTo: {to}\r\n\r\n{message}\r\n"
127        );
128
129        // Try sendmail first (most reliable for self-hosted setups)
130        if Self::try_sendmail(to, &body) {
131            info!(to = %to, "email sent via sendmail");
132            return;
133        }
134
135        // Fall back to raw SMTP
136        match Self::try_raw_smtp(smtp_host, smtp_port, from, to, &body).await {
137            Ok(()) => info!(to = %to, "email sent via SMTP to {smtp_host}:{smtp_port}"),
138            Err(e) => warn!(to = %to, error = %e, "failed to send email"),
139        }
140    }
141
142    /// Attempt to send via the local sendmail binary.
143    fn try_sendmail(to: &str, body: &str) -> bool {
144        use std::io::Write;
145        let Ok(mut child) = std::process::Command::new("sendmail")
146            .arg("-t")
147            .arg(to)
148            .stdin(std::process::Stdio::piped())
149            .stdout(std::process::Stdio::null())
150            .stderr(std::process::Stdio::null())
151            .spawn()
152        else {
153            return false;
154        };
155        if let Some(ref mut stdin) = child.stdin {
156            let _ = stdin.write_all(body.as_bytes());
157        }
158        child.wait().is_ok_and(|s| s.success())
159    }
160
161    /// Send via raw SMTP (no TLS, suitable for internal relays).
162    async fn try_raw_smtp(
163        host: &str,
164        port: u16,
165        from: &str,
166        to: &str,
167        body: &str,
168    ) -> anyhow::Result<()> {
169        use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
170        use tokio::net::TcpStream;
171
172        let stream = TcpStream::connect((host, port)).await?;
173        let (reader, mut writer) = stream.into_split();
174        let mut reader = BufReader::new(reader);
175        let mut line = String::new();
176
177        // Read greeting
178        reader.read_line(&mut line).await?;
179        line.clear();
180
181        writer.write_all(b"HELO orca\r\n").await?;
182        reader.read_line(&mut line).await?;
183        line.clear();
184
185        writer
186            .write_all(format!("MAIL FROM:<{from}>\r\n").as_bytes())
187            .await?;
188        reader.read_line(&mut line).await?;
189        line.clear();
190
191        writer
192            .write_all(format!("RCPT TO:<{to}>\r\n").as_bytes())
193            .await?;
194        reader.read_line(&mut line).await?;
195        line.clear();
196
197        writer.write_all(b"DATA\r\n").await?;
198        reader.read_line(&mut line).await?;
199        line.clear();
200
201        writer.write_all(body.as_bytes()).await?;
202        writer.write_all(b"\r\n.\r\n").await?;
203        reader.read_line(&mut line).await?;
204        line.clear();
205
206        writer.write_all(b"QUIT\r\n").await?;
207
208        Ok(())
209    }
210
211    /// Returns the number of configured channels.
212    pub fn channel_count(&self) -> usize {
213        self.channels.len()
214    }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::AlertChannelConfig;

    /// Builds an observability config that carries only the given alert settings.
    fn config_with_alerts(alerts: Option<AlertChannelConfig>) -> ObservabilityConfig {
        ObservabilityConfig {
            otlp_endpoint: None,
            alerts,
        }
    }

    /// A webhook plus an email address yields one channel each.
    #[test]
    fn from_config_with_webhook() {
        let alerts = AlertChannelConfig {
            webhook: Some(String::from("https://hooks.slack.com/test")),
            email: Some(String::from("ops@example.com")),
        };

        let notifier = Notifier::from_config(&config_with_alerts(Some(alerts)));

        assert_eq!(notifier.channel_count(), 2);
    }

    /// Without an `alerts` section no channels are configured.
    #[test]
    fn from_config_no_alerts() {
        let notifier = Notifier::from_config(&config_with_alerts(None));

        assert_eq!(notifier.channel_count(), 0);
    }

    /// An explicitly empty channel list stays empty.
    #[test]
    fn empty_notifier() {
        let notifier = Notifier::new(Vec::new());

        assert_eq!(notifier.channel_count(), 0);
    }
}