1use crate::platform::{ChatPlatform, WorkflowNotification};
7use anyhow::{bail, Result};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::net::IpAddr;
12use std::sync::Arc;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16#[serde(rename_all = "lowercase")]
17pub enum NotificationLevel {
18 Info,
19 Warning,
20 Error,
21 Success,
22}
23
24impl std::fmt::Display for NotificationLevel {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 match self {
27 Self::Info => write!(f, "INFO"),
28 Self::Warning => write!(f, "WARN"),
29 Self::Error => write!(f, "ERROR"),
30 Self::Success => write!(f, "OK"),
31 }
32 }
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct Notification {
38 pub level: NotificationLevel,
39 pub title: String,
40 pub body: String,
41 #[serde(default)]
42 pub metadata: HashMap<String, String>,
43 pub timestamp: DateTime<Utc>,
44}
45
46impl Notification {
47 pub fn new(
48 level: NotificationLevel,
49 title: impl Into<String>,
50 body: impl Into<String>,
51 ) -> Self {
52 Self {
53 level,
54 title: title.into(),
55 body: body.into(),
56 metadata: HashMap::new(),
57 timestamp: Utc::now(),
58 }
59 }
60
61 pub fn with_meta(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
62 self.metadata.insert(key.into(), value.into());
63 self
64 }
65}
66
/// Sink that prints notifications to the process's standard error stream.
///
/// The type carries no state, so it is freely copyable and default-constructible.
#[derive(Debug, Clone, Copy, Default)]
pub struct ConsoleSink;
69
70impl ConsoleSink {
71 pub async fn send(&self, notification: &Notification) -> Result<()> {
72 let emoji = match notification.level {
73 NotificationLevel::Info => "ℹ️",
74 NotificationLevel::Warning => "⚠️",
75 NotificationLevel::Error => "❌",
76 NotificationLevel::Success => "✅",
77 };
78 eprintln!(
79 "{} [{}] {}: {}",
80 emoji, notification.level, notification.title, notification.body
81 );
82 Ok(())
83 }
84}
85
86pub struct WebhookSink {
88 url: String,
89 client: reqwest::Client,
90}
91
92impl WebhookSink {
93 pub fn new(url: impl Into<String>) -> Result<Self> {
100 let url = url.into();
101 Self::validate_url(&url)?;
102 Ok(Self {
103 url,
104 client: reqwest::Client::builder()
105 .connect_timeout(std::time::Duration::from_secs(10))
106 .timeout(std::time::Duration::from_secs(30))
107 .build()
108 .unwrap_or_else(|_| reqwest::Client::new()),
109 })
110 }
111
112 fn validate_url(url: &str) -> Result<()> {
115 let parsed = reqwest::Url::parse(url)
116 .map_err(|e| anyhow::anyhow!("Invalid webhook URL '{}': {}", url, e))?;
117
118 match parsed.scheme() {
119 "http" | "https" => {}
120 other => bail!("Webhook URL scheme must be http or https, got '{}'", other),
121 }
122
123 if let Some(host) = parsed.host_str() {
124 let lower = host.to_lowercase();
126 if lower == "localhost" || lower == "[::1]" {
127 bail!("Webhook URL must not target localhost: {}", host);
128 }
129
130 if let Ok(ip) = host.parse::<IpAddr>() {
132 if ip.is_loopback() {
133 bail!("Webhook URL must not target loopback address: {}", ip);
134 }
135 if let IpAddr::V4(v4) = ip {
136 if v4.is_private() || v4.is_link_local() || v4.is_unspecified() {
137 bail!(
138 "Webhook URL must not target private/link-local IP: {}",
139 v4
140 );
141 }
142 }
143 if let IpAddr::V6(v6) = ip {
144 if v6.is_unspecified() {
145 bail!(
146 "Webhook URL must not target unspecified IPv6: {}",
147 v6
148 );
149 }
150 }
151 }
152 } else {
153 bail!("Webhook URL has no host");
154 }
155
156 Ok(())
157 }
158
159 pub async fn send(&self, notification: &Notification) -> Result<()> {
160 self.client
161 .post(&self.url)
162 .json(notification)
163 .send()
164 .await?
165 .error_for_status()?;
166 Ok(())
167 }
168}
169
170pub struct ChatNotificationSink<P: ChatPlatform> {
172 platform: Arc<P>,
173 default_channel: String,
174}
175
176impl<P: ChatPlatform + 'static> ChatNotificationSink<P> {
177 pub fn new(platform: Arc<P>, default_channel: String) -> Self {
178 Self {
179 platform,
180 default_channel,
181 }
182 }
183
184 pub async fn notify_workflow(
186 &self,
187 notification: &WorkflowNotification,
188 thread_id: Option<&str>,
189 ) -> Result<String> {
190 self.platform
191 .send_notification(&self.default_channel, thread_id, notification)
192 .await
193 }
194
195 pub async fn send(&self, notification: &Notification) -> Result<()> {
197 let emoji = match notification.level {
198 NotificationLevel::Info => "ℹ️",
199 NotificationLevel::Warning => "⚠️",
200 NotificationLevel::Error => "❌",
201 NotificationLevel::Success => "✅",
202 };
203
204 let msg = crate::platform::OutgoingMessage {
205 channel_id: self.default_channel.clone(),
206 text: format!(
207 "{} *{}*\n{}",
208 emoji, notification.title, notification.body
209 ),
210 thread_id: None,
211 blocks: None,
212 };
213
214 self.platform.send_message(&msg).await?;
215 Ok(())
216 }
217}
218
219pub struct NotificationDispatcher {
221 console: ConsoleSink,
222 webhook: Option<WebhookSink>,
223}
224
225impl NotificationDispatcher {
226 pub fn new() -> Self {
227 Self {
228 console: ConsoleSink,
229 webhook: None,
230 }
231 }
232
233 pub fn with_webhook(mut self, url: impl Into<String>) -> Result<Self> {
236 self.webhook = Some(WebhookSink::new(url)?);
237 Ok(self)
238 }
239
240 pub async fn notify(&self, notification: &Notification) {
242 if let Err(e) = self.console.send(notification).await {
244 tracing::warn!("Console notification failed: {}", e);
245 }
246
247 if let Some(ref webhook) = self.webhook {
249 if let Err(e) = webhook.send(notification).await {
250 tracing::warn!("Webhook notification failed: {}", e);
251 }
252 }
253 }
254
255 pub async fn info(&self, title: impl Into<String>, body: impl Into<String>) {
257 self.notify(&Notification::new(NotificationLevel::Info, title, body))
258 .await;
259 }
260
261 pub async fn error(&self, title: impl Into<String>, body: impl Into<String>) {
263 self.notify(&Notification::new(NotificationLevel::Error, title, body))
264 .await;
265 }
266
267 pub async fn success(&self, title: impl Into<String>, body: impl Into<String>) {
269 self.notify(&Notification::new(NotificationLevel::Success, title, body))
270 .await;
271 }
272}
273
274impl Default for NotificationDispatcher {
275 fn default() -> Self {
276 Self::new()
277 }
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use crate::platform::{ApprovalRequest, OutgoingMessage, ProgressUpdate};
    use std::sync::Mutex;

    /// Test double for `ChatPlatform` that records what it was asked to send.
    struct MockPlatform {
        /// `(success, workflow_id)` of every workflow notification received.
        notifications: Arc<Mutex<Vec<(bool, String)>>>,
        /// Text of every plain message received.
        messages: Arc<Mutex<Vec<String>>>,
    }

    impl MockPlatform {
        fn new() -> Self {
            Self {
                notifications: Arc::new(Mutex::new(Vec::new())),
                messages: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    impl ChatPlatform for MockPlatform {
        async fn send_message(&self, msg: &OutgoingMessage) -> anyhow::Result<String> {
            self.messages.lock().unwrap().push(msg.text.clone());
            Ok("ts".into())
        }

        async fn send_approval(
            &self,
            _channel_id: &str,
            _request: &ApprovalRequest,
        ) -> anyhow::Result<String> {
            Ok("ts".into())
        }

        async fn update_message(
            &self,
            _channel_id: &str,
            _message_id: &str,
            _text: &str,
        ) -> anyhow::Result<()> {
            Ok(())
        }

        async fn add_reaction(
            &self,
            _channel_id: &str,
            _message_id: &str,
            _emoji: &str,
        ) -> anyhow::Result<()> {
            Ok(())
        }

        async fn send_progress(
            &self,
            _channel_id: &str,
            _thread_id: &str,
            _progress: &ProgressUpdate,
        ) -> anyhow::Result<String> {
            Ok("ts".into())
        }

        async fn send_notification(
            &self,
            _channel_id: &str,
            _thread_id: Option<&str>,
            notification: &WorkflowNotification,
        ) -> anyhow::Result<String> {
            self.notifications
                .lock()
                .unwrap()
                .push((notification.success, notification.workflow_id.clone()));
            Ok("ts".into())
        }

        async fn start_thread(
            &self,
            _channel_id: &str,
            _execution_id: &str,
            _workflow_id: &str,
            _total_steps: usize,
            _shadow: bool,
        ) -> anyhow::Result<String> {
            Ok("thread-ts".into())
        }
    }

    #[tokio::test]
    async fn test_chat_notification_sink_success() {
        let platform = Arc::new(MockPlatform::new());
        let sink = ChatNotificationSink::new(platform.clone(), "#ops".into());

        let notif = WorkflowNotification {
            execution_id: "e1".into(),
            workflow_id: "deploy".into(),
            success: true,
            steps_completed: 3,
            total_steps: 3,
            duration_ms: 1500,
            error: None,
        };
        sink.notify_workflow(&notif, None).await.unwrap();

        let notifs = platform.notifications.lock().unwrap();
        assert_eq!(notifs.len(), 1);
        assert!(notifs[0].0);
        assert_eq!(notifs[0].1, "deploy");
    }

    #[tokio::test]
    async fn test_chat_notification_sink_failure() {
        let platform = Arc::new(MockPlatform::new());
        let sink = ChatNotificationSink::new(platform.clone(), "#ops".into());

        let notif = WorkflowNotification {
            execution_id: "e1".into(),
            workflow_id: "deploy".into(),
            success: false,
            steps_completed: 1,
            total_steps: 3,
            duration_ms: 500,
            error: Some("step failed".into()),
        };
        sink.notify_workflow(&notif, None).await.unwrap();

        let notifs = platform.notifications.lock().unwrap();
        assert_eq!(notifs.len(), 1);
        assert!(!notifs[0].0);
    }

    #[tokio::test]
    async fn test_chat_notification_sink_generic() {
        let platform = Arc::new(MockPlatform::new());
        let sink = ChatNotificationSink::new(platform.clone(), "#ops".into());

        let notification = Notification::new(NotificationLevel::Info, "Test", "hello world");
        sink.send(&notification).await.unwrap();

        let msgs = platform.messages.lock().unwrap();
        assert_eq!(msgs.len(), 1);
        assert!(msgs[0].contains("Test"));
    }

    /// Smoke test: the console-only dispatcher must not panic for any helper.
    #[tokio::test]
    async fn test_dispatcher() {
        let dispatcher = NotificationDispatcher::new();
        dispatcher.info("Test", "hello").await;
        dispatcher.error("Fail", "oops").await;
        dispatcher.success("Done", "all good").await;
    }

    #[test]
    fn test_webhook_url_validation() {
        // Accepted: ordinary public HTTPS endpoint.
        assert!(WebhookSink::validate_url("https://example.com/hook").is_ok());

        // Rejected: malformed, wrong scheme, or literally local/internal hosts.
        assert!(WebhookSink::validate_url("not a url").is_err());
        assert!(WebhookSink::validate_url("ftp://example.com/hook").is_err());
        assert!(WebhookSink::validate_url("http://localhost:8080/hook").is_err());
        assert!(WebhookSink::validate_url("http://127.0.0.1/hook").is_err());
        assert!(WebhookSink::validate_url("http://[::1]/hook").is_err());
        assert!(WebhookSink::validate_url("http://10.1.2.3/hook").is_err());
        assert!(WebhookSink::validate_url("http://169.254.169.254/latest").is_err());
    }

    #[test]
    fn test_notification_with_meta() {
        let n = Notification::new(NotificationLevel::Info, "Test", "body")
            .with_meta("workflow", "deploy")
            .with_meta("step", "3");
        assert_eq!(n.metadata.len(), 2);
        assert_eq!(n.metadata["workflow"], "deploy");
    }

    #[test]
    fn test_notification_level_display() {
        assert_eq!(NotificationLevel::Info.to_string(), "INFO");
        assert_eq!(NotificationLevel::Error.to_string(), "ERROR");
        assert_eq!(NotificationLevel::Success.to_string(), "OK");
        assert_eq!(NotificationLevel::Warning.to_string(), "WARN");
    }
}