Skip to main content

this/events/sinks/
in_app.rs

1//! In-app notification sink — stores notifications per user
2//!
3//! This is the primary sink for the notification system. It stores
4//! structured notifications in memory (extensible to a database),
5//! supporting list, mark_as_read, and unread_count operations.
6//!
7//! # Payload format
8//!
9//! The `map` operator should produce a payload with these fields:
10//!
11//! ```json
12//! {
13//!     "title": "New follower",
14//!     "body": "Alice started following you",
15//!     "notification_type": "new_follower",
16//!     "recipient_id": "user-uuid",
17//!     "data": { ... }  // optional extra data
18//! }
19//! ```
20//!
21//! # Preferences
22//!
23//! If a `NotificationPreferencesStore` is attached, the sink checks
24//! user preferences before storing. Disabled notification types are
25//! silently dropped.
26
27use crate::config::sinks::SinkType;
28use crate::events::sinks::Sink;
29use crate::events::sinks::preferences::NotificationPreferencesStore;
30use anyhow::{Result, anyhow};
31use async_trait::async_trait;
32use chrono::{DateTime, Utc};
33use serde::{Deserialize, Serialize};
34use serde_json::Value;
35use std::collections::HashMap;
36use std::sync::Arc;
37use tokio::sync::{RwLock, broadcast};
38use uuid::Uuid;
39
40/// A stored notification
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct StoredNotification {
43    /// Unique notification ID
44    pub id: Uuid,
45
46    /// Recipient user ID
47    pub recipient_id: String,
48
49    /// Notification type (e.g., "new_follower", "new_like", "new_comment")
50    pub notification_type: String,
51
52    /// Human-readable title
53    pub title: String,
54
55    /// Human-readable body
56    pub body: String,
57
58    /// Additional payload data (optional)
59    #[serde(default)]
60    pub data: Value,
61
62    /// Whether the notification has been read
63    pub read: bool,
64
65    /// Creation timestamp
66    pub created_at: DateTime<Utc>,
67}
68
69/// Maximum notifications stored per user before eviction
70const MAX_PER_USER: usize = 1000;
71
72/// Broadcast channel capacity for real-time notification streaming.
73///
74/// Subscribers that fall behind by more than this many notifications
75/// will receive a `Lagged` error and miss intermediate events.
76const BROADCAST_CAPACITY: usize = 256;
77
78/// In-memory notification store
79///
80/// Thread-safe store for notifications, keyed by recipient_id.
81/// Each recipient has their own ordered list stored in chronological order
82/// (oldest first, newest last). Retrieval returns newest first.
83///
84/// When a user exceeds `MAX_PER_USER` notifications, the oldest are evicted.
85///
86/// ## Real-time streaming
87///
88/// A `tokio::sync::broadcast` channel broadcasts every inserted notification
89/// to subscribers (e.g., gRPC server-streaming RPCs). The broadcast is
90/// fire-and-forget: if no subscriber is listening, the notification is simply
91/// not delivered to the channel (no error, no panic).
92#[derive(Debug)]
93pub struct NotificationStore {
94    /// Notifications keyed by recipient_id (chronological order: oldest first)
95    notifications: RwLock<HashMap<String, Vec<StoredNotification>>>,
96
97    /// Broadcast channel for real-time notification streaming.
98    /// Every `insert()` sends a clone on this channel.
99    broadcast: broadcast::Sender<StoredNotification>,
100}
101
102impl NotificationStore {
103    /// Create an empty store with a broadcast channel
104    pub fn new() -> Self {
105        let (broadcast, _) = broadcast::channel(BROADCAST_CAPACITY);
106        Self {
107            notifications: RwLock::new(HashMap::new()),
108            broadcast,
109        }
110    }
111
112    /// Subscribe to real-time notifications.
113    ///
114    /// Returns a receiver that will get every notification inserted after
115    /// this call. Notifications inserted before are NOT replayed.
116    ///
117    /// The subscriber should filter by `recipient_id` if it only wants
118    /// notifications for a specific user.
119    pub fn subscribe(&self) -> broadcast::Receiver<StoredNotification> {
120        self.broadcast.subscribe()
121    }
122
123    /// Store a notification and broadcast it to real-time subscribers.
124    ///
125    /// Notifications are stored in chronological order (oldest first).
126    /// If the user exceeds `MAX_PER_USER`, the oldest notifications are evicted.
127    ///
128    /// The broadcast is fire-and-forget: if no subscriber is listening,
129    /// `send()` returns `Err` which is silently ignored.
130    pub async fn insert(&self, notification: StoredNotification) {
131        // Broadcast to real-time subscribers (fire-and-forget)
132        let _ = self.broadcast.send(notification.clone());
133
134        let mut store = self.notifications.write().await;
135        let user_notifs = store.entry(notification.recipient_id.clone()).or_default();
136        user_notifs.push(notification);
137
138        // Evict oldest if over capacity
139        if user_notifs.len() > MAX_PER_USER {
140            let excess = user_notifs.len() - MAX_PER_USER;
141            user_notifs.drain(0..excess);
142        }
143    }
144
145    /// List notifications for a user with pagination
146    ///
147    /// Returns notifications ordered by creation time (newest first).
148    /// No sorting needed — stored in chronological order, iterated in reverse.
149    pub async fn list_by_user(
150        &self,
151        recipient_id: &str,
152        limit: usize,
153        offset: usize,
154    ) -> Vec<StoredNotification> {
155        let store = self.notifications.read().await;
156        let Some(user_notifications) = store.get(recipient_id) else {
157            return Vec::new();
158        };
159
160        // Iterate in reverse (newest first) — no clone+sort needed
161        user_notifications
162            .iter()
163            .rev()
164            .skip(offset)
165            .take(limit)
166            .cloned()
167            .collect()
168    }
169
170    /// Mark notifications as read by their IDs
171    ///
172    /// If `recipient_id` is provided, only searches that user's notifications
173    /// (avoiding a full scan). Otherwise scans all users.
174    ///
175    /// Returns the number of notifications actually marked as read.
176    pub async fn mark_as_read(
177        &self,
178        notification_ids: &[Uuid],
179        recipient_id: Option<&str>,
180    ) -> usize {
181        let mut store = self.notifications.write().await;
182        let mut count = 0;
183
184        let values: Box<dyn Iterator<Item = &mut Vec<StoredNotification>>> =
185            if let Some(rid) = recipient_id {
186                // Scoped search: only this user's notifications
187                Box::new(store.get_mut(rid).into_iter())
188            } else {
189                // Global scan (fallback)
190                Box::new(store.values_mut())
191            };
192
193        for notifications in values {
194            for notif in notifications.iter_mut() {
195                if notification_ids.contains(&notif.id) && !notif.read {
196                    notif.read = true;
197                    count += 1;
198                }
199            }
200        }
201
202        count
203    }
204
205    /// Mark all notifications for a user as read
206    pub async fn mark_all_as_read(&self, recipient_id: &str) -> usize {
207        let mut store = self.notifications.write().await;
208        let Some(notifications) = store.get_mut(recipient_id) else {
209            return 0;
210        };
211
212        let mut count = 0;
213        for notif in notifications.iter_mut() {
214            if !notif.read {
215                notif.read = true;
216                count += 1;
217            }
218        }
219        count
220    }
221
222    /// Count unread notifications for a user
223    pub async fn unread_count(&self, recipient_id: &str) -> usize {
224        let store = self.notifications.read().await;
225        store
226            .get(recipient_id)
227            .map(|notifs| notifs.iter().filter(|n| !n.read).count())
228            .unwrap_or(0)
229    }
230
231    /// Total notification count for a user
232    pub async fn total_count(&self, recipient_id: &str) -> usize {
233        let store = self.notifications.read().await;
234        store.get(recipient_id).map(|n| n.len()).unwrap_or(0)
235    }
236
237    /// Delete a notification by ID
238    pub async fn delete(&self, notification_id: &Uuid) -> bool {
239        let mut store = self.notifications.write().await;
240        for notifications in store.values_mut() {
241            if let Some(pos) = notifications.iter().position(|n| n.id == *notification_id) {
242                notifications.remove(pos);
243                return true;
244            }
245        }
246        false
247    }
248}
249
250impl Default for NotificationStore {
251    fn default() -> Self {
252        Self::new()
253    }
254}
255
256/// In-app notification sink
257///
258/// Receives payloads from the `deliver` operator and stores them
259/// as structured notifications in the `NotificationStore`.
260///
261/// Optionally checks user notification preferences before storing.
262#[derive(Debug)]
263pub struct InAppNotificationSink {
264    /// The notification store
265    store: Arc<NotificationStore>,
266
267    /// Optional preferences store (checks before delivering)
268    preferences: Option<Arc<NotificationPreferencesStore>>,
269}
270
271impl InAppNotificationSink {
272    /// Create a new InAppNotificationSink
273    pub fn new(store: Arc<NotificationStore>) -> Self {
274        Self {
275            store,
276            preferences: None,
277        }
278    }
279
280    /// Create with a preferences store
281    pub fn with_preferences(
282        store: Arc<NotificationStore>,
283        preferences: Arc<NotificationPreferencesStore>,
284    ) -> Self {
285        Self {
286            store,
287            preferences: Some(preferences),
288        }
289    }
290
291    /// Access the underlying notification store
292    pub fn store(&self) -> &Arc<NotificationStore> {
293        &self.store
294    }
295}
296
297#[async_trait]
298impl Sink for InAppNotificationSink {
299    async fn deliver(
300        &self,
301        payload: Value,
302        recipient_id: Option<&str>,
303        context_vars: &HashMap<String, Value>,
304    ) -> Result<()> {
305        // Determine recipient: explicit parameter > payload field > context variable
306        let recipient =
307            super::resolve_recipient(recipient_id, &payload, context_vars).ok_or_else(|| {
308                anyhow!(
309                    "in_app sink: recipient_id not found. \
310                     Provide it as a parameter, in the payload, or as a context variable."
311                )
312            })?;
313
314        // Extract notification fields from payload
315        let title = payload
316            .get("title")
317            .and_then(|v| v.as_str())
318            .unwrap_or("Notification")
319            .to_string();
320
321        let body = payload
322            .get("body")
323            .and_then(|v| v.as_str())
324            .unwrap_or("")
325            .to_string();
326
327        let notification_type = payload
328            .get("notification_type")
329            .and_then(|v| v.as_str())
330            .unwrap_or("generic")
331            .to_string();
332
333        let data = payload.get("data").cloned().unwrap_or(Value::Null);
334
335        // Check preferences if available
336        if let Some(prefs_store) = &self.preferences
337            && !prefs_store.is_enabled(&recipient, &notification_type).await
338        {
339            tracing::debug!(
340                recipient = %recipient,
341                notification_type = %notification_type,
342                "in_app sink: notification type disabled by user preferences, skipping"
343            );
344            return Ok(());
345        }
346
347        // Create and store the notification
348        let notification = StoredNotification {
349            id: Uuid::new_v4(),
350            recipient_id: recipient,
351            notification_type,
352            title,
353            body,
354            data,
355            read: false,
356            created_at: Utc::now(),
357        };
358
359        self.store.insert(notification).await;
360        Ok(())
361    }
362
363    fn name(&self) -> &str {
364        "in_app"
365    }
366
367    fn sink_type(&self) -> SinkType {
368        SinkType::InApp
369    }
370}
371
372#[cfg(test)]
373mod tests {
374    use super::*;
375    use serde_json::json;
376
377    #[tokio::test]
378    async fn test_store_insert_and_list() {
379        let store = NotificationStore::new();
380
381        for i in 0..5 {
382            store
383                .insert(StoredNotification {
384                    id: Uuid::new_v4(),
385                    recipient_id: "user-A".to_string(),
386                    notification_type: "new_follower".to_string(),
387                    title: format!("Follower {}", i),
388                    body: format!("User {} followed you", i),
389                    data: Value::Null,
390                    read: false,
391                    created_at: Utc::now() + chrono::Duration::seconds(i as i64),
392                })
393                .await;
394        }
395
396        // List with limit
397        let page = store.list_by_user("user-A", 3, 0).await;
398        assert_eq!(page.len(), 3);
399        // Newest first
400        assert_eq!(page[0].title, "Follower 4");
401        assert_eq!(page[1].title, "Follower 3");
402        assert_eq!(page[2].title, "Follower 2");
403    }
404
405    #[tokio::test]
406    async fn test_store_pagination() {
407        let store = NotificationStore::new();
408
409        for i in 0..5 {
410            store
411                .insert(StoredNotification {
412                    id: Uuid::new_v4(),
413                    recipient_id: "user-A".to_string(),
414                    notification_type: "test".to_string(),
415                    title: format!("Notif {}", i),
416                    body: String::new(),
417                    data: Value::Null,
418                    read: false,
419                    created_at: Utc::now() + chrono::Duration::seconds(i as i64),
420                })
421                .await;
422        }
423
424        // Page 2 (offset=3, limit=3) → should get 2 items
425        let page2 = store.list_by_user("user-A", 3, 3).await;
426        assert_eq!(page2.len(), 2);
427    }
428
429    #[tokio::test]
430    async fn test_store_mark_as_read() {
431        let store = NotificationStore::new();
432        let id1 = Uuid::new_v4();
433        let id2 = Uuid::new_v4();
434        let id3 = Uuid::new_v4();
435
436        for (id, i) in [(id1, 0), (id2, 1), (id3, 2)] {
437            store
438                .insert(StoredNotification {
439                    id,
440                    recipient_id: "user-A".to_string(),
441                    notification_type: "test".to_string(),
442                    title: format!("Notif {}", i),
443                    body: String::new(),
444                    data: Value::Null,
445                    read: false,
446                    created_at: Utc::now(),
447                })
448                .await;
449        }
450
451        assert_eq!(store.unread_count("user-A").await, 3);
452
453        // Mark 2 as read (scoped to user-A)
454        let marked = store.mark_as_read(&[id1, id2], Some("user-A")).await;
455        assert_eq!(marked, 2);
456        assert_eq!(store.unread_count("user-A").await, 1);
457    }
458
459    #[tokio::test]
460    async fn test_store_mark_all_as_read() {
461        let store = NotificationStore::new();
462
463        for i in 0..5 {
464            store
465                .insert(StoredNotification {
466                    id: Uuid::new_v4(),
467                    recipient_id: "user-A".to_string(),
468                    notification_type: "test".to_string(),
469                    title: format!("Notif {}", i),
470                    body: String::new(),
471                    data: Value::Null,
472                    read: false,
473                    created_at: Utc::now(),
474                })
475                .await;
476        }
477
478        assert_eq!(store.unread_count("user-A").await, 5);
479
480        let marked = store.mark_all_as_read("user-A").await;
481        assert_eq!(marked, 5);
482        assert_eq!(store.unread_count("user-A").await, 0);
483    }
484
485    #[tokio::test]
486    async fn test_store_separate_users() {
487        let store = NotificationStore::new();
488
489        store
490            .insert(StoredNotification {
491                id: Uuid::new_v4(),
492                recipient_id: "user-A".to_string(),
493                notification_type: "test".to_string(),
494                title: "For A".to_string(),
495                body: String::new(),
496                data: Value::Null,
497                read: false,
498                created_at: Utc::now(),
499            })
500            .await;
501
502        store
503            .insert(StoredNotification {
504                id: Uuid::new_v4(),
505                recipient_id: "user-B".to_string(),
506                notification_type: "test".to_string(),
507                title: "For B".to_string(),
508                body: String::new(),
509                data: Value::Null,
510                read: false,
511                created_at: Utc::now(),
512            })
513            .await;
514
515        assert_eq!(store.unread_count("user-A").await, 1);
516        assert_eq!(store.unread_count("user-B").await, 1);
517        assert_eq!(store.total_count("user-A").await, 1);
518    }
519
520    #[tokio::test]
521    async fn test_store_delete() {
522        let store = NotificationStore::new();
523        let id = Uuid::new_v4();
524
525        store
526            .insert(StoredNotification {
527                id,
528                recipient_id: "user-A".to_string(),
529                notification_type: "test".to_string(),
530                title: "Will be deleted".to_string(),
531                body: String::new(),
532                data: Value::Null,
533                read: false,
534                created_at: Utc::now(),
535            })
536            .await;
537
538        assert_eq!(store.total_count("user-A").await, 1);
539        assert!(store.delete(&id).await);
540        assert_eq!(store.total_count("user-A").await, 0);
541        assert!(!store.delete(&id).await); // Already deleted
542    }
543
544    #[tokio::test]
545    async fn test_store_empty_user() {
546        let store = NotificationStore::new();
547        assert_eq!(store.unread_count("nobody").await, 0);
548        assert_eq!(store.list_by_user("nobody", 10, 0).await.len(), 0);
549    }
550
551    // ── Sink trait tests ────────────────────────────────────────────
552
553    #[tokio::test]
554    async fn test_sink_deliver_from_payload() {
555        let store = Arc::new(NotificationStore::new());
556        let sink = InAppNotificationSink::new(store.clone());
557
558        let payload = json!({
559            "title": "New follower",
560            "body": "Alice followed you",
561            "notification_type": "new_follower",
562            "recipient_id": "user-A",
563            "data": {"follower_name": "Alice"}
564        });
565
566        sink.deliver(payload, None, &HashMap::new()).await.unwrap();
567
568        let notifs = store.list_by_user("user-A", 10, 0).await;
569        assert_eq!(notifs.len(), 1);
570        assert_eq!(notifs[0].title, "New follower");
571        assert_eq!(notifs[0].body, "Alice followed you");
572        assert_eq!(notifs[0].notification_type, "new_follower");
573        assert!(!notifs[0].read);
574        assert_eq!(notifs[0].data, json!({"follower_name": "Alice"}));
575    }
576
577    #[tokio::test]
578    async fn test_sink_deliver_explicit_recipient() {
579        let store = Arc::new(NotificationStore::new());
580        let sink = InAppNotificationSink::new(store.clone());
581
582        let payload = json!({
583            "title": "Hello",
584            "body": "World",
585            "notification_type": "test"
586        });
587
588        // Explicit recipient_id parameter overrides payload
589        sink.deliver(payload, Some("user-B"), &HashMap::new())
590            .await
591            .unwrap();
592
593        assert_eq!(store.unread_count("user-B").await, 1);
594    }
595
596    #[tokio::test]
597    async fn test_sink_deliver_recipient_from_context() {
598        let store = Arc::new(NotificationStore::new());
599        let sink = InAppNotificationSink::new(store.clone());
600
601        let payload = json!({
602            "title": "Hello",
603            "notification_type": "test"
604        });
605
606        let mut vars = HashMap::new();
607        vars.insert(
608            "recipient_id".to_string(),
609            Value::String("user-C".to_string()),
610        );
611
612        sink.deliver(payload, None, &vars).await.unwrap();
613        assert_eq!(store.unread_count("user-C").await, 1);
614    }
615
616    #[tokio::test]
617    async fn test_sink_deliver_no_recipient_error() {
618        let store = Arc::new(NotificationStore::new());
619        let sink = InAppNotificationSink::new(store);
620
621        let payload = json!({
622            "title": "Hello",
623            "notification_type": "test"
624        });
625
626        let result = sink.deliver(payload, None, &HashMap::new()).await;
627        assert!(result.is_err());
628        assert!(result.unwrap_err().to_string().contains("recipient_id"));
629    }
630
631    #[tokio::test]
632    async fn test_sink_deliver_defaults() {
633        let store = Arc::new(NotificationStore::new());
634        let sink = InAppNotificationSink::new(store.clone());
635
636        // Minimal payload — no title, body, notification_type
637        let payload = json!({
638            "recipient_id": "user-A"
639        });
640
641        sink.deliver(payload, None, &HashMap::new()).await.unwrap();
642
643        let notifs = store.list_by_user("user-A", 10, 0).await;
644        assert_eq!(notifs[0].title, "Notification");
645        assert_eq!(notifs[0].body, "");
646        assert_eq!(notifs[0].notification_type, "generic");
647    }
648
649    #[tokio::test]
650    async fn test_sink_name_and_type() {
651        let sink = InAppNotificationSink::new(Arc::new(NotificationStore::new()));
652        assert_eq!(sink.name(), "in_app");
653        assert_eq!(sink.sink_type(), SinkType::InApp);
654    }
655
656    // ── Preferences integration tests ───────────────────────────────
657
658    #[tokio::test]
659    async fn test_sink_with_preferences_disabled_type_skipped() {
660        let store = Arc::new(NotificationStore::new());
661        let prefs = Arc::new(NotificationPreferencesStore::new());
662        prefs.disable_type("user-A", "new_like").await;
663
664        let sink = InAppNotificationSink::with_preferences(store.clone(), prefs);
665
666        // Deliver a "new_like" notification — should be skipped
667        let payload = json!({
668            "title": "New like",
669            "notification_type": "new_like",
670            "recipient_id": "user-A"
671        });
672        sink.deliver(payload, None, &HashMap::new()).await.unwrap();
673        assert_eq!(store.unread_count("user-A").await, 0);
674
675        // Deliver a "new_follower" notification — should be stored
676        let payload = json!({
677            "title": "New follower",
678            "notification_type": "new_follower",
679            "recipient_id": "user-A"
680        });
681        sink.deliver(payload, None, &HashMap::new()).await.unwrap();
682        assert_eq!(store.unread_count("user-A").await, 1);
683    }
684
685    #[tokio::test]
686    async fn test_sink_with_preferences_muted_user() {
687        let store = Arc::new(NotificationStore::new());
688        let prefs = Arc::new(NotificationPreferencesStore::new());
689        prefs.mute("user-A").await;
690
691        let sink = InAppNotificationSink::with_preferences(store.clone(), prefs);
692
693        // All notification types should be skipped when muted
694        for notif_type in &["new_follower", "new_like", "new_comment"] {
695            let payload = json!({
696                "title": "Test",
697                "notification_type": notif_type,
698                "recipient_id": "user-A"
699            });
700            sink.deliver(payload, None, &HashMap::new()).await.unwrap();
701        }
702
703        assert_eq!(store.unread_count("user-A").await, 0);
704    }
705
706    #[tokio::test]
707    async fn test_sink_without_preferences_delivers_all() {
708        let store = Arc::new(NotificationStore::new());
709        // No preferences store → all types delivered
710        let sink = InAppNotificationSink::new(store.clone());
711
712        for notif_type in &["new_follower", "new_like", "new_comment"] {
713            let payload = json!({
714                "title": "Test",
715                "notification_type": notif_type,
716                "recipient_id": "user-A"
717            });
718            sink.deliver(payload, None, &HashMap::new()).await.unwrap();
719        }
720
721        assert_eq!(store.unread_count("user-A").await, 3);
722    }
723
724    // ── Eviction + mark_as_read scoped tests ──────────────────────────
725
726    #[tokio::test]
727    async fn test_store_evicts_oldest_beyond_max() {
728        let store = NotificationStore::new();
729
730        // Insert MAX_PER_USER + 50 notifications
731        let total = MAX_PER_USER + 50;
732        for i in 0..total {
733            store
734                .insert(StoredNotification {
735                    id: Uuid::new_v4(),
736                    recipient_id: "user-A".to_string(),
737                    notification_type: "test".to_string(),
738                    title: format!("Notif {}", i),
739                    body: String::new(),
740                    data: Value::Null,
741                    read: false,
742                    created_at: Utc::now() + chrono::Duration::seconds(i as i64),
743                })
744                .await;
745        }
746
747        // Should be capped at MAX_PER_USER
748        assert_eq!(store.total_count("user-A").await, MAX_PER_USER);
749
750        // Newest should still be present (last inserted)
751        let latest = store.list_by_user("user-A", 1, 0).await;
752        assert_eq!(latest[0].title, format!("Notif {}", total - 1));
753
754        // Oldest 50 should have been evicted — first kept is Notif 50
755        let oldest = store.list_by_user("user-A", 1, MAX_PER_USER - 1).await;
756        assert_eq!(oldest[0].title, "Notif 50");
757    }
758
759    #[tokio::test]
760    async fn test_mark_as_read_scoped_to_recipient() {
761        let store = NotificationStore::new();
762        let id_a = Uuid::new_v4();
763        let id_b = Uuid::new_v4();
764
765        store
766            .insert(StoredNotification {
767                id: id_a,
768                recipient_id: "user-A".to_string(),
769                notification_type: "test".to_string(),
770                title: "For A".to_string(),
771                body: String::new(),
772                data: Value::Null,
773                read: false,
774                created_at: Utc::now(),
775            })
776            .await;
777
778        store
779            .insert(StoredNotification {
780                id: id_b,
781                recipient_id: "user-B".to_string(),
782                notification_type: "test".to_string(),
783                title: "For B".to_string(),
784                body: String::new(),
785                data: Value::Null,
786                read: false,
787                created_at: Utc::now(),
788            })
789            .await;
790
791        // Scoped mark_as_read: try to mark id_b but scoped to user-A → should find 0
792        let marked = store.mark_as_read(&[id_b], Some("user-A")).await;
793        assert_eq!(marked, 0);
794        assert_eq!(store.unread_count("user-B").await, 1); // Still unread
795
796        // Scoped mark_as_read: mark id_a scoped to user-A → should find 1
797        let marked = store.mark_as_read(&[id_a], Some("user-A")).await;
798        assert_eq!(marked, 1);
799        assert_eq!(store.unread_count("user-A").await, 0);
800    }
801
802    #[tokio::test]
803    async fn test_mark_as_read_global_fallback() {
804        let store = NotificationStore::new();
805        let id = Uuid::new_v4();
806
807        store
808            .insert(StoredNotification {
809                id,
810                recipient_id: "user-A".to_string(),
811                notification_type: "test".to_string(),
812                title: "Test".to_string(),
813                body: String::new(),
814                data: Value::Null,
815                read: false,
816                created_at: Utc::now(),
817            })
818            .await;
819
820        // Without recipient_id — global scan fallback
821        let marked = store.mark_as_read(&[id], None).await;
822        assert_eq!(marked, 1);
823        assert_eq!(store.unread_count("user-A").await, 0);
824    }
825
826    // ── Broadcast channel tests ─────────────────────────────────────
827
828    #[tokio::test]
829    async fn test_notification_broadcast_on_insert() {
830        let store = NotificationStore::new();
831        let mut rx = store.subscribe();
832
833        let notif_id = Uuid::new_v4();
834        store
835            .insert(StoredNotification {
836                id: notif_id,
837                recipient_id: "user-A".to_string(),
838                notification_type: "new_follower".to_string(),
839                title: "New follower".to_string(),
840                body: "Alice followed you".to_string(),
841                data: json!({"follower_name": "Alice"}),
842                read: false,
843                created_at: Utc::now(),
844            })
845            .await;
846
847        let received = rx.recv().await.expect("should receive broadcast");
848        assert_eq!(received.id, notif_id);
849        assert_eq!(received.recipient_id, "user-A");
850        assert_eq!(received.notification_type, "new_follower");
851        assert_eq!(received.title, "New follower");
852
853        // Also stored in the store
854        assert_eq!(store.total_count("user-A").await, 1);
855    }
856
857    #[tokio::test]
858    async fn test_broadcast_without_subscriber() {
859        let store = NotificationStore::new();
860
861        // Insert without any subscriber — should NOT panic
862        store
863            .insert(StoredNotification {
864                id: Uuid::new_v4(),
865                recipient_id: "user-A".to_string(),
866                notification_type: "test".to_string(),
867                title: "No one listening".to_string(),
868                body: String::new(),
869                data: Value::Null,
870                read: false,
871                created_at: Utc::now(),
872            })
873            .await;
874
875        // Still stored
876        assert_eq!(store.total_count("user-A").await, 1);
877    }
878
879    #[tokio::test]
880    async fn test_broadcast_multiple_subscribers() {
881        let store = NotificationStore::new();
882        let mut rx1 = store.subscribe();
883        let mut rx2 = store.subscribe();
884
885        let notif_id = Uuid::new_v4();
886        store
887            .insert(StoredNotification {
888                id: notif_id,
889                recipient_id: "user-A".to_string(),
890                notification_type: "test".to_string(),
891                title: "For everyone".to_string(),
892                body: String::new(),
893                data: Value::Null,
894                read: false,
895                created_at: Utc::now(),
896            })
897            .await;
898
899        let r1 = rx1.recv().await.expect("rx1 should receive");
900        let r2 = rx2.recv().await.expect("rx2 should receive");
901
902        assert_eq!(r1.id, notif_id);
903        assert_eq!(r2.id, notif_id);
904    }
905}