use async_trait::async_trait;
use chrono::{DateTime, Utc};
use paladin_ports::output::notification_port::{
BasicNotificationPort, DeliveryCapabilities, Notification, NotificationChannel,
NotificationDeliveryPort, NotificationDeliveryResult, NotificationPortError,
NotificationPortResult, NotificationRecipient, NotificationStatus,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::Instant;
/// Tunable limits for [`SystemNotificationAdapter`].
#[doc(hidden)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemAdapterConfig {
/// Upper bound on retained notifications; `cleanup_old_notifications` trims the
/// in-memory store down to this many entries.
pub max_stored_notifications: usize,
/// Cleanup interval in seconds (per the field name). Nothing in this file reads
/// it — presumably consumed by an external scheduler; TODO confirm.
pub cleanup_interval_seconds: u64,
}
impl Default for SystemAdapterConfig {
fn default() -> Self {
Self {
max_stored_notifications: 1000,
cleanup_interval_seconds: 3600, }
}
}
/// Delivery adapter for the `System` channel that "delivers" a notification by
/// storing it in an in-process list.
#[doc(hidden)]
#[derive(Debug)]
pub struct SystemNotificationAdapter {
// Limits applied by `cleanup_old_notifications`.
config: SystemAdapterConfig,
// Every notification accepted by `deliver_notification`, behind a shared read/write lock.
notifications: Arc<RwLock<Vec<Notification>>>,
// Running delivery counters, guarded by their own lock (separate from the store).
delivery_stats: Arc<RwLock<SystemDeliveryStats>>,
}
/// Counters updated by each successful `deliver_notification` call.
#[derive(Debug, Default)]
struct SystemDeliveryStats {
/// Number of deliveries recorded so far; incremented by one per delivery.
total_delivered: u64,
/// UTC wall-clock time of the most recent delivery; `None` until the first one.
last_delivery: Option<DateTime<Utc>>,
}
impl SystemNotificationAdapter {
pub fn new(config: SystemAdapterConfig) -> Self {
Self {
config,
notifications: Arc::new(RwLock::new(Vec::new())),
delivery_stats: Arc::new(RwLock::new(SystemDeliveryStats::default())),
}
}
pub fn get_notifications_for_recipient(
&self,
recipient: &NotificationRecipient,
) -> NotificationPortResult<Vec<Notification>> {
let notifications = self.notifications.read().map_err(|_| {
NotificationPortError::ServiceUnavailable(
"Notification storage unavailable".to_string(),
)
})?;
let filtered: Vec<Notification> = notifications
.iter()
.filter(|n| &n.recipient == recipient)
.cloned()
.collect();
Ok(filtered)
}
pub fn cleanup_old_notifications(&self) -> NotificationPortResult<usize> {
let mut notifications = self.notifications.write().map_err(|_| {
NotificationPortError::ServiceUnavailable(
"Notification storage unavailable".to_string(),
)
})?;
let before_count = notifications.len();
if notifications.len() > self.config.max_stored_notifications {
notifications.sort_by_key(|b| std::cmp::Reverse(b.created_at));
notifications.truncate(self.config.max_stored_notifications);
}
let after_count = notifications.len();
Ok(before_count - after_count)
}
}
#[async_trait]
impl NotificationDeliveryPort for SystemNotificationAdapter {
    /// This adapter serves the `System` channel.
    fn channel(&self) -> NotificationChannel {
        NotificationChannel::System
    }

    /// Accepts only notifications whose channel is `System`.
    fn can_handle(&self, notification: &Notification) -> bool {
        notification.channel == NotificationChannel::System
    }

    /// "Delivers" a notification by marking it `Delivered` and appending it to
    /// the in-memory store, then records the delivery in the statistics.
    ///
    /// The store is not trimmed here; `cleanup_old_notifications` has to be
    /// called separately to enforce `max_stored_notifications`.
    ///
    /// The returned result carries the notification id (also as `external_id`),
    /// the elapsed processing time in milliseconds, and metadata with the keys
    /// `delivery_method` (`"in_memory"`) and `stored_at` (RFC 3339 timestamp).
    ///
    /// # Errors
    /// * [`NotificationPortError::ValidationError`] if the notification is not
    ///   for the `System` channel.
    /// * [`NotificationPortError::ServiceUnavailable`] if the store lock or the
    ///   statistics lock is poisoned. In the latter case the notification has
    ///   already been stored.
    async fn deliver_notification(
        &self,
        mut notification: Notification,
    ) -> NotificationPortResult<NotificationDeliveryResult> {
        let start_time = Instant::now();

        if !self.can_handle(&notification) {
            return Err(NotificationPortError::ValidationError(
                "System adapter cannot handle this notification".to_string(),
            ));
        }

        notification.status = NotificationStatus::Delivered;
        // Copy the id out first so the notification itself can be moved into the
        // store instead of being cloned.
        let notification_id = notification.id;

        {
            let mut notifications = self.notifications.write().map_err(|_| {
                NotificationPortError::ServiceUnavailable(
                    "Notification storage unavailable".to_string(),
                )
            })?;
            notifications.push(notification);
        }

        // One timestamp for the whole delivery, so the statistics, the metadata
        // and the result all report the same instant.
        let delivered_at = Utc::now();

        {
            let mut stats = self.delivery_stats.write().map_err(|_| {
                NotificationPortError::ServiceUnavailable("Stats unavailable".to_string())
            })?;
            stats.total_delivered += 1;
            stats.last_delivery = Some(delivered_at);
        }

        // `as_millis` yields u128; an in-memory push cannot take long enough for
        // the narrowing cast to lose anything.
        let processing_time = start_time.elapsed().as_millis() as u64;

        let mut metadata = HashMap::new();
        metadata.insert(
            "delivery_method".to_string(),
            serde_json::Value::String("in_memory".to_string()),
        );
        metadata.insert(
            "stored_at".to_string(),
            serde_json::Value::String(delivered_at.to_rfc3339()),
        );

        Ok(NotificationDeliveryResult {
            notification_id,
            status: NotificationStatus::Delivered,
            external_id: Some(notification_id.to_string()),
            processing_time_ms: processing_time,
            error_message: None,
            delivered_at,
            channel: NotificationChannel::System,
            metadata,
        })
    }

