Skip to main content

greentic_start/notifier/
memory.rs

1use crate::notifier::{ActivityNotifier, EventStream, NotifierError, NotifyEvent};
2use crate::operator_log;
3use async_trait::async_trait;
4use dashmap::DashMap;
5use futures_util::StreamExt;
6use std::sync::Arc;
7use tokio::sync::broadcast;
8
9pub struct InMemoryNotifier {
10    channels: Arc<DashMap<(String, String), broadcast::Sender<NotifyEvent>>>,
11    capacity: usize,
12}
13
14impl InMemoryNotifier {
15    pub fn new(capacity: usize) -> Self {
16        Self {
17            channels: Arc::new(DashMap::new()),
18            capacity,
19        }
20    }
21}
22
23#[async_trait]
24impl ActivityNotifier for InMemoryNotifier {
25    async fn publish(&self, event: NotifyEvent) {
26        let key = (event.tenant_id.clone(), event.conversation_id.clone());
27        if let Some(sender) = self.channels.get(&key) {
28            let receiver_count = sender.receiver_count();
29            // send returns Err(SendError) if no receivers — drop silently.
30            let send_result = sender.send(event.clone());
31            operator_log::debug(
32                module_path!(),
33                format!(
34                    "[ws notifier:memory] publish tenant={} conv={} watermark={} subscribers={} send_ok={}",
35                    event.tenant_id,
36                    event.conversation_id,
37                    event.new_watermark,
38                    receiver_count,
39                    send_result.is_ok(),
40                ),
41            );
42        } else {
43            operator_log::debug(
44                module_path!(),
45                format!(
46                    "[ws notifier:memory] publish tenant={} conv={} watermark={} no_channel_for_key",
47                    event.tenant_id, event.conversation_id, event.new_watermark,
48                ),
49            );
50        }
51    }
52
53    async fn subscribe(
54        &self,
55        tenant_id: &str,
56        conversation_id: &str,
57    ) -> Result<EventStream, NotifierError> {
58        let key = (tenant_id.to_string(), conversation_id.to_string());
59        let sender = self
60            .channels
61            .entry(key)
62            .or_insert_with(|| broadcast::channel(self.capacity).0)
63            .clone();
64        let receiver = sender.subscribe();
65        let stream = tokio_stream::wrappers::BroadcastStream::new(receiver)
66            .filter_map(|res| async move { res.ok() });
67        Ok(Box::pin(stream))
68    }
69}
70
71#[cfg(test)]
72mod tests {
73    use super::*;
74
75    fn event(conv: &str, wm: u64) -> NotifyEvent {
76        NotifyEvent {
77            tenant_id: "tenant1".into(),
78            conversation_id: conv.into(),
79            new_watermark: wm,
80        }
81    }
82
83    #[tokio::test]
84    async fn publish_with_no_subscribers_drops_silently() {
85        let notifier = InMemoryNotifier::new(8);
86        notifier.publish(event("conv1", 1)).await;
87        // Should not panic or error.
88    }
89
90    #[tokio::test]
91    async fn subscribe_then_publish_delivers_event() {
92        let notifier = InMemoryNotifier::new(8);
93        let mut stream = notifier.subscribe("tenant1", "conv1").await.unwrap();
94
95        notifier.publish(event("conv1", 5)).await;
96
97        let received = stream.next().await.expect("expected event");
98        assert_eq!(received.new_watermark, 5);
99    }
100
101    #[tokio::test]
102    async fn multi_subscribers_same_conv_all_receive() {
103        let notifier = InMemoryNotifier::new(8);
104        let mut s1 = notifier.subscribe("tenant1", "conv1").await.unwrap();
105        let mut s2 = notifier.subscribe("tenant1", "conv1").await.unwrap();
106
107        notifier.publish(event("conv1", 7)).await;
108
109        let r1 = s1.next().await.expect("s1 event");
110        let r2 = s2.next().await.expect("s2 event");
111        assert_eq!(r1.new_watermark, 7);
112        assert_eq!(r2.new_watermark, 7);
113    }
114
115    #[tokio::test]
116    async fn different_conversations_isolated() {
117        let notifier = InMemoryNotifier::new(8);
118        let mut s_a = notifier.subscribe("tenant1", "convA").await.unwrap();
119
120        notifier.publish(event("convB", 1)).await;
121
122        // Use a short timeout; convA subscriber must not see convB's event.
123        let result = tokio::time::timeout(std::time::Duration::from_millis(50), s_a.next()).await;
124        assert!(result.is_err(), "should have timed out (no event)");
125    }
126}