1#![allow(dead_code)]
12
13use crate::{ArchiveError, ArchiveResult};
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16use std::collections::VecDeque;
17use std::path::PathBuf;
18use std::sync::{Arc, Mutex};
19
/// Severity level attached to a [`NotificationEvent`].
///
/// Variants are declared from least to most severe, so the derived ordering
/// yields `Info < Warning < Error < Critical`. The channels in this file
/// compare an event's severity against their `min_severity` threshold using
/// that ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Severity {
    /// Lowest level in the ordering; labelled `"INFO"`.
    Info,
    /// Default threshold used by [`NotificationChannel::min_severity`]; labelled `"WARNING"`.
    Warning,
    /// Labelled `"ERROR"`.
    Error,
    /// Highest level in the ordering; labelled `"CRITICAL"`.
    Critical,
}
36
37impl Severity {
38 #[must_use]
40 pub fn label(&self) -> &'static str {
41 match self {
42 Self::Info => "INFO",
43 Self::Warning => "WARNING",
44 Self::Error => "ERROR",
45 Self::Critical => "CRITICAL",
46 }
47 }
48}
49
50impl std::fmt::Display for Severity {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 write!(f, "{}", self.label())
53 }
54}
55
/// A single notification that can be handed to any [`NotificationChannel`].
///
/// The struct derives `Serialize`/`Deserialize`; the webhook and file-log
/// channels in this file serialize it to JSON with `serde_json::to_string`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NotificationEvent {
    /// Identifier generated by [`NotificationEvent::new`] from the creation
    /// time, formatted as zero-padded hexadecimal.
    pub event_id: String,
    /// UTC time at which the event was constructed.
    pub timestamp: DateTime<Utc>,
    /// Severity used by channels to decide whether to accept the event.
    pub severity: Severity,
    /// What kind of occurrence this event describes.
    pub kind: EventKind,
    /// Human-readable description of the event.
    pub message: String,
    /// Paths of the files the event relates to; empty unless filled through
    /// [`NotificationEvent::with_files`].
    pub affected_files: Vec<PathBuf>,
    /// Free-form key/value pairs; empty unless filled through
    /// [`NotificationEvent::with_meta`].
    pub metadata: std::collections::HashMap<String, String>,
}
78
/// Category of a [`NotificationEvent`].
///
/// Each variant has a stable snake_case rendering through its `Display`
/// implementation (for example `FixityCheckFailed` is shown as
/// `fixity_check_failed`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum EventKind {
    /// Displayed as `fixity_check_passed`.
    FixityCheckPassed,
    /// Displayed as `fixity_check_failed`; emitted by
    /// `NotificationDispatcher::notify_fixity_failure`.
    FixityCheckFailed,
    /// Displayed as `fixity_check_overdue`.
    FixityCheckOverdue,
    /// Displayed as `file_quarantined`; emitted by
    /// `NotificationDispatcher::notify_quarantine`.
    FileQuarantined,
    /// Displayed as `file_restored`.
    FileRestored,
    /// Displayed as `integrity_scan_complete`.
    IntegrityScanComplete,
    /// Displayed as `batch_operation_error`.
    BatchOperationError,
    /// Displayed as `migration_complete`.
    MigrationComplete,
    /// Caller-defined kind; displayed as `custom:` followed by the inner string.
    Custom(String),
}
101
102impl std::fmt::Display for EventKind {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 match self {
105 Self::FixityCheckPassed => write!(f, "fixity_check_passed"),
106 Self::FixityCheckFailed => write!(f, "fixity_check_failed"),
107 Self::FixityCheckOverdue => write!(f, "fixity_check_overdue"),
108 Self::FileQuarantined => write!(f, "file_quarantined"),
109 Self::FileRestored => write!(f, "file_restored"),
110 Self::IntegrityScanComplete => write!(f, "integrity_scan_complete"),
111 Self::BatchOperationError => write!(f, "batch_operation_error"),
112 Self::MigrationComplete => write!(f, "migration_complete"),
113 Self::Custom(s) => write!(f, "custom:{s}"),
114 }
115 }
116}
117
118impl NotificationEvent {
119 pub fn new(severity: Severity, kind: EventKind, message: impl Into<String>) -> Self {
121 use std::time::{SystemTime, UNIX_EPOCH};
122 let nanos = SystemTime::now()
123 .duration_since(UNIX_EPOCH)
124 .map(|d| d.subsec_nanos())
125 .unwrap_or(0);
126 let event_id = format!("{:016x}{:08x}", Utc::now().timestamp(), nanos);
127 Self {
128 event_id,
129 timestamp: Utc::now(),
130 severity,
131 kind,
132 message: message.into(),
133 affected_files: Vec::new(),
134 metadata: std::collections::HashMap::new(),
135 }
136 }
137
138 #[must_use]
140 pub fn with_files(mut self, files: impl IntoIterator<Item = PathBuf>) -> Self {
141 self.affected_files.extend(files);
142 self
143 }
144
145 #[must_use]
147 pub fn with_meta(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
148 self.metadata.insert(key.into(), value.into());
149 self
150 }
151}
152
/// A destination that notification events can be delivered to.
///
/// Implementors must be `Send + Sync`; the dispatcher stores them as
/// `Box<dyn NotificationChannel>`.
pub trait NotificationChannel: Send + Sync {
    /// Name identifying this channel; the dispatcher pairs it with each send result.
    fn name(&self) -> &str;

