greentic_start/notifier/
memory.rs1use crate::notifier::{ActivityNotifier, EventStream, NotifierError, NotifyEvent};
2use crate::operator_log;
3use async_trait::async_trait;
4use dashmap::DashMap;
5use futures_util::StreamExt;
6use std::sync::Arc;
7use tokio::sync::broadcast;
8
9pub struct InMemoryNotifier {
10 channels: Arc<DashMap<(String, String), broadcast::Sender<NotifyEvent>>>,
11 capacity: usize,
12}
13
14impl InMemoryNotifier {
15 pub fn new(capacity: usize) -> Self {
16 Self {
17 channels: Arc::new(DashMap::new()),
18 capacity,
19 }
20 }
21}
22
23#[async_trait]
24impl ActivityNotifier for InMemoryNotifier {
25 async fn publish(&self, event: NotifyEvent) {
26 let key = (event.tenant_id.clone(), event.conversation_id.clone());
27 if let Some(sender) = self.channels.get(&key) {
28 let receiver_count = sender.receiver_count();
29 let send_result = sender.send(event.clone());
31 operator_log::debug(
32 module_path!(),
33 format!(
34 "[ws notifier:memory] publish tenant={} conv={} watermark={} subscribers={} send_ok={}",
35 event.tenant_id,
36 event.conversation_id,
37 event.new_watermark,
38 receiver_count,
39 send_result.is_ok(),
40 ),
41 );
42 } else {
43 operator_log::debug(
44 module_path!(),
45 format!(
46 "[ws notifier:memory] publish tenant={} conv={} watermark={} no_channel_for_key",
47 event.tenant_id, event.conversation_id, event.new_watermark,
48 ),
49 );
50 }
51 }
52
53 async fn subscribe(
54 &self,
55 tenant_id: &str,
56 conversation_id: &str,
57 ) -> Result<EventStream, NotifierError> {
58 let key = (tenant_id.to_string(), conversation_id.to_string());
59 let sender = self
60 .channels
61 .entry(key)
62 .or_insert_with(|| broadcast::channel(self.capacity).0)
63 .clone();
64 let receiver = sender.subscribe();
65 let stream = tokio_stream::wrappers::BroadcastStream::new(receiver)
66 .filter_map(|res| async move { res.ok() });
67 Ok(Box::pin(stream))
68 }
69}
70
#[cfg(test)]
mod tests {
    use super::*;

    /// Tenant id shared by every event and subscription in these tests.
    const TENANT: &str = "tenant1";
    /// Broadcast buffer size used by every notifier under test.
    const CAPACITY: usize = 8;
    /// How long the isolation test waits before concluding nothing arrived.
    const QUIET_PERIOD: std::time::Duration = std::time::Duration::from_millis(50);

    /// Builds an event for `TENANT` in conversation `conv` with watermark `wm`.
    fn event(conv: &str, wm: u64) -> NotifyEvent {
        NotifyEvent {
            tenant_id: TENANT.to_owned(),
            conversation_id: conv.to_owned(),
            new_watermark: wm,
        }
    }

    /// Publishing to a key nobody subscribed to must simply complete.
    #[tokio::test]
    async fn publish_with_no_subscribers_drops_silently() {
        InMemoryNotifier::new(CAPACITY)
            .publish(event("conv1", 1))
            .await;
    }

    /// A subscriber registered before the publish receives the event.
    #[tokio::test]
    async fn subscribe_then_publish_delivers_event() {
        let notifier = InMemoryNotifier::new(CAPACITY);
        let mut events = notifier.subscribe(TENANT, "conv1").await.unwrap();

        notifier.publish(event("conv1", 5)).await;

        let delivered = events.next().await.expect("expected event");
        assert_eq!(delivered.new_watermark, 5);
    }

    /// Every subscriber of the same conversation gets its own copy of the event.
    #[tokio::test]
    async fn multi_subscribers_same_conv_all_receive() {
        let notifier = InMemoryNotifier::new(CAPACITY);
        let mut first = notifier.subscribe(TENANT, "conv1").await.unwrap();
        let mut second = notifier.subscribe(TENANT, "conv1").await.unwrap();

        notifier.publish(event("conv1", 7)).await;

        let from_first = first.next().await.expect("s1 event");
        let from_second = second.next().await.expect("s2 event");
        assert_eq!(from_first.new_watermark, 7);
        assert_eq!(from_second.new_watermark, 7);
    }

    /// An event for one conversation never reaches a subscriber of another.
    #[tokio::test]
    async fn different_conversations_isolated() {
        let notifier = InMemoryNotifier::new(CAPACITY);
        let mut conv_a_events = notifier.subscribe(TENANT, "convA").await.unwrap();

        notifier.publish(event("convB", 1)).await;

        let outcome = tokio::time::timeout(QUIET_PERIOD, conv_a_events.next()).await;
        assert!(outcome.is_err(), "should have timed out (no event)");
    }
}