1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::path::PathBuf;
5use std::time::Duration;
6use thiserror::Error;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct NotificationEvent {
11 pub camera_name: String,
12 pub event_type: EventType,
13 pub timestamp: DateTime<Utc>,
14 pub score: Option<f64>,
15 pub image_path: Option<PathBuf>,
16 pub message: String,
17}
18
19#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
21#[serde(rename_all = "snake_case")]
22pub enum EventType {
23 MotionDetected,
24 CameraOffline,
25 CameraOnline,
26 RecordingStarted,
27 RecordingStopped,
28 HealthCheckFailed,
29}
30
31#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
33#[serde(rename_all = "UPPERCASE")]
34pub enum HttpMethod {
35 POST,
36 PUT,
37}
38
39#[derive(Debug, Error)]
41pub enum NotifyError {
42 #[error("Webhook request failed: {0}")]
43 WebhookFailed(String),
44
45 #[error("MQTT publish failed: {0}")]
46 MqttFailed(String),
47
48 #[error("Email send failed: {0}")]
49 EmailFailed(String),
50
51 #[error("Notification timeout")]
52 Timeout,
53
54 #[error("Configuration error: {0}")]
55 ConfigError(String),
56}
57
58#[async_trait::async_trait]
60pub trait Notifier: Send + Sync {
61 async fn send(&self, event: &NotificationEvent) -> Result<(), NotifyError>;
63
64 fn name(&self) -> &str;
66}
67
68pub struct WebhookNotifier {
70 url: String,
71 headers: HashMap<String, String>,
72 method: HttpMethod,
73 client: reqwest::Client,
74}
75
76impl WebhookNotifier {
77 pub fn new(
78 url: String,
79 headers: HashMap<String, String>,
80 method: HttpMethod,
81 ) -> Result<Self, NotifyError> {
82 let client = reqwest::Client::builder()
83 .timeout(Duration::from_secs(10))
84 .build()
85 .map_err(|e| NotifyError::ConfigError(format!("Failed to create HTTP client: {e}")))?;
86
87 Ok(Self {
88 url,
89 headers,
90 method,
91 client,
92 })
93 }
94
95 async fn send_with_retry(
96 &self,
97 event: &NotificationEvent,
98 attempts: u32,
99 ) -> Result<(), NotifyError> {
100 let mut last_error = None;
101
102 for attempt in 0..attempts {
103 if attempt > 0 {
104 let delay = Duration::from_secs(2_u64.pow(attempt - 1));
106 tokio::time::sleep(delay).await;
107 }
108
109 let mut request = match self.method {
110 HttpMethod::POST => self.client.post(&self.url),
111 HttpMethod::PUT => self.client.put(&self.url),
112 };
113
114 for (key, value) in &self.headers {
116 request = request.header(key, value);
117 }
118
119 match request.json(event).send().await {
120 Ok(response) => {
121 if response.status().is_success() {
122 return Ok(());
123 }
124 last_error = Some(format!(
125 "HTTP {}: {}",
126 response.status(),
127 response.text().await.unwrap_or_default()
128 ));
129 }
130 Err(e) => {
131 last_error = Some(e.to_string());
132 }
133 }
134 }
135
136 Err(NotifyError::WebhookFailed(
137 last_error.unwrap_or_else(|| "Unknown error".to_string()),
138 ))
139 }
140}
141
142#[async_trait::async_trait]
143impl Notifier for WebhookNotifier {
144 async fn send(&self, event: &NotificationEvent) -> Result<(), NotifyError> {
145 self.send_with_retry(event, 3).await
146 }
147
148 fn name(&self) -> &str {
149 "webhook"
150 }
151}
152
153pub struct MqttNotifier {
155 broker: String,
156 port: u16,
157 topic: String,
158 client_id: String,
159 username: Option<String>,
160 password: Option<String>,
161 qos: rumqttc::QoS,
162}
163
164impl MqttNotifier {
165 #[allow(clippy::too_many_arguments)]
166 pub fn new(
167 broker: String,
168 port: u16,
169 topic: String,
170 client_id: String,
171 username: Option<String>,
172 password: Option<String>,
173 qos: u8,
174 ) -> Result<Self, NotifyError> {
175 let qos = match qos {
176 0 => rumqttc::QoS::AtMostOnce,
177 1 => rumqttc::QoS::AtLeastOnce,
178 2 => rumqttc::QoS::ExactlyOnce,
179 _ => {
180 return Err(NotifyError::ConfigError(format!(
181 "Invalid QoS level: {qos}"
182 )))
183 }
184 };
185
186 Ok(Self {
187 broker,
188 port,
189 topic,
190 client_id,
191 username,
192 password,
193 qos,
194 })
195 }
196}
197
198#[async_trait::async_trait]
199impl Notifier for MqttNotifier {
200 async fn send(&self, event: &NotificationEvent) -> Result<(), NotifyError> {
201 let mut options = rumqttc::MqttOptions::new(&self.client_id, &self.broker, self.port);
202
203 if let (Some(username), Some(password)) = (&self.username, &self.password) {
204 options.set_credentials(username, password);
205 }
206
207 options.set_keep_alive(Duration::from_secs(30));
208
209 let (client, mut eventloop) = rumqttc::AsyncClient::new(options, 10);
210
211 let payload = serde_json::to_vec(event)
213 .map_err(|e| NotifyError::MqttFailed(format!("Failed to serialize event: {e}")))?;
214
215 client
217 .publish(&self.topic, self.qos, false, payload)
218 .await
219 .map_err(|e| NotifyError::MqttFailed(format!("Failed to publish: {e}")))?;
220
221 tokio::time::timeout(Duration::from_secs(5), async {
223 loop {
224 match eventloop.poll().await {
225 Ok(rumqttc::Event::Outgoing(rumqttc::Outgoing::Publish(_))) => {
226 return Ok(());
227 }
228 Ok(_) => continue,
229 Err(e) => {
230 return Err(NotifyError::MqttFailed(format!("Event loop error: {e}")))
231 }
232 }
233 }
234 })
235 .await
236 .map_err(|_| NotifyError::Timeout)?
237 }
238
239 fn name(&self) -> &str {
240 "mqtt"
241 }
242}
243
/// Notifier that sends events as HTML emails over SMTP.
pub struct EmailNotifier {
    /// SMTP server host name.
    smtp_host: String,
    /// SMTP server port.
    smtp_port: u16,
    /// Sender address; parsed as a mailbox when a message is built.
    from: String,
    /// Recipient addresses; each recipient receives its own message.
    to: Vec<String>,
    /// SMTP authentication user name.
    username: String,
    /// SMTP authentication password.
    password: String,
}
253
254impl EmailNotifier {
255 #[allow(clippy::too_many_arguments)]
256 pub fn new(
257 smtp_host: String,
258 smtp_port: u16,
259 from: String,
260 to: Vec<String>,
261 username: String,
262 password: String,
263 ) -> Self {
264 Self {
265 smtp_host,
266 smtp_port,
267 from,
268 to,
269 username,
270 password,
271 }
272 }
273
274 fn format_html_email(event: &NotificationEvent) -> String {
275 let event_type_str = match event.event_type {
276 EventType::MotionDetected => "Motion Detected",
277 EventType::CameraOffline => "Camera Offline",
278 EventType::CameraOnline => "Camera Online",
279 EventType::RecordingStarted => "Recording Started",
280 EventType::RecordingStopped => "Recording Stopped",
281 EventType::HealthCheckFailed => "Health Check Failed",
282 };
283
284 let score_html = event
285 .score
286 .map(|s| format!("<p><strong>Score:</strong> {s:.2}</p>"))
287 .unwrap_or_default();
288
289 let image_html = event
290 .image_path
291 .as_ref()
292 .map(|path| format!("<p><strong>Image:</strong> {}</p>", path.display()))
293 .unwrap_or_default();
294
295 format!(
296 r#"<!DOCTYPE html>
297<html>
298<head>
299 <style>
300 body {{ font-family: Arial, sans-serif; margin: 20px; }}
301 .header {{ background-color: #f0f0f0; padding: 10px; border-radius: 5px; }}
302 .content {{ margin-top: 20px; }}
303 .footer {{ margin-top: 20px; color: #666; font-size: 12px; }}
304 </style>
305</head>
306<body>
307 <div class="header">
308 <h2>Camera Alert: {}</h2>
309 </div>
310 <div class="content">
311 <p><strong>Camera:</strong> {}</p>
312 <p><strong>Event:</strong> {}</p>
313 <p><strong>Time:</strong> {}</p>
314 {}
315 {}
316 <p><strong>Message:</strong> {}</p>
317 </div>
318 <div class="footer">
319 <p>This is an automated notification from camgrab</p>
320 </div>
321</body>
322</html>"#,
323 event_type_str,
324 event.camera_name,
325 event_type_str,
326 event.timestamp.format("%Y-%m-%d %H:%M:%S UTC"),
327 score_html,
328 image_html,
329 event.message
330 )
331 }
332}
333
334#[async_trait::async_trait]
335impl Notifier for EmailNotifier {
336 async fn send(&self, event: &NotificationEvent) -> Result<(), NotifyError> {
337 use lettre::message::header::ContentType;
338 use lettre::transport::smtp::authentication::Credentials;
339 use lettre::{AsyncSmtpTransport, AsyncTransport, Message, Tokio1Executor};
340
341 let subject = format!(
342 "[camgrab] {} - {}",
343 event.camera_name,
344 match event.event_type {
345 EventType::MotionDetected => "Motion Detected",
346 EventType::CameraOffline => "Camera Offline",
347 EventType::CameraOnline => "Camera Online",
348 EventType::RecordingStarted => "Recording Started",
349 EventType::RecordingStopped => "Recording Stopped",
350 EventType::HealthCheckFailed => "Health Check Failed",
351 }
352 );
353
354 let html_body = Self::format_html_email(event);
355
356 for recipient in &self.to {
358 let email =
359 Message::builder()
360 .from(self.from.parse().map_err(|e| {
361 NotifyError::EmailFailed(format!("Invalid from address: {e}"))
362 })?)
363 .to(recipient.parse().map_err(|e| {
364 NotifyError::EmailFailed(format!("Invalid to address: {e}"))
365 })?)
366 .subject(&subject)
367 .header(ContentType::TEXT_HTML)
368 .body(html_body.clone())
369 .map_err(|e| NotifyError::EmailFailed(format!("Failed to build email: {e}")))?;
370
371 let creds = Credentials::new(self.username.clone(), self.password.clone());
372
373 let mailer = AsyncSmtpTransport::<Tokio1Executor>::starttls_relay(&self.smtp_host)
374 .map_err(|e| NotifyError::EmailFailed(format!("Failed to create transport: {e}")))?
375 .port(self.smtp_port)
376 .credentials(creds)
377 .build();
378
379 mailer
380 .send(email)
381 .await
382 .map_err(|e| NotifyError::EmailFailed(format!("Failed to send email: {e}")))?;
383 }
384
385 Ok(())
386 }
387
388 fn name(&self) -> &str {
389 "email"
390 }
391}
392
393pub struct NotificationRouter {
395 notifiers: Vec<Box<dyn Notifier + Send + Sync>>,
396}
397
398impl NotificationRouter {
399 pub fn new() -> Self {
400 Self {
401 notifiers: Vec::new(),
402 }
403 }
404
405 pub fn add(&mut self, notifier: Box<dyn Notifier + Send + Sync>) {
407 self.notifiers.push(notifier);
408 }
409
410 pub async fn broadcast(&self, event: &NotificationEvent) -> Vec<Result<(), NotifyError>> {
412 let mut results = Vec::new();
413
414 for notifier in &self.notifiers {
415 results.push(notifier.send(event).await);
416 }
417
418 results
419 }
420
421 pub async fn send_to(&self, name: &str, event: &NotificationEvent) -> Result<(), NotifyError> {
423 for notifier in &self.notifiers {
424 if notifier.name() == name {
425 return notifier.send(event).await;
426 }
427 }
428
429 Err(NotifyError::ConfigError(format!(
430 "Notifier not found: {name}"
431 )))
432 }
433}
434
435impl Default for NotificationRouter {
436 fn default() -> Self {
437 Self::new()
438 }
439}
440
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Shared log of the events a [`MockNotifier`] has received.
    type CallLog = Arc<Mutex<Vec<NotificationEvent>>>;

    /// In-memory notifier that records each event and can be told to fail.
    struct MockNotifier {
        name: String,
        calls: CallLog,
        should_fail: bool,
    }

    impl MockNotifier {
        /// Common constructor behind `new` and `new_failing`.
        fn with_outcome(name: &str, should_fail: bool) -> Self {
            Self {
                name: name.to_string(),
                calls: Arc::new(Mutex::new(Vec::new())),
                should_fail,
            }
        }

        /// Mock whose `send` always succeeds.
        fn new(name: &str) -> Self {
            Self::with_outcome(name, false)
        }

        /// Mock whose `send` always fails.
        fn new_failing(name: &str) -> Self {
            Self::with_outcome(name, true)
        }

        /// Snapshot of every event received so far.
        #[allow(dead_code)]
        fn get_calls(&self) -> Vec<NotificationEvent> {
            self.calls.lock().unwrap().clone()
        }
    }

    #[async_trait::async_trait]
    impl Notifier for MockNotifier {
        async fn send(&self, event: &NotificationEvent) -> Result<(), NotifyError> {
            // Record the call first so failing mocks still log what they saw.
            self.calls.lock().unwrap().push(event.clone());

            if self.should_fail {
                return Err(NotifyError::WebhookFailed("Mock failure".to_string()));
            }
            Ok(())
        }

        fn name(&self) -> &str {
            &self.name
        }
    }

    /// Builds an event for "test-camera" with no score and no image.
    fn plain_event(event_type: EventType, message: &str) -> NotificationEvent {
        NotificationEvent {
            camera_name: "test-camera".to_string(),
            event_type,
            timestamp: Utc::now(),
            score: None,
            image_path: None,
            message: message.to_string(),
        }
    }

    #[test]
    fn test_event_type_serialization() {
        let json = serde_json::to_string(&EventType::MotionDetected).unwrap();
        assert_eq!(json, r#""motion_detected""#);

        let round_tripped: EventType = serde_json::from_str(&json).unwrap();
        assert_eq!(round_tripped, EventType::MotionDetected);
    }

    #[tokio::test]
    async fn test_notification_router_broadcast() {
        let first = MockNotifier::new("test1");
        let second = MockNotifier::new("test2");
        let first_log = first.calls.clone();
        let second_log = second.calls.clone();

        let mut router = NotificationRouter::new();
        router.add(Box::new(first));
        router.add(Box::new(second));

        let event = NotificationEvent {
            score: Some(0.95),
            image_path: Some(PathBuf::from("/tmp/test.jpg")),
            ..plain_event(EventType::MotionDetected, "Motion detected")
        };

        let results = router.broadcast(&event).await;

        assert_eq!(results.len(), 2);
        assert!(results[0].is_ok());
        assert!(results[1].is_ok());

        assert_eq!(first_log.lock().unwrap().len(), 1);
        assert_eq!(second_log.lock().unwrap().len(), 1);
    }

    #[tokio::test]
    async fn test_notification_router_send_to() {
        let first = MockNotifier::new("test1");
        let second = MockNotifier::new("test2");
        let first_log = first.calls.clone();
        let second_log = second.calls.clone();

        let mut router = NotificationRouter::new();
        router.add(Box::new(first));
        router.add(Box::new(second));

        let event = plain_event(EventType::CameraOffline, "Camera went offline");

        let result = router.send_to("test2", &event).await;
        assert!(result.is_ok());

        // Only the named notifier may have been invoked.
        assert_eq!(first_log.lock().unwrap().len(), 0);
        assert_eq!(second_log.lock().unwrap().len(), 1);
    }

    #[tokio::test]
    async fn test_notification_router_not_found() {
        let router = NotificationRouter::new();
        let event = plain_event(EventType::CameraOnline, "Camera came online");

        let result = router.send_to("nonexistent", &event).await;

        assert!(result.is_err());
        assert!(matches!(result, Err(NotifyError::ConfigError(_))));
    }

    #[tokio::test]
    async fn test_notification_router_partial_failure() {
        let mut router = NotificationRouter::new();
        router.add(Box::new(MockNotifier::new("success")));
        router.add(Box::new(MockNotifier::new_failing("failure")));

        let event = plain_event(EventType::RecordingStarted, "Recording started");

        let results = router.broadcast(&event).await;

        assert_eq!(results.len(), 2);
        assert!(results[0].is_ok());
        assert!(results[1].is_err());
    }
}