    /// Offers `event` to the channel.
    ///
    /// The implementations in this file return `Ok(false)` when the event's
    /// severity is below the channel's threshold, `Ok(true)` when the event was
    /// accepted, and `Err` when handling it failed.
    fn send(&self, event: &NotificationEvent) -> ArchiveResult<bool>;

    /// Minimum severity the channel accepts; defaults to [`Severity::Warning`].
    fn min_severity(&self) -> Severity {
        Severity::Warning
    }
}
171
/// Settings for a [`WebhookChannel`].
#[derive(Debug, Clone)]
pub struct WebhookConfig {
    /// Channel name reported through `NotificationChannel::name`.
    pub name: String,
    /// Target URL, exposed through `WebhookChannel::url`.
    pub url: String,
    /// Optional bearer token. NOTE(review): nothing in this file reads it;
    /// presumably meant for an `Authorization` header — confirm with the
    /// code that performs the HTTP request.
    pub bearer_token: Option<String>,
    /// Events with a lower severity are filtered out by the channel's `send`.
    pub min_severity: Severity,
    /// Timeout in seconds (defaults to 10). NOTE(review): not read anywhere in
    /// this file — confirm where it is applied.
    pub timeout_secs: u64,
    /// Whether TLS certificates should be verified (defaults to `true`).
    /// NOTE(review): not read anywhere in this file — confirm where it is applied.
    pub verify_tls: bool,
}
192
193impl WebhookConfig {
194 pub fn new(name: impl Into<String>, url: impl Into<String>) -> Self {
196 Self {
197 name: name.into(),
198 url: url.into(),
199 bearer_token: None,
200 min_severity: Severity::Warning,
201 timeout_secs: 10,
202 verify_tls: true,
203 }
204 }
205}
206
/// Notification channel configured by a [`WebhookConfig`].
pub struct WebhookChannel {
    /// Settings this channel was created with.
    config: WebhookConfig,
    /// Number of events that passed the severity filter in `send`;
    /// incremented with saturating arithmetic.
    attempt_count: Arc<Mutex<u64>>,
}
218
219impl WebhookChannel {
220 pub fn new(config: WebhookConfig) -> Self {
222 Self {
223 config,
224 attempt_count: Arc::new(Mutex::new(0)),
225 }
226 }
227
228 pub fn build_payload(&self, event: &NotificationEvent) -> ArchiveResult<String> {
230 serde_json::to_string(event)
231 .map_err(|e| ArchiveError::Validation(format!("JSON serialization error: {e}")))
232 }
233
234 pub fn url(&self) -> &str {
236 &self.config.url
237 }
238
239 pub fn attempt_count(&self) -> u64 {
241 self.attempt_count.lock().map(|g| *g).unwrap_or(0)
242 }
243}
244
245impl NotificationChannel for WebhookChannel {
246 fn name(&self) -> &str {
247 &self.config.name
248 }
249
250 fn min_severity(&self) -> Severity {
251 self.config.min_severity
252 }
253
254 fn send(&self, event: &NotificationEvent) -> ArchiveResult<bool> {
256 if event.severity < self.config.min_severity {
257 return Ok(false);
258 }
259 let _payload = self.build_payload(event)?;
260 if let Ok(mut count) = self.attempt_count.lock() {
261 *count = count.saturating_add(1);
262 }
263 Ok(true)
266 }
267}
268
/// Channel that keeps accepted events in an in-process queue.
///
/// The queue is bounded by `capacity`: when it is full, `send` removes the
/// oldest event before appending the new one.
#[derive(Debug)]
pub struct InMemoryChannel {
    /// Channel name reported through `NotificationChannel::name`.
    name: String,
    /// Events with a lower severity are filtered out by `send`.
    min_severity: Severity,
    /// Queue length at which `send` starts evicting the oldest event.
    capacity: usize,
    /// Stored events, oldest at the front.
    events: Arc<Mutex<VecDeque<NotificationEvent>>>,
}
283
284impl InMemoryChannel {
285 pub fn new(name: impl Into<String>, capacity: usize, min_severity: Severity) -> Self {
287 Self {
288 name: name.into(),
289 min_severity,
290 capacity,
291 events: Arc::new(Mutex::new(VecDeque::with_capacity(capacity))),
292 }
293 }
294
295 pub fn drain_events(&self) -> Vec<NotificationEvent> {
297 self.events
298 .lock()
299 .map(|mut g| g.drain(..).collect())
300 .unwrap_or_default()
301 }
302
303 pub fn len(&self) -> usize {
305 self.events.lock().map(|g| g.len()).unwrap_or(0)
306 }
307
308 pub fn is_empty(&self) -> bool {
310 self.len() == 0
311 }
312
313 pub fn last_event(&self) -> Option<NotificationEvent> {
315 self.events.lock().ok().and_then(|g| g.back().cloned())
316 }
317}
318
319impl NotificationChannel for InMemoryChannel {
320 fn name(&self) -> &str {
321 &self.name
322 }
323
324 fn min_severity(&self) -> Severity {
325 self.min_severity
326 }
327
328 fn send(&self, event: &NotificationEvent) -> ArchiveResult<bool> {
329 if event.severity < self.min_severity {
330 return Ok(false);
331 }
332 if let Ok(mut queue) = self.events.lock() {
333 if queue.len() >= self.capacity {
334 queue.pop_front();
335 }
336 queue.push_back(event.clone());
337 }
338 Ok(true)
339 }
340}
341
/// Channel that appends each accepted event as one JSON line to a log file.
#[derive(Debug)]
pub struct FileLogChannel {
    /// Channel name reported through `NotificationChannel::name`.
    name: String,
    /// File the events are appended to; created by `send` if it does not exist.
    log_path: PathBuf,
    /// Events with a lower severity are filtered out by `send`.
    min_severity: Severity,
}
353
354impl FileLogChannel {
355 pub fn new(
357 name: impl Into<String>,
358 log_path: impl Into<PathBuf>,
359 min_severity: Severity,
360 ) -> Self {
361 Self {
362 name: name.into(),
363 log_path: log_path.into(),
364 min_severity,
365 }
366 }
367
368 pub fn log_path(&self) -> &PathBuf {
370 &self.log_path
371 }
372}
373
374impl NotificationChannel for FileLogChannel {
375 fn name(&self) -> &str {
376 &self.name
377 }
378
379 fn min_severity(&self) -> Severity {
380 self.min_severity
381 }
382
383 fn send(&self, event: &NotificationEvent) -> ArchiveResult<bool> {
384 if event.severity < self.min_severity {
385 return Ok(false);
386 }
387 use std::io::Write;
388 let line = serde_json::to_string(event)
389 .map_err(|e| ArchiveError::Validation(format!("JSON error: {e}")))?;
390 let mut file = std::fs::OpenOptions::new()
391 .create(true)
392 .append(true)
393 .open(&self.log_path)?;
394 writeln!(file, "{line}")?;
395 Ok(true)
396 }
397}
398
/// Fans a notification event out to every registered channel.
pub struct NotificationDispatcher {
    /// Registered channels, in the order they were added.
    channels: Vec<Box<dyn NotificationChannel>>,
}
407
408impl NotificationDispatcher {
409 pub fn new() -> Self {
411 Self {
412 channels: Vec::new(),
413 }
414 }
415
416 pub fn add_channel(&mut self, channel: Box<dyn NotificationChannel>) {
418 self.channels.push(channel);
419 }
420
421 pub fn dispatch(&self, event: &NotificationEvent) -> Vec<(String, ArchiveResult<bool>)> {
425 self.channels
426 .iter()
427 .map(|ch| {
428 let result = ch.send(event);
429 (ch.name().to_string(), result)
430 })
431 .collect()
432 }
433
434 pub fn channel_count(&self) -> usize {
436 self.channels.len()
437 }
438
439 pub fn notify_fixity_failure(
441 &self,
442 file: PathBuf,
443 expected: &str,
444 actual: &str,
445 ) -> Vec<(String, ArchiveResult<bool>)> {
446 let event = NotificationEvent::new(
447 Severity::Error,
448 EventKind::FixityCheckFailed,
449 format!("Fixity check failed for {}", file.display()),
450 )
451 .with_files(std::iter::once(file))
452 .with_meta("expected_checksum", expected)
453 .with_meta("actual_checksum", actual);
454 self.dispatch(&event)
455 }
456
457 pub fn notify_quarantine(
459 &self,
460 file: PathBuf,
461 reason: &str,
462 ) -> Vec<(String, ArchiveResult<bool>)> {
463 let event = NotificationEvent::new(
464 Severity::Warning,
465 EventKind::FileQuarantined,
466 format!("File quarantined: {}", file.display()),
467 )
468 .with_files(std::iter::once(file))
469 .with_meta("reason", reason);
470 self.dispatch(&event)
471 }
472}
473
474impl Default for NotificationDispatcher {
475 fn default() -> Self {
476 Self::new()
477 }
478}
479
/// Summary of one dispatch, produced by `NotificationDispatcher::dispatch_with_report`.
#[derive(Debug, Clone)]
pub struct DeliveryReport {
    /// Number of channels whose `send` returned `Ok(true)`.
    pub delivered: usize,
    /// Number of channels whose `send` returned `Ok(false)`.
    pub filtered: usize,
    /// Number of channels whose `send` returned an error.
    pub failed: usize,
    /// Per-channel outcomes, in the order the channels were invoked.
    pub details: Vec<ChannelResult>,
}
496
/// Outcome of offering one event to one channel.
#[derive(Debug, Clone)]
pub struct ChannelResult {
    /// Name of the channel, as returned by its `name()` method.
    pub channel_name: String,
    /// What happened when the event was offered to that channel.
    pub outcome: DeliveryOutcome,
}
505
/// Classification of a channel's `send` result.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeliveryOutcome {
    /// The channel returned `Ok(true)`.
    Delivered,
    /// The channel returned `Ok(false)`.
    Filtered,
    /// The channel returned an error; the payload is the error's `to_string()` text.
    Failed(String),
}
516
517impl NotificationDispatcher {
518 pub fn dispatch_with_report(&self, event: &NotificationEvent) -> DeliveryReport {
520 let results = self.dispatch(event);
521 let mut delivered = 0;
522 let mut filtered = 0;
523 let mut failed = 0;
524 let mut details = Vec::with_capacity(results.len());
525
526 for (channel_name, result) in results {
527 let outcome = match result {
528 Ok(true) => {
529 delivered += 1;
530 DeliveryOutcome::Delivered
531 }
532 Ok(false) => {
533 filtered += 1;
534 DeliveryOutcome::Filtered
535 }
536 Err(e) => {
537 failed += 1;
538 DeliveryOutcome::Failed(e.to_string())
539 }
540 };
541 details.push(ChannelResult {
542 channel_name,
543 outcome,
544 });
545 }
546
547 DeliveryReport {
548 delivered,
549 filtered,
550 failed,
551 details,
552 }
553 }
554}
555
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Adapter that lets a test keep an `Arc` handle to an [`InMemoryChannel`]
    /// after a boxed copy of the handle has been given to the dispatcher.
    struct SharedChannel(Arc<InMemoryChannel>);

    impl NotificationChannel for SharedChannel {
        fn name(&self) -> &str {
            self.0.name()
        }

        fn min_severity(&self) -> Severity {
            self.0.min_severity()
        }

        fn send(&self, event: &NotificationEvent) -> ArchiveResult<bool> {
            self.0.send(event)
        }
    }

    /// Registers `channel` with `dispatcher` and returns a handle for inspecting it.
    fn attach_shared(
        dispatcher: &mut NotificationDispatcher,
        channel: InMemoryChannel,
    ) -> Arc<InMemoryChannel> {
        let shared = Arc::new(channel);
        dispatcher.add_channel(Box::new(SharedChannel(Arc::clone(&shared))));
        shared
    }

    /// Builds an event with a fixed message for tests that only care about
    /// severity and kind.
    fn make_event(severity: Severity, kind: EventKind) -> NotificationEvent {
        NotificationEvent::new(severity, kind, "test event")
    }

    // --- Severity -----------------------------------------------------------

    #[test]
    fn test_severity_ordering() {
        assert!(Severity::Info < Severity::Warning);
        assert!(Severity::Warning < Severity::Error);
        assert!(Severity::Error < Severity::Critical);
    }

    #[test]
    fn test_severity_labels() {
        assert_eq!(Severity::Info.label(), "INFO");
        assert_eq!(Severity::Warning.label(), "WARNING");
        assert_eq!(Severity::Error.label(), "ERROR");
        assert_eq!(Severity::Critical.label(), "CRITICAL");
    }

    // --- NotificationEvent / EventKind --------------------------------------

    #[test]
    fn test_event_construction_with_meta() {
        let event = NotificationEvent::new(
            Severity::Warning,
            EventKind::FixityCheckFailed,
            "checksum mismatch",
        )
        .with_files(vec![PathBuf::from("/archive/file.mkv")])
        .with_meta("algorithm", "sha256");

        assert_eq!(event.severity, Severity::Warning);
        assert_eq!(event.kind, EventKind::FixityCheckFailed);
        assert_eq!(event.affected_files.len(), 1);
        assert_eq!(
            event.metadata.get("algorithm").map(String::as_str),
            Some("sha256")
        );
        assert!(!event.event_id.is_empty());
    }

    #[test]
    fn test_event_kind_display() {
        assert_eq!(
            EventKind::FixityCheckPassed.to_string(),
            "fixity_check_passed"
        );
        assert_eq!(
            EventKind::FixityCheckFailed.to_string(),
            "fixity_check_failed"
        );
        assert_eq!(EventKind::FileQuarantined.to_string(), "file_quarantined");
        assert_eq!(EventKind::Custom("ping".into()).to_string(), "custom:ping");
    }

    // --- InMemoryChannel ----------------------------------------------------

    #[test]
    fn test_in_memory_channel_receives_event() {
        let ch = InMemoryChannel::new("mem", 10, Severity::Info);
        let event = make_event(Severity::Error, EventKind::FixityCheckFailed);
        let result = ch.send(&event);
        assert!(result.is_ok());
        assert_eq!(ch.len(), 1);
    }

    #[test]
    fn test_in_memory_channel_filters_low_severity() {
        let ch = InMemoryChannel::new("mem", 10, Severity::Error);
        let event = make_event(Severity::Info, EventKind::FixityCheckPassed);
        let result = ch.send(&event).expect("send ok");
        assert!(!result);
        assert!(ch.is_empty());
    }

    #[test]
    fn test_in_memory_channel_capacity_ring() {
        let ch = InMemoryChannel::new("mem", 3, Severity::Info);
        for i in 0..5 {
            let ev = NotificationEvent::new(
                Severity::Info,
                EventKind::IntegrityScanComplete,
                format!("scan {i}"),
            );
            ch.send(&ev).expect("send");
        }
        assert_eq!(ch.len(), 3);

        // The two oldest events must be the ones that were evicted.
        let kept: Vec<String> = ch.drain_events().into_iter().map(|e| e.message).collect();
        assert_eq!(
            kept,
            vec![
                "scan 2".to_string(),
                "scan 3".to_string(),
                "scan 4".to_string()
            ]
        );
    }

    #[test]
    fn test_in_memory_channel_drain() {
        let ch = InMemoryChannel::new("mem", 10, Severity::Info);
        for _ in 0..4 {
            ch.send(&make_event(
                Severity::Warning,
                EventKind::FixityCheckOverdue,
            ))
            .expect("send");
        }
        let drained = ch.drain_events();
        assert_eq!(drained.len(), 4);
        assert!(ch.is_empty());
    }

    // --- FileLogChannel -----------------------------------------------------

    #[test]
    fn test_file_log_channel_appends_ndjson() {
        // Include the process id so concurrent test runs on the same machine
        // do not share (and delete) each other's log file.
        let log_path = std::env::temp_dir().join(format!(
            "oximedia_notif_test_{}.ndjson",
            std::process::id()
        ));
        let _ = std::fs::remove_file(&log_path);

        let ch = FileLogChannel::new("filelog", log_path.clone(), Severity::Info);
        let event = make_event(Severity::Error, EventKind::FileQuarantined);
        let sent = ch.send(&event);
        let content = std::fs::read_to_string(&log_path);

        // Clean up before asserting so a failing assertion does not leak the file.
        let _ = std::fs::remove_file(&log_path);

        assert!(sent.expect("send"));
        let content = content.expect("read log");
        assert!(!content.is_empty());
        assert!(content.ends_with('\n'));
        assert_eq!(content.lines().count(), 1);
        let parsed: serde_json::Value =
            serde_json::from_str(content.lines().next().unwrap_or("{}")).expect("valid JSON line");
        assert!(parsed.get("event_id").is_some());
    }

    // --- WebhookChannel -----------------------------------------------------

    #[test]
    fn test_webhook_payload_serialization() {
        let config = WebhookConfig::new("hook", "https://example.com/notify");
        let ch = WebhookChannel::new(config);
        let event = make_event(Severity::Critical, EventKind::BatchOperationError);
        let payload = ch.build_payload(&event).expect("payload");
        let decoded: serde_json::Value = serde_json::from_str(&payload).expect("valid json");
        assert_eq!(decoded["severity"].as_str(), Some("Critical"));
    }

    #[test]
    fn test_webhook_filters_low_severity() {
        let mut config = WebhookConfig::new("hook", "https://example.com/notify");
        config.min_severity = Severity::Critical;
        let ch = WebhookChannel::new(config);
        let event = make_event(Severity::Info, EventKind::FixityCheckPassed);
        let sent = ch.send(&event).expect("send ok");
        assert!(!sent);
        assert_eq!(ch.attempt_count(), 0);
    }

    // --- NotificationDispatcher ---------------------------------------------

    #[test]
    fn test_dispatcher_delivers_to_all_channels() {
        let mut dispatcher = NotificationDispatcher::new();
        let ch1 = attach_shared(
            &mut dispatcher,
            InMemoryChannel::new("ch1", 10, Severity::Info),
        );
        let ch2 = attach_shared(
            &mut dispatcher,
            InMemoryChannel::new("ch2", 10, Severity::Info),
        );

        let event = make_event(Severity::Warning, EventKind::FixityCheckFailed);
        let results = dispatcher.dispatch(&event);
        assert_eq!(results.len(), 2);
        assert_eq!(ch1.len(), 1);
        assert_eq!(ch2.len(), 1);
    }

    #[test]
    fn test_dispatcher_delivery_report() {
        let mut dispatcher = NotificationDispatcher::new();
        attach_shared(
            &mut dispatcher,
            InMemoryChannel::new("all", 10, Severity::Info),
        );
        attach_shared(
            &mut dispatcher,
            InMemoryChannel::new("crit_only", 10, Severity::Critical),
        );

        let event = make_event(Severity::Warning, EventKind::FixityCheckOverdue);
        let report = dispatcher.dispatch_with_report(&event);

        assert_eq!(report.delivered, 1);
        assert_eq!(report.filtered, 1);
        assert_eq!(report.failed, 0);
        assert_eq!(report.details.len(), 2);
    }

    #[test]
    fn test_dispatcher_notify_fixity_failure_convenience() {
        let mut dispatcher = NotificationDispatcher::new();
        let ch = attach_shared(
            &mut dispatcher,
            InMemoryChannel::new("mem", 10, Severity::Info),
        );

        dispatcher.notify_fixity_failure(PathBuf::from("/media/film.mkv"), "abc123", "def456");
        assert_eq!(ch.len(), 1);
        let ev = ch.last_event().expect("event");
        assert_eq!(ev.kind, EventKind::FixityCheckFailed);
        assert_eq!(ev.severity, Severity::Error);
    }
}