use crate::config::sinks::SinkType;
use crate::events::sinks::Sink;
use crate::events::sinks::preferences::NotificationPreferencesStore;
use anyhow::{Result, anyhow};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{RwLock, broadcast};
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredNotification {
pub id: Uuid,
pub recipient_id: String,
pub notification_type: String,
pub title: String,
pub body: String,
#[serde(default)]
pub data: Value,
pub read: bool,
pub created_at: DateTime<Utc>,
}
const MAX_PER_USER: usize = 1000;
const BROADCAST_CAPACITY: usize = 256;
#[derive(Debug)]
pub struct NotificationStore {
notifications: RwLock<HashMap<String, Vec<StoredNotification>>>,
broadcast: broadcast::Sender<StoredNotification>,
}
impl NotificationStore {
pub fn new() -> Self {
let (broadcast, _) = broadcast::channel(BROADCAST_CAPACITY);
Self {
notifications: RwLock::new(HashMap::new()),
broadcast,
}
}
pub fn subscribe(&self) -> broadcast::Receiver<StoredNotification> {
self.broadcast.subscribe()
}
pub async fn insert(&self, notification: StoredNotification) {
let _ = self.broadcast.send(notification.clone());
let mut store = self.notifications.write().await;
let user_notifs = store.entry(notification.recipient_id.clone()).or_default();
user_notifs.push(notification);
if user_notifs.len() > MAX_PER_USER {
let excess = user_notifs.len() - MAX_PER_USER;
user_notifs.drain(0..excess);
}
}
pub async fn list_by_user(
&self,
recipient_id: &str,
limit: usize,
offset: usize,
) -> Vec<StoredNotification> {
let store = self.notifications.read().await;
let Some(user_notifications) = store.get(recipient_id) else {
return Vec::new();
};
user_notifications
.iter()
.rev()
.skip(offset)
.take(limit)
.cloned()
.collect()
}
pub async fn mark_as_read(
&self,
notification_ids: &[Uuid],
recipient_id: Option<&str>,
) -> usize {
let mut store = self.notifications.write().await;
let mut count = 0;
let values: Box<dyn Iterator<Item = &mut Vec<StoredNotification>>> =
if let Some(rid) = recipient_id {
Box::new(store.get_mut(rid).into_iter())
} else {
Box::new(store.values_mut())
};
for notifications in values {
for notif in notifications.iter_mut() {
if notification_ids.contains(¬if.id) && !notif.read {
notif.read = true;
count += 1;
}
}
}
count
}
pub async fn mark_all_as_read(&self, recipient_id: &str) -> usize {
let mut store = self.notifications.write().await;
let Some(notifications) = store.get_mut(recipient_id) else {
return 0;
};
let mut count = 0;
for notif in notifications.iter_mut() {
if !notif.read {
notif.read = true;
count += 1;
}
}
count
}
pub async fn unread_count(&self, recipient_id: &str) -> usize {
let store = self.notifications.read().await;
store
.get(recipient_id)
.map(|notifs| notifs.iter().filter(|n| !n.read).count())
.unwrap_or(0)
}
pub async fn total_count(&self, recipient_id: &str) -> usize {
let store = self.notifications.read().await;
store.get(recipient_id).map(|n| n.len()).unwrap_or(0)
}
pub async fn delete(&self, notification_id: &Uuid) -> bool {
let mut store = self.notifications.write().await;
for notifications in store.values_mut() {
if let Some(pos) = notifications.iter().position(|n| n.id == *notification_id) {
notifications.remove(pos);
return true;
}
}
false
}
}
impl Default for NotificationStore {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct InAppNotificationSink {
store: Arc<NotificationStore>,
preferences: Option<Arc<NotificationPreferencesStore>>,
}
impl InAppNotificationSink {
pub fn new(store: Arc<NotificationStore>) -> Self {
Self {
store,
preferences: None,
}
}
pub fn with_preferences(
store: Arc<NotificationStore>,
preferences: Arc<NotificationPreferencesStore>,
) -> Self {
Self {
store,
preferences: Some(preferences),
}
}
pub fn store(&self) -> &Arc<NotificationStore> {
&self.store
}
}
#[async_trait]
impl Sink for InAppNotificationSink {
async fn deliver(
&self,
payload: Value,
recipient_id: Option<&str>,
context_vars: &HashMap<String, Value>,
) -> Result<()> {
let recipient =
super::resolve_recipient(recipient_id, &payload, context_vars).ok_or_else(|| {
anyhow!(
"in_app sink: recipient_id not found. \
Provide it as a parameter, in the payload, or as a context variable."
)
})?;
let title = payload
.get("title")
.and_then(|v| v.as_str())
.unwrap_or("Notification")
.to_string();
let body = payload
.get("body")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let notification_type = payload
.get("notification_type")
.and_then(|v| v.as_str())
.unwrap_or("generic")
.to_string();
let data = payload.get("data").cloned().unwrap_or(Value::Null);
if let Some(prefs_store) = &self.preferences
&& !prefs_store.is_enabled(&recipient, ¬ification_type).await
{
tracing::debug!(
recipient = %recipient,
notification_type = %notification_type,
"in_app sink: notification type disabled by user preferences, skipping"
);
return Ok(());
}
let notification = StoredNotification {
id: Uuid::new_v4(),
recipient_id: recipient,
notification_type,
title,
body,
data,
read: false,
created_at: Utc::now(),
};
self.store.insert(notification).await;
Ok(())
}
fn name(&self) -> &str {
"in_app"
}
fn sink_type(&self) -> SinkType {
SinkType::InApp
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[tokio::test]
async fn test_store_insert_and_list() {
let store = NotificationStore::new();
for i in 0..5 {
store
.insert(StoredNotification {
id: Uuid::new_v4(),
recipient_id: "user-A".to_string(),
notification_type: "new_follower".to_string(),
title: format!("Follower {}", i),
body: format!("User {} followed you", i),
data: Value::Null,
read: false,
created_at: Utc::now() + chrono::Duration::seconds(i as i64),
})
.await;
}
let page = store.list_by_user("user-A", 3, 0).await;
assert_eq!(page.len(), 3);
assert_eq!(page[0].title, "Follower 4");
assert_eq!(page[1].title, "Follower 3");
assert_eq!(page[2].title, "Follower 2");
}
#[tokio::test]
async fn test_store_pagination() {
let store = NotificationStore::new();
for i in 0..5 {
store
.insert(StoredNotification {
id: Uuid::new_v4(),
recipient_id: "user-A".to_string(),
notification_type: "test".to_string(),
title: format!("Notif {}", i),
body: String::new(),
data: Value::Null,
read: false,
created_at: Utc::now() + chrono::Duration::seconds(i as i64),
})
.await;
}
let page2 = store.list_by_user("user-A", 3, 3).await;
assert_eq!(page2.len(), 2);
}
#[tokio::test]
async fn test_store_mark_as_read() {
let store = NotificationStore::new();
let id1 = Uuid::new_v4();
let id2 = Uuid::new_v4();
let id3 = Uuid::new_v4();
for (id, i) in [(id1, 0), (id2, 1), (id3, 2)] {
store
.insert(StoredNotification {
id,
recipient_id: "user-A".to_string(),
notification_type: "test".to_string(),
title: format!("Notif {}", i),
body: String::new(),
data: Value::Null,
read: false,
created_at: Utc::now(),
})
.await;
}
assert_eq!(store.unread_count("user-A").await, 3);
let marked = store.mark_as_read(&[id1, id2], Some("user-A")).await;
assert_eq!(marked, 2);
assert_eq!(store.unread_count("user-A").await, 1);
}
#[tokio::test]
async fn test_store_mark_all_as_read() {
let store = NotificationStore::new();
for i in 0..5 {
store
.insert(StoredNotification {
id: Uuid::new_v4(),
recipient_id: "user-A".to_string(),
notification_type: "test".to_string(),
title: format!("Notif {}", i),
body: String::new(),
data: Value::Null,
read: false,
created_at: Utc::now(),
})
.await;
}
assert_eq!(store.unread_count("user-A").await, 5);
let marked = store.mark_all_as_read("user-A").await;
assert_eq!(marked, 5);
assert_eq!(store.unread_count("user-A").await, 0);
}
#[tokio::test]
async fn test_store_separate_users() {
let store = NotificationStore::new();
store
.insert(StoredNotification {
id: Uuid::new_v4(),
recipient_id: "user-A".to_string(),
notification_type: "test".to_string(),
title: "For A".to_string(),
body: String::new(),
data: Value::Null,
read: false,
created_at: Utc::now(),
})
.await;
store
.insert(StoredNotification {
id: Uuid::new_v4(),
recipient_id: "user-B".to_string(),
notification_type: "test".to_string(),
title: "For B".to_string(),
body: String::new(),
data: Value::Null,
read: false,
created_at: Utc::now(),
})
.await;
assert_eq!(store.unread_count("user-A").await, 1);
assert_eq!(store.unread_count("user-B").await, 1);
assert_eq!(store.total_count("user-A").await, 1);
}
#[tokio::test]
async fn test_store_delete() {
let store = NotificationStore::new();
let id = Uuid::new_v4();
store
.insert(StoredNotification {
id,
recipient_id: "user-A".to_string(),
notification_type: "test".to_string(),
title: "Will be deleted".to_string(),
body: String::new(),
data: Value::Null,
read: false,
created_at: Utc::now(),
})
.await;
assert_eq!(store.total_count("user-A").await, 1);
assert!(store.delete(&id).await);
assert_eq!(store.total_count("user-A").await, 0);
assert!(!store.delete(&id).await); }
#[tokio::test]
async fn test_store_empty_user() {
let store = NotificationStore::new();
assert_eq!(store.unread_count("nobody").await, 0);
assert_eq!(store.list_by_user("nobody", 10, 0).await.len(), 0);
}
#[tokio::test]
async fn test_sink_deliver_from_payload() {
let store = Arc::new(NotificationStore::new());
let sink = InAppNotificationSink::new(store.clone());
let payload = json!({
"title": "New follower",
"body": "Alice followed you",
"notification_type": "new_follower",
"recipient_id": "user-A",
"data": {"follower_name": "Alice"}
});
sink.deliver(payload, None, &HashMap::new()).await.unwrap();
let notifs = store.list_by_user("user-A", 10, 0).await;
assert_eq!(notifs.len(), 1);
assert_eq!(notifs[0].title, "New follower");
assert_eq!(notifs[0].body, "Alice followed you");
assert_eq!(notifs[0].notification_type, "new_follower");
assert!(!notifs[0].read);
assert_eq!(notifs[0].data, json!({"follower_name": "Alice"}));
}
#[tokio::test]
async fn test_sink_deliver_explicit_recipient() {
let store = Arc::new(NotificationStore::new());
let sink = InAppNotificationSink::new(store.clone());
let payload = json!({
"title": "Hello",
"body": "World",
"notification_type": "test"
});
sink.deliver(payload, Some("user-B"), &HashMap::new())
.await
.unwrap();
assert_eq!(store.unread_count("user-B").await, 1);
}
#[tokio::test]
async fn test_sink_deliver_recipient_from_context() {
let store = Arc::new(NotificationStore::new());
let sink = InAppNotificationSink::new(store.clone());
let payload = json!({
"title": "Hello",
"notification_type": "test"
});
let mut vars = HashMap::new();
vars.insert(
"recipient_id".to_string(),
Value::String("user-C".to_string()),
);
sink.deliver(payload, None, &vars).await.unwrap();
assert_eq!(store.unread_count("user-C").await, 1);
}
#[tokio::test]
async fn test_sink_deliver_no_recipient_error() {
let store = Arc::new(NotificationStore::new());
let sink = InAppNotificationSink::new(store);
let payload = json!({
"title": "Hello",
"notification_type": "test"
});
let result = sink.deliver(payload, None, &HashMap::new()).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("recipient_id"));
}
#[tokio::test]
async fn test_sink_deliver_defaults() {
let store = Arc::new(NotificationStore::new());
let sink = InAppNotificationSink::new(store.clone());
let payload = json!({
"recipient_id": "user-A"
});
sink.deliver(payload, None, &HashMap::new()).await.unwrap();
let notifs = store.list_by_user("user-A", 10, 0).await;
assert_eq!(notifs[0].title, "Notification");
assert_eq!(notifs[0].body, "");
assert_eq!(notifs[0].notification_type, "generic");
}
#[tokio::test]
async fn test_sink_name_and_type() {
let sink = InAppNotificationSink::new(Arc::new(NotificationStore::new()));
assert_eq!(sink.name(), "in_app");
assert_eq!(sink.sink_type(), SinkType::InApp);
}
#[tokio::test]
async fn test_sink_with_preferences_disabled_type_skipped() {
let store = Arc::new(NotificationStore::new());
let prefs = Arc::new(NotificationPreferencesStore::new());
prefs.disable_type("user-A", "new_like").await;
let sink = InAppNotificationSink::with_preferences(store.clone(), prefs);
let payload = json!({
"title": "New like",
"notification_type": "new_like",
"recipient_id": "user-A"
});
sink.deliver(payload, None, &HashMap::new()).await.unwrap();
assert_eq!(store.unread_count("user-A").await, 0);
let payload = json!({
"title": "New follower",
"notification_type": "new_follower",
"recipient_id": "user-A"
});
sink.deliver(payload, None, &HashMap::new()).await.unwrap();
assert_eq!(store.unread_count("user-A").await, 1);
}
#[tokio::test]
async fn test_sink_with_preferences_muted_user() {
let store = Arc::new(NotificationStore::new());
let prefs = Arc::new(NotificationPreferencesStore::new());
prefs.mute("user-A").await;
let sink = InAppNotificationSink::with_preferences(store.clone(), prefs);
for notif_type in &["new_follower", "new_like", "new_comment"] {
let payload = json!({
"title": "Test",
"notification_type": notif_type,
"recipient_id": "user-A"
});
sink.deliver(payload, None, &HashMap::new()).await.unwrap();
}
assert_eq!(store.unread_count("user-A").await, 0);
}
#[tokio::test]
async fn test_sink_without_preferences_delivers_all() {
let store = Arc::new(NotificationStore::new());
let sink = InAppNotificationSink::new(store.clone());
for notif_type in &["new_follower", "new_like", "new_comment"] {
let payload = json!({
"title": "Test",
"notification_type": notif_type,
"recipient_id": "user-A"
});
sink.deliver(payload, None, &HashMap::new()).await.unwrap();
}
assert_eq!(store.unread_count("user-A").await, 3);
}
#[tokio::test]
async fn test_store_evicts_oldest_beyond_max() {
let store = NotificationStore::new();
let total = MAX_PER_USER + 50;
for i in 0..total {
store
.insert(StoredNotification {
id: Uuid::new_v4(),
recipient_id: "user-A".to_string(),
notification_type: "test".to_string(),
title: format!("Notif {}", i),
body: String::new(),
data: Value::Null,
read: false,
created_at: Utc::now() + chrono::Duration::seconds(i as i64),
})
.await;
}
assert_eq!(store.total_count("user-A").await, MAX_PER_USER);
let latest = store.list_by_user("user-A", 1, 0).await;
assert_eq!(latest[0].title, format!("Notif {}", total - 1));
let oldest = store.list_by_user("user-A", 1, MAX_PER_USER - 1).await;
assert_eq!(oldest[0].title, "Notif 50");
}
#[tokio::test]
async fn test_mark_as_read_scoped_to_recipient() {
let store = NotificationStore::new();
let id_a = Uuid::new_v4();
let id_b = Uuid::new_v4();
store
.insert(StoredNotification {
id: id_a,
recipient_id: "user-A".to_string(),
notification_type: "test".to_string(),
title: "For A".to_string(),
body: String::new(),
data: Value::Null,
read: false,
created_at: Utc::now(),
})
.await;
store
.insert(StoredNotification {
id: id_b,
recipient_id: "user-B".to_string(),
notification_type: "test".to_string(),
title: "For B".to_string(),
body: String::new(),
data: Value::Null,
read: false,
created_at: Utc::now(),
})
.await;
let marked = store.mark_as_read(&[id_b], Some("user-A")).await;
assert_eq!(marked, 0);
assert_eq!(store.unread_count("user-B").await, 1);
let marked = store.mark_as_read(&[id_a], Some("user-A")).await;
assert_eq!(marked, 1);
assert_eq!(store.unread_count("user-A").await, 0);
}
#[tokio::test]
async fn test_mark_as_read_global_fallback() {
let store = NotificationStore::new();
let id = Uuid::new_v4();
store
.insert(StoredNotification {
id,
recipient_id: "user-A".to_string(),
notification_type: "test".to_string(),
title: "Test".to_string(),
body: String::new(),
data: Value::Null,
read: false,
created_at: Utc::now(),
})
.await;
let marked = store.mark_as_read(&[id], None).await;
assert_eq!(marked, 1);
assert_eq!(store.unread_count("user-A").await, 0);
}
#[tokio::test]
async fn test_notification_broadcast_on_insert() {
let store = NotificationStore::new();
let mut rx = store.subscribe();
let notif_id = Uuid::new_v4();
store
.insert(StoredNotification {
id: notif_id,
recipient_id: "user-A".to_string(),
notification_type: "new_follower".to_string(),
title: "New follower".to_string(),
body: "Alice followed you".to_string(),
data: json!({"follower_name": "Alice"}),
read: false,
created_at: Utc::now(),
})
.await;
let received = rx.recv().await.expect("should receive broadcast");
assert_eq!(received.id, notif_id);
assert_eq!(received.recipient_id, "user-A");
assert_eq!(received.notification_type, "new_follower");
assert_eq!(received.title, "New follower");
assert_eq!(store.total_count("user-A").await, 1);
}
#[tokio::test]
async fn test_broadcast_without_subscriber() {
let store = NotificationStore::new();
store
.insert(StoredNotification {
id: Uuid::new_v4(),
recipient_id: "user-A".to_string(),
notification_type: "test".to_string(),
title: "No one listening".to_string(),
body: String::new(),
data: Value::Null,
read: false,
created_at: Utc::now(),
})
.await;
assert_eq!(store.total_count("user-A").await, 1);
}
#[tokio::test]
async fn test_broadcast_multiple_subscribers() {
let store = NotificationStore::new();
let mut rx1 = store.subscribe();
let mut rx2 = store.subscribe();
let notif_id = Uuid::new_v4();
store
.insert(StoredNotification {
id: notif_id,
recipient_id: "user-A".to_string(),
notification_type: "test".to_string(),
title: "For everyone".to_string(),
body: String::new(),
data: Value::Null,
read: false,
created_at: Utc::now(),
})
.await;
let r1 = rx1.recv().await.expect("rx1 should receive");
let r2 = rx2.recv().await.expect("rx2 should receive");
assert_eq!(r1.id, notif_id);
assert_eq!(r2.id, notif_id);
}
}