Skip to main content

mur_chat/
notification.rs

1//! Notification system — delivers workflow status updates via chat and other sinks.
2//!
3//! Provides a dispatcher that fans out notifications to multiple backends:
4//! console, webhook, and chat platform.
5
6use crate::platform::{ChatPlatform, WorkflowNotification};
7use anyhow::{bail, Result};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::net::IpAddr;
12use std::sync::Arc;
13
/// Notification severity level.
///
/// Serde (de)serializes the variants as lowercase strings because of
/// `rename_all = "lowercase"` (e.g. `Warning` <-> `"warning"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum NotificationLevel {
    /// Informational update.
    Info,
    /// Something needs attention but is not a failure.
    Warning,
    /// A failure.
    Error,
    /// A successful outcome.
    Success,
}
23
24impl std::fmt::Display for NotificationLevel {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        match self {
27            Self::Info => write!(f, "INFO"),
28            Self::Warning => write!(f, "WARN"),
29            Self::Error => write!(f, "ERROR"),
30            Self::Success => write!(f, "OK"),
31        }
32    }
33}
34
/// A notification to be delivered.
///
/// Derives `Serialize`/`Deserialize` so it can be converted to and from
/// structured formats such as JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Notification {
    /// Severity of the event.
    pub level: NotificationLevel,
    /// Short headline.
    pub title: String,
    /// Message text.
    pub body: String,
    /// Free-form key/value annotations. Falls back to an empty map when the
    /// field is missing during deserialization (`#[serde(default)]`).
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    /// Timestamp in UTC.
    pub timestamp: DateTime<Utc>,
}
45
46impl Notification {
47    pub fn new(
48        level: NotificationLevel,
49        title: impl Into<String>,
50        body: impl Into<String>,
51    ) -> Self {
52        Self {
53            level,
54            title: title.into(),
55            body: body.into(),
56            metadata: HashMap::new(),
57            timestamp: Utc::now(),
58        }
59    }
60
61    pub fn with_meta(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
62        self.metadata.insert(key.into(), value.into());
63        self
64    }
65}
66
/// Console sink — prints notifications to stderr.
///
/// Stateless unit struct; it holds no configuration.
pub struct ConsoleSink;
69
70impl ConsoleSink {
71    pub async fn send(&self, notification: &Notification) -> Result<()> {
72        let emoji = match notification.level {
73            NotificationLevel::Info => "ℹ️",
74            NotificationLevel::Warning => "⚠️",
75            NotificationLevel::Error => "❌",
76            NotificationLevel::Success => "✅",
77        };
78        eprintln!(
79            "{} [{}] {}: {}",
80            emoji, notification.level, notification.title, notification.body
81        );
82        Ok(())
83    }
84}
85
/// Webhook sink — POSTs notification as JSON to a URL.
pub struct WebhookSink {
    /// Destination URL; validated by `WebhookSink::new` before it is stored.
    url: String,
    /// HTTP client reused for every request made by this sink.
    client: reqwest::Client,
}
91
92impl WebhookSink {
93    /// Create a new webhook sink, validating the URL at construction time.
94    ///
95    /// Rejects URLs that:
96    /// - Are not valid URLs
97    /// - Use a scheme other than http or https
98    /// - Target localhost or private/loopback IP addresses (SSRF mitigation)
99    pub fn new(url: impl Into<String>) -> Result<Self> {
100        let url = url.into();
101        Self::validate_url(&url)?;
102        Ok(Self {
103            url,
104            client: reqwest::Client::builder()
105                .connect_timeout(std::time::Duration::from_secs(10))
106                .timeout(std::time::Duration::from_secs(30))
107                .build()
108                .unwrap_or_else(|_| reqwest::Client::new()),
109        })
110    }
111
112    /// Validate a webhook URL: must be a valid URL with http(s) scheme and
113    /// must not point to loopback or private IP addresses (SSRF protection).
114    fn validate_url(url: &str) -> Result<()> {
115        let parsed = reqwest::Url::parse(url)
116            .map_err(|e| anyhow::anyhow!("Invalid webhook URL '{}': {}", url, e))?;
117
118        match parsed.scheme() {
119            "http" | "https" => {}
120            other => bail!("Webhook URL scheme must be http or https, got '{}'", other),
121        }
122
123        if let Some(host) = parsed.host_str() {
124            // Block well-known loopback hostnames
125            let lower = host.to_lowercase();
126            if lower == "localhost" || lower == "[::1]" {
127                bail!("Webhook URL must not target localhost: {}", host);
128            }
129
130            // Block loopback and private IPs
131            if let Ok(ip) = host.parse::<IpAddr>() {
132                if ip.is_loopback() {
133                    bail!("Webhook URL must not target loopback address: {}", ip);
134                }
135                if let IpAddr::V4(v4) = ip {
136                    if v4.is_private() || v4.is_link_local() || v4.is_unspecified() {
137                        bail!(
138                            "Webhook URL must not target private/link-local IP: {}",
139                            v4
140                        );
141                    }
142                }
143                if let IpAddr::V6(v6) = ip {
144                    if v6.is_unspecified() {
145                        bail!(
146                            "Webhook URL must not target unspecified IPv6: {}",
147                            v6
148                        );
149                    }
150                }
151            }
152        } else {
153            bail!("Webhook URL has no host");
154        }
155
156        Ok(())
157    }
158
159    pub async fn send(&self, notification: &Notification) -> Result<()> {
160        self.client
161            .post(&self.url)
162            .json(notification)
163            .send()
164            .await?
165            .error_for_status()?;
166        Ok(())
167    }
168}
169
/// Chat notification sink — sends workflow notifications via ChatPlatform.
pub struct ChatNotificationSink<P: ChatPlatform> {
    /// Shared handle to the chat platform implementation.
    platform: Arc<P>,
    /// Channel that every message sent through this sink is addressed to.
    default_channel: String,
}
175
176impl<P: ChatPlatform + 'static> ChatNotificationSink<P> {
177    pub fn new(platform: Arc<P>, default_channel: String) -> Self {
178        Self {
179            platform,
180            default_channel,
181        }
182    }
183
184    /// Notify about a workflow result.
185    pub async fn notify_workflow(
186        &self,
187        notification: &WorkflowNotification,
188        thread_id: Option<&str>,
189    ) -> Result<String> {
190        self.platform
191            .send_notification(&self.default_channel, thread_id, notification)
192            .await
193    }
194
195    /// Send a generic notification as a chat message.
196    pub async fn send(&self, notification: &Notification) -> Result<()> {
197        let emoji = match notification.level {
198            NotificationLevel::Info => "ℹ️",
199            NotificationLevel::Warning => "⚠️",
200            NotificationLevel::Error => "❌",
201            NotificationLevel::Success => "✅",
202        };
203
204        let msg = crate::platform::OutgoingMessage {
205            channel_id: self.default_channel.clone(),
206            text: format!(
207                "{} *{}*\n{}",
208                emoji, notification.title, notification.body
209            ),
210            thread_id: None,
211            blocks: None,
212        };
213
214        self.platform.send_message(&msg).await?;
215        Ok(())
216    }
217}
218
/// Dispatcher that sends notifications to the console and, when configured,
/// to a webhook.
pub struct NotificationDispatcher {
    /// Always-on sink that prints to stderr.
    console: ConsoleSink,
    /// Optional webhook sink; `None` until `with_webhook` is called.
    webhook: Option<WebhookSink>,
}
224
225impl NotificationDispatcher {
226    pub fn new() -> Self {
227        Self {
228            console: ConsoleSink,
229            webhook: None,
230        }
231    }
232
233    /// Add a webhook endpoint. Returns an error if the URL is invalid, uses a
234    /// non-http(s) scheme, or targets a private/loopback IP address.
235    pub fn with_webhook(mut self, url: impl Into<String>) -> Result<Self> {
236        self.webhook = Some(WebhookSink::new(url)?);
237        Ok(self)
238    }
239
240    /// Send a notification to all configured sinks.
241    pub async fn notify(&self, notification: &Notification) {
242        // Always log to console
243        if let Err(e) = self.console.send(notification).await {
244            tracing::warn!("Console notification failed: {}", e);
245        }
246
247        // Optionally send to webhook
248        if let Some(ref webhook) = self.webhook {
249            if let Err(e) = webhook.send(notification).await {
250                tracing::warn!("Webhook notification failed: {}", e);
251            }
252        }
253    }
254
255    /// Convenience: send an info notification.
256    pub async fn info(&self, title: impl Into<String>, body: impl Into<String>) {
257        self.notify(&Notification::new(NotificationLevel::Info, title, body))
258            .await;
259    }
260
261    /// Convenience: send an error notification.
262    pub async fn error(&self, title: impl Into<String>, body: impl Into<String>) {
263        self.notify(&Notification::new(NotificationLevel::Error, title, body))
264            .await;
265    }
266
267    /// Convenience: send a success notification.
268    pub async fn success(&self, title: impl Into<String>, body: impl Into<String>) {
269        self.notify(&Notification::new(NotificationLevel::Success, title, body))
270            .await;
271    }
272}
273
274impl Default for NotificationDispatcher {
275    fn default() -> Self {
276        Self::new()
277    }
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use crate::platform::{
        ApprovalRequest, OutgoingMessage, ProgressUpdate,
    };
    use std::sync::Mutex;

    /// In-memory `ChatPlatform` that records what was sent so tests can
    /// assert on it.
    struct MockPlatform {
        /// `(success, workflow_id)` of every workflow notification received.
        notifications: Arc<Mutex<Vec<(bool, String)>>>,
        /// Text of every plain message received.
        messages: Arc<Mutex<Vec<String>>>,
    }

    impl MockPlatform {
        fn new() -> Self {
            Self {
                notifications: Arc::new(Mutex::new(Vec::new())),
                messages: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    impl ChatPlatform for MockPlatform {
        async fn send_message(&self, msg: &OutgoingMessage) -> anyhow::Result<String> {
            self.messages.lock().unwrap().push(msg.text.clone());
            Ok("ts".into())
        }
        async fn send_approval(
            &self,
            _channel_id: &str,
            _request: &ApprovalRequest,
        ) -> anyhow::Result<String> {
            Ok("ts".into())
        }
        async fn update_message(
            &self,
            _channel_id: &str,
            _message_id: &str,
            _text: &str,
        ) -> anyhow::Result<()> {
            Ok(())
        }
        async fn add_reaction(
            &self,
            _channel_id: &str,
            _message_id: &str,
            _emoji: &str,
        ) -> anyhow::Result<()> {
            Ok(())
        }
        async fn send_progress(
            &self,
            _channel_id: &str,
            _thread_id: &str,
            _progress: &ProgressUpdate,
        ) -> anyhow::Result<String> {
            Ok("ts".into())
        }
        async fn send_notification(
            &self,
            _channel_id: &str,
            _thread_id: Option<&str>,
            notification: &WorkflowNotification,
        ) -> anyhow::Result<String> {
            self.notifications
                .lock()
                .unwrap()
                .push((notification.success, notification.workflow_id.clone()));
            Ok("ts".into())
        }
        async fn start_thread(
            &self,
            _channel_id: &str,
            _execution_id: &str,
            _workflow_id: &str,
            _total_steps: usize,
            _shadow: bool,
        ) -> anyhow::Result<String> {
            Ok("thread-ts".into())
        }
    }

    #[tokio::test]
    async fn test_chat_notification_sink_success() {
        let platform = Arc::new(MockPlatform::new());
        let sink = ChatNotificationSink::new(platform.clone(), "#ops".into());

        let notif = WorkflowNotification {
            execution_id: "e1".into(),
            workflow_id: "deploy".into(),
            success: true,
            steps_completed: 3,
            total_steps: 3,
            duration_ms: 1500,
            error: None,
        };
        sink.notify_workflow(&notif, None).await.unwrap();

        let notifs = platform.notifications.lock().unwrap();
        assert_eq!(notifs.len(), 1);
        assert!(notifs[0].0);
        assert_eq!(notifs[0].1, "deploy");
    }

    #[tokio::test]
    async fn test_chat_notification_sink_failure() {
        let platform = Arc::new(MockPlatform::new());
        let sink = ChatNotificationSink::new(platform.clone(), "#ops".into());

        let notif = WorkflowNotification {
            execution_id: "e1".into(),
            workflow_id: "deploy".into(),
            success: false,
            steps_completed: 1,
            total_steps: 3,
            duration_ms: 500,
            error: Some("step failed".into()),
        };
        sink.notify_workflow(&notif, None).await.unwrap();

        let notifs = platform.notifications.lock().unwrap();
        assert_eq!(notifs.len(), 1);
        assert!(!notifs[0].0);
    }

    #[tokio::test]
    async fn test_chat_notification_sink_generic() {
        let platform = Arc::new(MockPlatform::new());
        let sink = ChatNotificationSink::new(platform.clone(), "#ops".into());

        let notification =
            Notification::new(NotificationLevel::Info, "Test", "hello world");
        sink.send(&notification).await.unwrap();

        let msgs = platform.messages.lock().unwrap();
        assert_eq!(msgs.len(), 1);
        assert!(msgs[0].contains("Test"));
    }

    #[tokio::test]
    async fn test_dispatcher() {
        let dispatcher = NotificationDispatcher::new();
        // Just verify it doesn't panic
        dispatcher.info("Test", "hello").await;
        dispatcher.error("Fail", "oops").await;
        dispatcher.success("Done", "all good").await;
    }

    #[test]
    fn test_notification_with_meta() {
        let n = Notification::new(NotificationLevel::Info, "Test", "body")
            .with_meta("workflow", "deploy")
            .with_meta("step", "3");
        assert_eq!(n.metadata.len(), 2);
        assert_eq!(n.metadata["workflow"], "deploy");
    }

    #[test]
    fn test_notification_level_display() {
        assert_eq!(NotificationLevel::Info.to_string(), "INFO");
        assert_eq!(NotificationLevel::Error.to_string(), "ERROR");
        assert_eq!(NotificationLevel::Success.to_string(), "OK");
        assert_eq!(NotificationLevel::Warning.to_string(), "WARN");
    }

    // --- Webhook URL validation (SSRF guard) ---

    #[test]
    fn test_webhook_url_accepts_public_targets() {
        assert!(WebhookSink::validate_url("https://example.com/hooks/notify").is_ok());
        assert!(WebhookSink::validate_url("http://8.8.8.8/hook").is_ok());
    }

    #[test]
    fn test_webhook_url_rejects_malformed_or_wrong_scheme() {
        assert!(WebhookSink::validate_url("not a url").is_err());
        assert!(WebhookSink::validate_url("ftp://example.com/hook").is_err());
        assert!(WebhookSink::validate_url("file:///etc/passwd").is_err());
    }

    #[test]
    fn test_webhook_url_rejects_localhost_names() {
        assert!(WebhookSink::validate_url("http://localhost:8080/hook").is_err());
        assert!(WebhookSink::validate_url("http://LOCALHOST/hook").is_err());
        assert!(WebhookSink::validate_url("http://[::1]/hook").is_err());
    }

    #[test]
    fn test_webhook_url_rejects_internal_ipv4() {
        for url in [
            "http://127.0.0.1/hook",
            "http://10.0.0.5/hook",
            "http://172.16.0.1/hook",
            "http://192.168.1.1/hook",
            "http://169.254.169.254/latest/meta-data",
            "http://0.0.0.0/hook",
        ] {
            assert!(
                WebhookSink::validate_url(url).is_err(),
                "expected {} to be rejected",
                url
            );
        }
    }

    #[test]
    fn test_dispatcher_with_webhook_rejects_bad_url() {
        // `is_err()` rather than `unwrap_err()`: the dispatcher is not `Debug`.
        assert!(NotificationDispatcher::new()
            .with_webhook("http://localhost/hook")
            .is_err());
        assert!(NotificationDispatcher::new()
            .with_webhook("gopher://example.com")
            .is_err());
    }
}