Skip to main content

camgrab_core/notify/
mod.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::path::PathBuf;
5use std::time::Duration;
6use thiserror::Error;
7
8/// Notification event that occurred in the system
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct NotificationEvent {
11    pub camera_name: String,
12    pub event_type: EventType,
13    pub timestamp: DateTime<Utc>,
14    pub score: Option<f64>,
15    pub image_path: Option<PathBuf>,
16    pub message: String,
17}
18
19/// Types of events that can trigger notifications
20#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
21#[serde(rename_all = "snake_case")]
22pub enum EventType {
23    MotionDetected,
24    CameraOffline,
25    CameraOnline,
26    RecordingStarted,
27    RecordingStopped,
28    HealthCheckFailed,
29}
30
31/// HTTP methods supported by webhook notifier
32#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
33#[serde(rename_all = "UPPERCASE")]
34pub enum HttpMethod {
35    POST,
36    PUT,
37}
38
39/// Errors that can occur during notification
40#[derive(Debug, Error)]
41pub enum NotifyError {
42    #[error("Webhook request failed: {0}")]
43    WebhookFailed(String),
44
45    #[error("MQTT publish failed: {0}")]
46    MqttFailed(String),
47
48    #[error("Email send failed: {0}")]
49    EmailFailed(String),
50
51    #[error("Notification timeout")]
52    Timeout,
53
54    #[error("Configuration error: {0}")]
55    ConfigError(String),
56}
57
58/// Trait for notification backends
59#[async_trait::async_trait]
60pub trait Notifier: Send + Sync {
61    /// Send a notification event
62    async fn send(&self, event: &NotificationEvent) -> Result<(), NotifyError>;
63
64    /// Get the name of this notifier
65    fn name(&self) -> &str;
66}
67
68/// Webhook notifier that sends HTTP requests
69pub struct WebhookNotifier {
70    url: String,
71    headers: HashMap<String, String>,
72    method: HttpMethod,
73    client: reqwest::Client,
74}
75
76impl WebhookNotifier {
77    pub fn new(
78        url: String,
79        headers: HashMap<String, String>,
80        method: HttpMethod,
81    ) -> Result<Self, NotifyError> {
82        let client = reqwest::Client::builder()
83            .timeout(Duration::from_secs(10))
84            .build()
85            .map_err(|e| NotifyError::ConfigError(format!("Failed to create HTTP client: {e}")))?;
86
87        Ok(Self {
88            url,
89            headers,
90            method,
91            client,
92        })
93    }
94
95    async fn send_with_retry(
96        &self,
97        event: &NotificationEvent,
98        attempts: u32,
99    ) -> Result<(), NotifyError> {
100        let mut last_error = None;
101
102        for attempt in 0..attempts {
103            if attempt > 0 {
104                // Exponential backoff: 1s, 2s, 4s
105                let delay = Duration::from_secs(2_u64.pow(attempt - 1));
106                tokio::time::sleep(delay).await;
107            }
108
109            let mut request = match self.method {
110                HttpMethod::POST => self.client.post(&self.url),
111                HttpMethod::PUT => self.client.put(&self.url),
112            };
113
114            // Add custom headers
115            for (key, value) in &self.headers {
116                request = request.header(key, value);
117            }
118
119            match request.json(event).send().await {
120                Ok(response) => {
121                    if response.status().is_success() {
122                        return Ok(());
123                    }
124                    last_error = Some(format!(
125                        "HTTP {}: {}",
126                        response.status(),
127                        response.text().await.unwrap_or_default()
128                    ));
129                }
130                Err(e) => {
131                    last_error = Some(e.to_string());
132                }
133            }
134        }
135
136        Err(NotifyError::WebhookFailed(
137            last_error.unwrap_or_else(|| "Unknown error".to_string()),
138        ))
139    }
140}
141
142#[async_trait::async_trait]
143impl Notifier for WebhookNotifier {
144    async fn send(&self, event: &NotificationEvent) -> Result<(), NotifyError> {
145        self.send_with_retry(event, 3).await
146    }
147
148    fn name(&self) -> &str {
149        "webhook"
150    }
151}
152
153/// MQTT notifier that publishes messages to a broker
154pub struct MqttNotifier {
155    broker: String,
156    port: u16,
157    topic: String,
158    client_id: String,
159    username: Option<String>,
160    password: Option<String>,
161    qos: rumqttc::QoS,
162}
163
164impl MqttNotifier {
165    #[allow(clippy::too_many_arguments)]
166    pub fn new(
167        broker: String,
168        port: u16,
169        topic: String,
170        client_id: String,
171        username: Option<String>,
172        password: Option<String>,
173        qos: u8,
174    ) -> Result<Self, NotifyError> {
175        let qos = match qos {
176            0 => rumqttc::QoS::AtMostOnce,
177            1 => rumqttc::QoS::AtLeastOnce,
178            2 => rumqttc::QoS::ExactlyOnce,
179            _ => {
180                return Err(NotifyError::ConfigError(format!(
181                    "Invalid QoS level: {qos}"
182                )))
183            }
184        };
185
186        Ok(Self {
187            broker,
188            port,
189            topic,
190            client_id,
191            username,
192            password,
193            qos,
194        })
195    }
196}
197
198#[async_trait::async_trait]
199impl Notifier for MqttNotifier {
200    async fn send(&self, event: &NotificationEvent) -> Result<(), NotifyError> {
201        let mut options = rumqttc::MqttOptions::new(&self.client_id, &self.broker, self.port);
202
203        if let (Some(username), Some(password)) = (&self.username, &self.password) {
204            options.set_credentials(username, password);
205        }
206
207        options.set_keep_alive(Duration::from_secs(30));
208
209        let (client, mut eventloop) = rumqttc::AsyncClient::new(options, 10);
210
211        // Serialize event to JSON
212        let payload = serde_json::to_vec(event)
213            .map_err(|e| NotifyError::MqttFailed(format!("Failed to serialize event: {e}")))?;
214
215        // Publish message
216        client
217            .publish(&self.topic, self.qos, false, payload)
218            .await
219            .map_err(|e| NotifyError::MqttFailed(format!("Failed to publish: {e}")))?;
220
221        // Wait for publish to complete
222        tokio::time::timeout(Duration::from_secs(5), async {
223            loop {
224                match eventloop.poll().await {
225                    Ok(rumqttc::Event::Outgoing(rumqttc::Outgoing::Publish(_))) => {
226                        return Ok(());
227                    }
228                    Ok(_) => continue,
229                    Err(e) => {
230                        return Err(NotifyError::MqttFailed(format!("Event loop error: {e}")))
231                    }
232                }
233            }
234        })
235        .await
236        .map_err(|_| NotifyError::Timeout)?
237    }
238
239    fn name(&self) -> &str {
240        "mqtt"
241    }
242}
243
244/// Email notifier that sends SMTP emails
245pub struct EmailNotifier {
246    smtp_host: String,
247    smtp_port: u16,
248    from: String,
249    to: Vec<String>,
250    username: String,
251    password: String,
252}
253
254impl EmailNotifier {
255    #[allow(clippy::too_many_arguments)]
256    pub fn new(
257        smtp_host: String,
258        smtp_port: u16,
259        from: String,
260        to: Vec<String>,
261        username: String,
262        password: String,
263    ) -> Self {
264        Self {
265            smtp_host,
266            smtp_port,
267            from,
268            to,
269            username,
270            password,
271        }
272    }
273
274    fn format_html_email(event: &NotificationEvent) -> String {
275        let event_type_str = match event.event_type {
276            EventType::MotionDetected => "Motion Detected",
277            EventType::CameraOffline => "Camera Offline",
278            EventType::CameraOnline => "Camera Online",
279            EventType::RecordingStarted => "Recording Started",
280            EventType::RecordingStopped => "Recording Stopped",
281            EventType::HealthCheckFailed => "Health Check Failed",
282        };
283
284        let score_html = event
285            .score
286            .map(|s| format!("<p><strong>Score:</strong> {s:.2}</p>"))
287            .unwrap_or_default();
288
289        let image_html = event
290            .image_path
291            .as_ref()
292            .map(|path| format!("<p><strong>Image:</strong> {}</p>", path.display()))
293            .unwrap_or_default();
294
295        format!(
296            r#"<!DOCTYPE html>
297<html>
298<head>
299    <style>
300        body {{ font-family: Arial, sans-serif; margin: 20px; }}
301        .header {{ background-color: #f0f0f0; padding: 10px; border-radius: 5px; }}
302        .content {{ margin-top: 20px; }}
303        .footer {{ margin-top: 20px; color: #666; font-size: 12px; }}
304    </style>
305</head>
306<body>
307    <div class="header">
308        <h2>Camera Alert: {}</h2>
309    </div>
310    <div class="content">
311        <p><strong>Camera:</strong> {}</p>
312        <p><strong>Event:</strong> {}</p>
313        <p><strong>Time:</strong> {}</p>
314        {}
315        {}
316        <p><strong>Message:</strong> {}</p>
317    </div>
318    <div class="footer">
319        <p>This is an automated notification from camgrab</p>
320    </div>
321</body>
322</html>"#,
323            event_type_str,
324            event.camera_name,
325            event_type_str,
326            event.timestamp.format("%Y-%m-%d %H:%M:%S UTC"),
327            score_html,
328            image_html,
329            event.message
330        )
331    }
332}
333
334#[async_trait::async_trait]
335impl Notifier for EmailNotifier {
336    async fn send(&self, event: &NotificationEvent) -> Result<(), NotifyError> {
337        use lettre::message::header::ContentType;
338        use lettre::transport::smtp::authentication::Credentials;
339        use lettre::{AsyncSmtpTransport, AsyncTransport, Message, Tokio1Executor};
340
341        let subject = format!(
342            "[camgrab] {} - {}",
343            event.camera_name,
344            match event.event_type {
345                EventType::MotionDetected => "Motion Detected",
346                EventType::CameraOffline => "Camera Offline",
347                EventType::CameraOnline => "Camera Online",
348                EventType::RecordingStarted => "Recording Started",
349                EventType::RecordingStopped => "Recording Stopped",
350                EventType::HealthCheckFailed => "Health Check Failed",
351            }
352        );
353
354        let html_body = Self::format_html_email(event);
355
356        // Build email for each recipient
357        for recipient in &self.to {
358            let email =
359                Message::builder()
360                    .from(self.from.parse().map_err(|e| {
361                        NotifyError::EmailFailed(format!("Invalid from address: {e}"))
362                    })?)
363                    .to(recipient.parse().map_err(|e| {
364                        NotifyError::EmailFailed(format!("Invalid to address: {e}"))
365                    })?)
366                    .subject(&subject)
367                    .header(ContentType::TEXT_HTML)
368                    .body(html_body.clone())
369                    .map_err(|e| NotifyError::EmailFailed(format!("Failed to build email: {e}")))?;
370
371            let creds = Credentials::new(self.username.clone(), self.password.clone());
372
373            let mailer = AsyncSmtpTransport::<Tokio1Executor>::starttls_relay(&self.smtp_host)
374                .map_err(|e| NotifyError::EmailFailed(format!("Failed to create transport: {e}")))?
375                .port(self.smtp_port)
376                .credentials(creds)
377                .build();
378
379            mailer
380                .send(email)
381                .await
382                .map_err(|e| NotifyError::EmailFailed(format!("Failed to send email: {e}")))?;
383        }
384
385        Ok(())
386    }
387
388    fn name(&self) -> &str {
389        "email"
390    }
391}
392
393/// Router that manages multiple notifiers
394pub struct NotificationRouter {
395    notifiers: Vec<Box<dyn Notifier + Send + Sync>>,
396}
397
398impl NotificationRouter {
399    pub fn new() -> Self {
400        Self {
401            notifiers: Vec::new(),
402        }
403    }
404
405    /// Add a notifier to the router
406    pub fn add(&mut self, notifier: Box<dyn Notifier + Send + Sync>) {
407        self.notifiers.push(notifier);
408    }
409
410    /// Broadcast an event to all notifiers
411    pub async fn broadcast(&self, event: &NotificationEvent) -> Vec<Result<(), NotifyError>> {
412        let mut results = Vec::new();
413
414        for notifier in &self.notifiers {
415            results.push(notifier.send(event).await);
416        }
417
418        results
419    }
420
421    /// Send an event to a specific notifier by name
422    pub async fn send_to(&self, name: &str, event: &NotificationEvent) -> Result<(), NotifyError> {
423        for notifier in &self.notifiers {
424            if notifier.name() == name {
425                return notifier.send(event).await;
426            }
427        }
428
429        Err(NotifyError::ConfigError(format!(
430            "Notifier not found: {name}"
431        )))
432    }
433}
434
435impl Default for NotificationRouter {
436    fn default() -> Self {
437        Self::new()
438    }
439}
440
441#[cfg(test)]
442mod tests {
443    use super::*;
444    use std::sync::{Arc, Mutex};
445
446    // Mock notifier for testing
447    struct MockNotifier {
448        name: String,
449        calls: Arc<Mutex<Vec<NotificationEvent>>>,
450        should_fail: bool,
451    }
452
453    impl MockNotifier {
454        fn new(name: &str) -> Self {
455            Self {
456                name: name.to_string(),
457                calls: Arc::new(Mutex::new(Vec::new())),
458                should_fail: false,
459            }
460        }
461
462        fn new_failing(name: &str) -> Self {
463            Self {
464                name: name.to_string(),
465                calls: Arc::new(Mutex::new(Vec::new())),
466                should_fail: true,
467            }
468        }
469
470        #[allow(dead_code)]
471        fn get_calls(&self) -> Vec<NotificationEvent> {
472            self.calls.lock().unwrap().clone()
473        }
474    }
475
476    #[async_trait::async_trait]
477    impl Notifier for MockNotifier {
478        async fn send(&self, event: &NotificationEvent) -> Result<(), NotifyError> {
479            self.calls.lock().unwrap().push(event.clone());
480
481            if self.should_fail {
482                Err(NotifyError::WebhookFailed("Mock failure".to_string()))
483            } else {
484                Ok(())
485            }
486        }
487
488        fn name(&self) -> &str {
489            &self.name
490        }
491    }
492
493    #[test]
494    fn test_event_type_serialization() {
495        let event_type = EventType::MotionDetected;
496        let json = serde_json::to_string(&event_type).unwrap();
497        assert_eq!(json, r#""motion_detected""#);
498
499        let deserialized: EventType = serde_json::from_str(&json).unwrap();
500        assert_eq!(deserialized, EventType::MotionDetected);
501    }
502
503    #[tokio::test]
504    async fn test_notification_router_broadcast() {
505        let mut router = NotificationRouter::new();
506
507        let notifier1 = MockNotifier::new("test1");
508        let notifier2 = MockNotifier::new("test2");
509
510        let calls1 = notifier1.calls.clone();
511        let calls2 = notifier2.calls.clone();
512
513        router.add(Box::new(notifier1));
514        router.add(Box::new(notifier2));
515
516        let event = NotificationEvent {
517            camera_name: "test-camera".to_string(),
518            event_type: EventType::MotionDetected,
519            timestamp: Utc::now(),
520            score: Some(0.95),
521            image_path: Some(PathBuf::from("/tmp/test.jpg")),
522            message: "Motion detected".to_string(),
523        };
524
525        let results = router.broadcast(&event).await;
526
527        assert_eq!(results.len(), 2);
528        assert!(results[0].is_ok());
529        assert!(results[1].is_ok());
530
531        assert_eq!(calls1.lock().unwrap().len(), 1);
532        assert_eq!(calls2.lock().unwrap().len(), 1);
533    }
534
535    #[tokio::test]
536    async fn test_notification_router_send_to() {
537        let mut router = NotificationRouter::new();
538
539        let notifier1 = MockNotifier::new("test1");
540        let notifier2 = MockNotifier::new("test2");
541
542        let calls1 = notifier1.calls.clone();
543        let calls2 = notifier2.calls.clone();
544
545        router.add(Box::new(notifier1));
546        router.add(Box::new(notifier2));
547
548        let event = NotificationEvent {
549            camera_name: "test-camera".to_string(),
550            event_type: EventType::CameraOffline,
551            timestamp: Utc::now(),
552            score: None,
553            image_path: None,
554            message: "Camera went offline".to_string(),
555        };
556
557        let result = router.send_to("test2", &event).await;
558        assert!(result.is_ok());
559
560        assert_eq!(calls1.lock().unwrap().len(), 0);
561        assert_eq!(calls2.lock().unwrap().len(), 1);
562    }
563
564    #[tokio::test]
565    async fn test_notification_router_not_found() {
566        let router = NotificationRouter::new();
567
568        let event = NotificationEvent {
569            camera_name: "test-camera".to_string(),
570            event_type: EventType::CameraOnline,
571            timestamp: Utc::now(),
572            score: None,
573            image_path: None,
574            message: "Camera came online".to_string(),
575        };
576
577        let result = router.send_to("nonexistent", &event).await;
578        assert!(result.is_err());
579        assert!(matches!(result, Err(NotifyError::ConfigError(_))));
580    }
581
582    #[tokio::test]
583    async fn test_notification_router_partial_failure() {
584        let mut router = NotificationRouter::new();
585
586        router.add(Box::new(MockNotifier::new("success")));
587        router.add(Box::new(MockNotifier::new_failing("failure")));
588
589        let event = NotificationEvent {
590            camera_name: "test-camera".to_string(),
591            event_type: EventType::RecordingStarted,
592            timestamp: Utc::now(),
593            score: None,
594            image_path: None,
595            message: "Recording started".to_string(),
596        };
597
598        let results = router.broadcast(&event).await;
599
600        assert_eq!(results.len(), 2);
601        assert!(results[0].is_ok());
602        assert!(results[1].is_err());
603    }
604}