llm_memory_graph/observatory/
publisher.rs1use super::events::MemoryGraphEvent;
4use crate::error::Result;
5use async_trait::async_trait;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8
9#[async_trait]
11pub trait EventPublisher: Send + Sync {
12 async fn publish(&self, event: MemoryGraphEvent) -> Result<()>;
14
15 async fn publish_batch(&self, events: Vec<MemoryGraphEvent>) -> Result<()> {
17 for event in events {
18 self.publish(event).await?;
19 }
20 Ok(())
21 }
22
23 async fn flush(&self) -> Result<()> {
25 Ok(())
26 }
27}
28
29#[derive(Clone)]
31pub struct InMemoryPublisher {
32 events: Arc<RwLock<Vec<MemoryGraphEvent>>>,
33}
34
35impl InMemoryPublisher {
36 pub fn new() -> Self {
38 Self {
39 events: Arc::new(RwLock::new(Vec::new())),
40 }
41 }
42
43 pub async fn get_events(&self) -> Vec<MemoryGraphEvent> {
45 self.events.read().await.clone()
46 }
47
48 pub async fn count(&self) -> usize {
50 self.events.read().await.len()
51 }
52
53 pub async fn clear(&self) {
55 self.events.write().await.clear();
56 }
57
58 pub async fn get_events_by_type(&self, event_type: &str) -> Vec<MemoryGraphEvent> {
60 self.events
61 .read()
62 .await
63 .iter()
64 .filter(|e| e.event_type() == event_type)
65 .cloned()
66 .collect()
67 }
68}
69
70impl Default for InMemoryPublisher {
71 fn default() -> Self {
72 Self::new()
73 }
74}
75
76#[async_trait]
77impl EventPublisher for InMemoryPublisher {
78 async fn publish(&self, event: MemoryGraphEvent) -> Result<()> {
79 self.events.write().await.push(event);
80 Ok(())
81 }
82
83 async fn publish_batch(&self, events: Vec<MemoryGraphEvent>) -> Result<()> {
84 self.events.write().await.extend(events);
85 Ok(())
86 }
87}
88
89#[derive(Clone, Copy)]
91pub struct NoOpPublisher;
92
93#[async_trait]
94impl EventPublisher for NoOpPublisher {
95 async fn publish(&self, _event: MemoryGraphEvent) -> Result<()> {
96 Ok(())
97 }
98
99 async fn publish_batch(&self, _events: Vec<MemoryGraphEvent>) -> Result<()> {
100 Ok(())
101 }
102}
103
104#[cfg(test)]
105mod tests {
106 use super::*;
107 use crate::types::{NodeId, NodeType, SessionId};
108 use chrono::Utc;
109 use std::collections::HashMap;
110
111 #[tokio::test]
112 async fn test_in_memory_publisher() {
113 let publisher = InMemoryPublisher::new();
114
115 let event = MemoryGraphEvent::NodeCreated {
116 node_id: NodeId::new(),
117 node_type: NodeType::Prompt,
118 session_id: Some(SessionId::new()),
119 timestamp: Utc::now(),
120 metadata: HashMap::new(),
121 };
122
123 publisher.publish(event.clone()).await.unwrap();
124
125 let events = publisher.get_events().await;
126 assert_eq!(events.len(), 1);
127 assert_eq!(events[0].event_type(), event.event_type());
128
129 let count = publisher.count().await;
130 assert_eq!(count, 1);
131 }
132
133 #[tokio::test]
134 async fn test_in_memory_publisher_batch() {
135 let publisher = InMemoryPublisher::new();
136
137 let events = vec![
138 MemoryGraphEvent::NodeCreated {
139 node_id: NodeId::new(),
140 node_type: NodeType::Prompt,
141 session_id: None,
142 timestamp: Utc::now(),
143 metadata: HashMap::new(),
144 },
145 MemoryGraphEvent::NodeCreated {
146 node_id: NodeId::new(),
147 node_type: NodeType::Response,
148 session_id: None,
149 timestamp: Utc::now(),
150 metadata: HashMap::new(),
151 },
152 ];
153
154 publisher.publish_batch(events).await.unwrap();
155
156 assert_eq!(publisher.count().await, 2);
157 }
158
159 #[tokio::test]
160 async fn test_in_memory_publisher_clear() {
161 let publisher = InMemoryPublisher::new();
162
163 let event = MemoryGraphEvent::NodeCreated {
164 node_id: NodeId::new(),
165 node_type: NodeType::Prompt,
166 session_id: None,
167 timestamp: Utc::now(),
168 metadata: HashMap::new(),
169 };
170
171 publisher.publish(event).await.unwrap();
172 assert_eq!(publisher.count().await, 1);
173
174 publisher.clear().await;
175 assert_eq!(publisher.count().await, 0);
176 }
177
178 #[tokio::test]
179 async fn test_in_memory_publisher_filter_by_type() {
180 let publisher = InMemoryPublisher::new();
181
182 let node_event = MemoryGraphEvent::NodeCreated {
183 node_id: NodeId::new(),
184 node_type: NodeType::Prompt,
185 session_id: None,
186 timestamp: Utc::now(),
187 metadata: HashMap::new(),
188 };
189
190 let query_event = MemoryGraphEvent::QueryExecuted {
191 query_type: "test".to_string(),
192 results_count: 10,
193 duration_ms: 50,
194 timestamp: Utc::now(),
195 };
196
197 publisher.publish(node_event).await.unwrap();
198 publisher.publish(query_event).await.unwrap();
199
200 let node_events = publisher.get_events_by_type("node_created").await;
201 assert_eq!(node_events.len(), 1);
202
203 let query_events = publisher.get_events_by_type("query_executed").await;
204 assert_eq!(query_events.len(), 1);
205 }
206
207 #[tokio::test]
208 async fn test_noop_publisher() {
209 let publisher = NoOpPublisher;
210
211 let event = MemoryGraphEvent::NodeCreated {
212 node_id: NodeId::new(),
213 node_type: NodeType::Prompt,
214 session_id: None,
215 timestamp: Utc::now(),
216 metadata: HashMap::new(),
217 };
218
219 publisher.publish(event).await.unwrap();
221 publisher.publish_batch(vec![]).await.unwrap();
222 publisher.flush().await.unwrap();
223 }
224}