1use crate::config::sinks::SinkType;
28use crate::events::sinks::Sink;
29use crate::events::sinks::preferences::NotificationPreferencesStore;
30use anyhow::{Result, anyhow};
31use async_trait::async_trait;
32use chrono::{DateTime, Utc};
33use serde::{Deserialize, Serialize};
34use serde_json::Value;
35use std::collections::HashMap;
36use std::sync::Arc;
37use tokio::sync::{RwLock, broadcast};
38use uuid::Uuid;
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct StoredNotification {
43 pub id: Uuid,
45
46 pub recipient_id: String,
48
49 pub notification_type: String,
51
52 pub title: String,
54
55 pub body: String,
57
58 #[serde(default)]
60 pub data: Value,
61
62 pub read: bool,
64
65 pub created_at: DateTime<Utc>,
67}
68
/// Upper bound on the number of notifications retained per recipient.
/// Once exceeded, `NotificationStore::insert` drops the oldest entries.
const MAX_PER_USER: usize = 1_000;

/// Capacity handed to `broadcast::channel` when a `NotificationStore` is created.
const BROADCAST_CAPACITY: usize = 256;
77
78#[derive(Debug)]
93pub struct NotificationStore {
94 notifications: RwLock<HashMap<String, Vec<StoredNotification>>>,
96
97 broadcast: broadcast::Sender<StoredNotification>,
100}
101
102impl NotificationStore {
103 pub fn new() -> Self {
105 let (broadcast, _) = broadcast::channel(BROADCAST_CAPACITY);
106 Self {
107 notifications: RwLock::new(HashMap::new()),
108 broadcast,
109 }
110 }
111
112 pub fn subscribe(&self) -> broadcast::Receiver<StoredNotification> {
120 self.broadcast.subscribe()
121 }
122
123 pub async fn insert(&self, notification: StoredNotification) {
131 let _ = self.broadcast.send(notification.clone());
133
134 let mut store = self.notifications.write().await;
135 let user_notifs = store.entry(notification.recipient_id.clone()).or_default();
136 user_notifs.push(notification);
137
138 if user_notifs.len() > MAX_PER_USER {
140 let excess = user_notifs.len() - MAX_PER_USER;
141 user_notifs.drain(0..excess);
142 }
143 }
144
145 pub async fn list_by_user(
150 &self,
151 recipient_id: &str,
152 limit: usize,
153 offset: usize,
154 ) -> Vec<StoredNotification> {
155 let store = self.notifications.read().await;
156 let Some(user_notifications) = store.get(recipient_id) else {
157 return Vec::new();
158 };
159
160 user_notifications
162 .iter()
163 .rev()
164 .skip(offset)
165 .take(limit)
166 .cloned()
167 .collect()
168 }
169
170 pub async fn mark_as_read(
177 &self,
178 notification_ids: &[Uuid],
179 recipient_id: Option<&str>,
180 ) -> usize {
181 let mut store = self.notifications.write().await;
182 let mut count = 0;
183
184 let values: Box<dyn Iterator<Item = &mut Vec<StoredNotification>>> =
185 if let Some(rid) = recipient_id {
186 Box::new(store.get_mut(rid).into_iter())
188 } else {
189 Box::new(store.values_mut())
191 };
192
193 for notifications in values {
194 for notif in notifications.iter_mut() {
195 if notification_ids.contains(¬if.id) && !notif.read {
196 notif.read = true;
197 count += 1;
198 }
199 }
200 }
201
202 count
203 }
204
205 pub async fn mark_all_as_read(&self, recipient_id: &str) -> usize {
207 let mut store = self.notifications.write().await;
208 let Some(notifications) = store.get_mut(recipient_id) else {
209 return 0;
210 };
211
212 let mut count = 0;
213 for notif in notifications.iter_mut() {
214 if !notif.read {
215 notif.read = true;
216 count += 1;
217 }
218 }
219 count
220 }
221
222 pub async fn unread_count(&self, recipient_id: &str) -> usize {
224 let store = self.notifications.read().await;
225 store
226 .get(recipient_id)
227 .map(|notifs| notifs.iter().filter(|n| !n.read).count())
228 .unwrap_or(0)
229 }
230
231 pub async fn total_count(&self, recipient_id: &str) -> usize {
233 let store = self.notifications.read().await;
234 store.get(recipient_id).map(|n| n.len()).unwrap_or(0)
235 }
236
237 pub async fn delete(&self, notification_id: &Uuid) -> bool {
239 let mut store = self.notifications.write().await;
240 for notifications in store.values_mut() {
241 if let Some(pos) = notifications.iter().position(|n| n.id == *notification_id) {
242 notifications.remove(pos);
243 return true;
244 }
245 }
246 false
247 }
248}
249
250impl Default for NotificationStore {
251 fn default() -> Self {
252 Self::new()
253 }
254}
255
256#[derive(Debug)]
263pub struct InAppNotificationSink {
264 store: Arc<NotificationStore>,
266
267 preferences: Option<Arc<NotificationPreferencesStore>>,
269}
270
271impl InAppNotificationSink {
272 pub fn new(store: Arc<NotificationStore>) -> Self {
274 Self {
275 store,
276 preferences: None,
277 }
278 }
279
280 pub fn with_preferences(
282 store: Arc<NotificationStore>,
283 preferences: Arc<NotificationPreferencesStore>,
284 ) -> Self {
285 Self {
286 store,
287 preferences: Some(preferences),
288 }
289 }
290
291 pub fn store(&self) -> &Arc<NotificationStore> {
293 &self.store
294 }
295}
296
297#[async_trait]
298impl Sink for InAppNotificationSink {
299 async fn deliver(
300 &self,
301 payload: Value,
302 recipient_id: Option<&str>,
303 context_vars: &HashMap<String, Value>,
304 ) -> Result<()> {
305 let recipient =
307 super::resolve_recipient(recipient_id, &payload, context_vars).ok_or_else(|| {
308 anyhow!(
309 "in_app sink: recipient_id not found. \
310 Provide it as a parameter, in the payload, or as a context variable."
311 )
312 })?;
313
314 let title = payload
316 .get("title")
317 .and_then(|v| v.as_str())
318 .unwrap_or("Notification")
319 .to_string();
320
321 let body = payload
322 .get("body")
323 .and_then(|v| v.as_str())
324 .unwrap_or("")
325 .to_string();
326
327 let notification_type = payload
328 .get("notification_type")
329 .and_then(|v| v.as_str())
330 .unwrap_or("generic")
331 .to_string();
332
333 let data = payload.get("data").cloned().unwrap_or(Value::Null);
334
335 if let Some(prefs_store) = &self.preferences
337 && !prefs_store.is_enabled(&recipient, ¬ification_type).await
338 {
339 tracing::debug!(
340 recipient = %recipient,
341 notification_type = %notification_type,
342 "in_app sink: notification type disabled by user preferences, skipping"
343 );
344 return Ok(());
345 }
346
347 let notification = StoredNotification {
349 id: Uuid::new_v4(),
350 recipient_id: recipient,
351 notification_type,
352 title,
353 body,
354 data,
355 read: false,
356 created_at: Utc::now(),
357 };
358
359 self.store.insert(notification).await;
360 Ok(())
361 }
362
363 fn name(&self) -> &str {
364 "in_app"
365 }
366
367 fn sink_type(&self) -> SinkType {
368 SinkType::InApp
369 }
370}
371
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds an unread notification with an empty body, null data and the
    /// current time as its creation timestamp.
    fn make_notification(
        id: Uuid,
        recipient: &str,
        kind: &str,
        title: &str,
    ) -> StoredNotification {
        StoredNotification {
            id,
            recipient_id: recipient.to_string(),
            notification_type: kind.to_string(),
            title: title.to_string(),
            body: String::new(),
            data: Value::Null,
            read: false,
            created_at: Utc::now(),
        }
    }

    /// Same as `make_notification`, with a freshly generated id.
    fn fresh_notification(recipient: &str, kind: &str, title: &str) -> StoredNotification {
        make_notification(Uuid::new_v4(), recipient, kind, title)
    }

    /// The current time shifted forward by `secs` seconds.
    fn now_plus(secs: i64) -> DateTime<Utc> {
        Utc::now() + chrono::Duration::seconds(secs)
    }

    // ---- store -----------------------------------------------------------

    #[tokio::test]
    async fn test_store_insert_and_list() {
        let store = NotificationStore::new();

        for i in 0..5_i64 {
            let notification = StoredNotification {
                body: format!("User {} followed you", i),
                created_at: now_plus(i),
                ..fresh_notification("user-A", "new_follower", &format!("Follower {}", i))
            };
            store.insert(notification).await;
        }

        // Newest first, limited to three entries.
        let page = store.list_by_user("user-A", 3, 0).await;
        let titles: Vec<&str> = page.iter().map(|n| n.title.as_str()).collect();
        assert_eq!(titles, ["Follower 4", "Follower 3", "Follower 2"]);
    }

    #[tokio::test]
    async fn test_store_pagination() {
        let store = NotificationStore::new();

        for i in 0..5_i64 {
            let notification = StoredNotification {
                created_at: now_plus(i),
                ..fresh_notification("user-A", "test", &format!("Notif {}", i))
            };
            store.insert(notification).await;
        }

        // Second page of size three holds the remaining two entries.
        let page2 = store.list_by_user("user-A", 3, 3).await;
        assert_eq!(page2.len(), 2);
    }

    #[tokio::test]
    async fn test_store_mark_as_read() {
        let store = NotificationStore::new();
        let ids = [Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4()];

        for (i, id) in ids.iter().enumerate() {
            store
                .insert(make_notification(
                    *id,
                    "user-A",
                    "test",
                    &format!("Notif {}", i),
                ))
                .await;
        }

        assert_eq!(store.unread_count("user-A").await, 3);

        let marked = store.mark_as_read(&ids[..2], Some("user-A")).await;
        assert_eq!(marked, 2);
        assert_eq!(store.unread_count("user-A").await, 1);
    }

    #[tokio::test]
    async fn test_store_mark_all_as_read() {
        let store = NotificationStore::new();

        for i in 0..5 {
            store
                .insert(fresh_notification(
                    "user-A",
                    "test",
                    &format!("Notif {}", i),
                ))
                .await;
        }

        assert_eq!(store.unread_count("user-A").await, 5);

        let marked = store.mark_all_as_read("user-A").await;
        assert_eq!(marked, 5);
        assert_eq!(store.unread_count("user-A").await, 0);
    }

    #[tokio::test]
    async fn test_store_separate_users() {
        let store = NotificationStore::new();

        store
            .insert(fresh_notification("user-A", "test", "For A"))
            .await;
        store
            .insert(fresh_notification("user-B", "test", "For B"))
            .await;

        assert_eq!(store.unread_count("user-A").await, 1);
        assert_eq!(store.unread_count("user-B").await, 1);
        assert_eq!(store.total_count("user-A").await, 1);
    }

    #[tokio::test]
    async fn test_store_delete() {
        let store = NotificationStore::new();
        let id = Uuid::new_v4();

        store
            .insert(make_notification(id, "user-A", "test", "Will be deleted"))
            .await;

        assert_eq!(store.total_count("user-A").await, 1);
        assert!(store.delete(&id).await);
        assert_eq!(store.total_count("user-A").await, 0);
        // A second delete of the same id finds nothing.
        assert!(!store.delete(&id).await);
    }

    #[tokio::test]
    async fn test_store_empty_user() {
        let store = NotificationStore::new();
        assert_eq!(store.unread_count("nobody").await, 0);
        assert_eq!(store.list_by_user("nobody", 10, 0).await.len(), 0);
    }

    // ---- sink ------------------------------------------------------------

    #[tokio::test]
    async fn test_sink_deliver_from_payload() {
        let store = Arc::new(NotificationStore::new());
        let sink = InAppNotificationSink::new(store.clone());

        let payload = json!({
            "title": "New follower",
            "body": "Alice followed you",
            "notification_type": "new_follower",
            "recipient_id": "user-A",
            "data": {"follower_name": "Alice"}
        });

        sink.deliver(payload, None, &HashMap::new()).await.unwrap();

        let notifs = store.list_by_user("user-A", 10, 0).await;
        assert_eq!(notifs.len(), 1);

        let stored = &notifs[0];
        assert_eq!(stored.title, "New follower");
        assert_eq!(stored.body, "Alice followed you");
        assert_eq!(stored.notification_type, "new_follower");
        assert!(!stored.read);
        assert_eq!(stored.data, json!({"follower_name": "Alice"}));
    }

    #[tokio::test]
    async fn test_sink_deliver_explicit_recipient() {
        let store = Arc::new(NotificationStore::new());
        let sink = InAppNotificationSink::new(store.clone());

        let payload = json!({
            "title": "Hello",
            "body": "World",
            "notification_type": "test"
        });

        sink.deliver(payload, Some("user-B"), &HashMap::new())
            .await
            .unwrap();

        assert_eq!(store.unread_count("user-B").await, 1);
    }

    #[tokio::test]
    async fn test_sink_deliver_recipient_from_context() {
        let store = Arc::new(NotificationStore::new());
        let sink = InAppNotificationSink::new(store.clone());

        let payload = json!({
            "title": "Hello",
            "notification_type": "test"
        });

        let vars = HashMap::from([(
            "recipient_id".to_string(),
            Value::String("user-C".to_string()),
        )]);

        sink.deliver(payload, None, &vars).await.unwrap();
        assert_eq!(store.unread_count("user-C").await, 1);
    }

    #[tokio::test]
    async fn test_sink_deliver_no_recipient_error() {
        let store = Arc::new(NotificationStore::new());
        let sink = InAppNotificationSink::new(store);

        let payload = json!({
            "title": "Hello",
            "notification_type": "test"
        });

        let result = sink.deliver(payload, None, &HashMap::new()).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("recipient_id"));
    }

    #[tokio::test]
    async fn test_sink_deliver_defaults() {
        let store = Arc::new(NotificationStore::new());
        let sink = InAppNotificationSink::new(store.clone());

        // Only the recipient is supplied; everything else must fall back.
        let payload = json!({
            "recipient_id": "user-A"
        });

        sink.deliver(payload, None, &HashMap::new()).await.unwrap();

        let notifs = store.list_by_user("user-A", 10, 0).await;
        assert_eq!(notifs[0].title, "Notification");
        assert_eq!(notifs[0].body, "");
        assert_eq!(notifs[0].notification_type, "generic");
    }

    #[tokio::test]
    async fn test_sink_name_and_type() {
        let sink = InAppNotificationSink::new(Arc::new(NotificationStore::new()));
        assert_eq!(sink.name(), "in_app");
        assert_eq!(sink.sink_type(), SinkType::InApp);
    }

    // ---- sink + preferences ----------------------------------------------

    #[tokio::test]
    async fn test_sink_with_preferences_disabled_type_skipped() {
        let store = Arc::new(NotificationStore::new());
        let prefs = Arc::new(NotificationPreferencesStore::new());
        prefs.disable_type("user-A", "new_like").await;

        let sink = InAppNotificationSink::with_preferences(store.clone(), prefs);

        // Disabled type: delivery succeeds but nothing is stored.
        let payload = json!({
            "title": "New like",
            "notification_type": "new_like",
            "recipient_id": "user-A"
        });
        sink.deliver(payload, None, &HashMap::new()).await.unwrap();
        assert_eq!(store.unread_count("user-A").await, 0);

        // A different type is still stored.
        let payload = json!({
            "title": "New follower",
            "notification_type": "new_follower",
            "recipient_id": "user-A"
        });
        sink.deliver(payload, None, &HashMap::new()).await.unwrap();
        assert_eq!(store.unread_count("user-A").await, 1);
    }

    #[tokio::test]
    async fn test_sink_with_preferences_muted_user() {
        let store = Arc::new(NotificationStore::new());
        let prefs = Arc::new(NotificationPreferencesStore::new());
        prefs.mute("user-A").await;

        let sink = InAppNotificationSink::with_preferences(store.clone(), prefs);

        for notif_type in ["new_follower", "new_like", "new_comment"] {
            let payload = json!({
                "title": "Test",
                "notification_type": notif_type,
                "recipient_id": "user-A"
            });
            sink.deliver(payload, None, &HashMap::new()).await.unwrap();
        }

        assert_eq!(store.unread_count("user-A").await, 0);
    }

    #[tokio::test]
    async fn test_sink_without_preferences_delivers_all() {
        let store = Arc::new(NotificationStore::new());
        let sink = InAppNotificationSink::new(store.clone());

        for notif_type in ["new_follower", "new_like", "new_comment"] {
            let payload = json!({
                "title": "Test",
                "notification_type": notif_type,
                "recipient_id": "user-A"
            });
            sink.deliver(payload, None, &HashMap::new()).await.unwrap();
        }

        assert_eq!(store.unread_count("user-A").await, 3);
    }

    // ---- eviction and scoping ----------------------------------------------

    #[tokio::test]
    async fn test_store_evicts_oldest_beyond_max() {
        let store = NotificationStore::new();

        let total = MAX_PER_USER + 50;
        for i in 0..total {
            let notification = StoredNotification {
                created_at: now_plus(i as i64),
                ..fresh_notification("user-A", "test", &format!("Notif {}", i))
            };
            store.insert(notification).await;
        }

        assert_eq!(store.total_count("user-A").await, MAX_PER_USER);

        // The most recent insert is still at the head of the listing.
        let latest = store.list_by_user("user-A", 1, 0).await;
        assert_eq!(latest[0].title, format!("Notif {}", total - 1));

        // The first 50 inserts were evicted, so the oldest survivor is #50.
        let oldest = store.list_by_user("user-A", 1, MAX_PER_USER - 1).await;
        assert_eq!(oldest[0].title, "Notif 50");
    }

    #[tokio::test]
    async fn test_mark_as_read_scoped_to_recipient() {
        let store = NotificationStore::new();
        let id_a = Uuid::new_v4();
        let id_b = Uuid::new_v4();

        store
            .insert(make_notification(id_a, "user-A", "test", "For A"))
            .await;
        store
            .insert(make_notification(id_b, "user-B", "test", "For B"))
            .await;

        // user-A cannot mark user-B's notification.
        let marked = store.mark_as_read(&[id_b], Some("user-A")).await;
        assert_eq!(marked, 0);
        assert_eq!(store.unread_count("user-B").await, 1);

        // user-A can mark their own notification.
        let marked = store.mark_as_read(&[id_a], Some("user-A")).await;
        assert_eq!(marked, 1);
        assert_eq!(store.unread_count("user-A").await, 0);
    }

    #[tokio::test]
    async fn test_mark_as_read_global_fallback() {
        let store = NotificationStore::new();
        let id = Uuid::new_v4();

        store
            .insert(make_notification(id, "user-A", "test", "Test"))
            .await;

        let marked = store.mark_as_read(&[id], None).await;
        assert_eq!(marked, 1);
        assert_eq!(store.unread_count("user-A").await, 0);
    }

    // ---- broadcast ---------------------------------------------------------

    #[tokio::test]
    async fn test_notification_broadcast_on_insert() {
        let store = NotificationStore::new();
        let mut rx = store.subscribe();

        let notif_id = Uuid::new_v4();
        let notification = StoredNotification {
            body: "Alice followed you".to_string(),
            data: json!({"follower_name": "Alice"}),
            ..make_notification(notif_id, "user-A", "new_follower", "New follower")
        };
        store.insert(notification).await;

        let received = rx.recv().await.expect("should receive broadcast");
        assert_eq!(received.id, notif_id);
        assert_eq!(received.recipient_id, "user-A");
        assert_eq!(received.notification_type, "new_follower");
        assert_eq!(received.title, "New follower");

        // The notification is stored as well as broadcast.
        assert_eq!(store.total_count("user-A").await, 1);
    }

    #[tokio::test]
    async fn test_broadcast_without_subscriber() {
        let store = NotificationStore::new();

        // No receiver exists; the insert must still store the notification.
        store
            .insert(fresh_notification("user-A", "test", "No one listening"))
            .await;

        assert_eq!(store.total_count("user-A").await, 1);
    }

    #[tokio::test]
    async fn test_broadcast_multiple_subscribers() {
        let store = NotificationStore::new();
        let mut rx1 = store.subscribe();
        let mut rx2 = store.subscribe();

        let notif_id = Uuid::new_v4();
        store
            .insert(make_notification(notif_id, "user-A", "test", "For everyone"))
            .await;

        let r1 = rx1.recv().await.expect("rx1 should receive");
        let r2 = rx2.recv().await.expect("rx2 should receive");

        assert_eq!(r1.id, notif_id);
        assert_eq!(r2.id, notif_id);
    }
}