    /// Healthy when both internal locks can still be read, i.e. neither has been
    /// poisoned.
    async fn health_check(&self) -> bool {
        self.notifications.read().is_ok() && self.delivery_stats.read().is_ok()
    }

    /// Static capability flags: bulk and rich content are supported; receipts,
    /// attachments and templates are not; no attachment size limit or rate limit
    /// is declared.
    fn capabilities(&self) -> DeliveryCapabilities {
        DeliveryCapabilities {
            supports_bulk: true,
            supports_receipts: false,
            supports_attachments: false,
            supports_rich_content: true,
            supports_templates: false,
            max_attachment_size: None,
            rate_limit: None,
        }
    }
}
// Empty implementation: no `BasicNotificationPort` items are overridden here, so
// the adapter uses whatever the trait provides by default.
#[async_trait]
impl BasicNotificationPort for SystemNotificationAdapter {}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::platform::container::notification::{
        NotificationContent, NotificationPriority,
    };

    /// Builds a high-priority `System`-channel notification addressed to the
    /// system component `"user123"`.
    fn create_test_system_notification() -> Notification {
        let content = NotificationContent::new(
            "System Alert".to_string(),
            "This is a test system notification".to_string(),
            "system".to_string(),
        );
        Notification::new(
            NotificationRecipient::SystemComponent("user123".to_string()),
            content,
            NotificationChannel::System,
            NotificationPriority::High,
        )
        .unwrap()
    }

    #[test]
    fn test_system_adapter_creation() {
        let adapter = SystemNotificationAdapter::new(SystemAdapterConfig::default());
        assert_eq!(adapter.channel(), NotificationChannel::System);
    }

    #[test]
    fn test_can_handle_notification() {
        let adapter = SystemNotificationAdapter::new(SystemAdapterConfig::default());
        let notification = create_test_system_notification();
        assert!(adapter.can_handle(&notification));
    }

    #[tokio::test]
    async fn test_deliver_system_notification() {
        let adapter = SystemNotificationAdapter::new(SystemAdapterConfig::default());
        let notification = create_test_system_notification();
        let recipient = notification.recipient.clone();

        let delivery_result = adapter
            .deliver_notification(notification)
            .await
            .expect("delivery of a System notification should succeed");
        assert_eq!(delivery_result.status, NotificationStatus::Delivered);
        assert_eq!(delivery_result.channel, NotificationChannel::System);

        // The delivered notification must be retrievable and marked as delivered.
        let stored_notifications = adapter.get_notifications_for_recipient(&recipient).unwrap();
        assert_eq!(stored_notifications.len(), 1);
        assert_eq!(
            stored_notifications[0].status,
            NotificationStatus::Delivered
        );
    }

    #[tokio::test]
    async fn test_health_check() {
        let adapter = SystemNotificationAdapter::new(SystemAdapterConfig::default());
        assert!(adapter.health_check().await);
    }

    #[test]
    fn test_capabilities() {
        let adapter = SystemNotificationAdapter::new(SystemAdapterConfig::default());
        let capabilities = adapter.capabilities();
        assert!(capabilities.supports_bulk);
        assert!(capabilities.supports_rich_content);
        assert!(!capabilities.supports_attachments);
        assert!(!capabilities.supports_receipts);
        assert!(capabilities.rate_limit.is_none());
    }

    #[test]
    fn test_cleanup_old_notifications() {
        let config = SystemAdapterConfig {
            max_stored_notifications: 2,
            cleanup_interval_seconds: 3600,
        };
        let adapter = SystemNotificationAdapter::new(config);

        // Seed the store directly with five notifications, three over the limit.
        {
            let mut notifications = adapter.notifications.write().unwrap();
            for _ in 0..5 {
                let mut notification = create_test_system_notification();
                notification.id = uuid::Uuid::new_v4();
                notifications.push(notification);
            }
        }

        let cleaned_count = adapter.cleanup_old_notifications().unwrap();
        assert_eq!(cleaned_count, 3);

        let remaining = adapter.notifications.read().unwrap();
        assert_eq!(remaining.len(), 2);
    }
}