llm_memory_graph/observatory/
publisher.rs

1//! Event publisher traits and implementations
2
3use super::events::MemoryGraphEvent;
4use crate::error::Result;
5use async_trait::async_trait;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8
9/// Trait for publishing Observatory events
10#[async_trait]
11pub trait EventPublisher: Send + Sync {
12    /// Publish a single event
13    async fn publish(&self, event: MemoryGraphEvent) -> Result<()>;
14
15    /// Publish a batch of events
16    async fn publish_batch(&self, events: Vec<MemoryGraphEvent>) -> Result<()> {
17        for event in events {
18            self.publish(event).await?;
19        }
20        Ok(())
21    }
22
23    /// Flush any pending events
24    async fn flush(&self) -> Result<()> {
25        Ok(())
26    }
27}
28
29/// In-memory event publisher for testing and development
30#[derive(Clone)]
31pub struct InMemoryPublisher {
32    events: Arc<RwLock<Vec<MemoryGraphEvent>>>,
33}
34
35impl InMemoryPublisher {
36    /// Create a new in-memory publisher
37    pub fn new() -> Self {
38        Self {
39            events: Arc::new(RwLock::new(Vec::new())),
40        }
41    }
42
43    /// Get all published events
44    pub async fn get_events(&self) -> Vec<MemoryGraphEvent> {
45        self.events.read().await.clone()
46    }
47
48    /// Get the number of published events
49    pub async fn count(&self) -> usize {
50        self.events.read().await.len()
51    }
52
53    /// Clear all events
54    pub async fn clear(&self) {
55        self.events.write().await.clear();
56    }
57
58    /// Get events of a specific type
59    pub async fn get_events_by_type(&self, event_type: &str) -> Vec<MemoryGraphEvent> {
60        self.events
61            .read()
62            .await
63            .iter()
64            .filter(|e| e.event_type() == event_type)
65            .cloned()
66            .collect()
67    }
68}
69
70impl Default for InMemoryPublisher {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
76#[async_trait]
77impl EventPublisher for InMemoryPublisher {
78    async fn publish(&self, event: MemoryGraphEvent) -> Result<()> {
79        self.events.write().await.push(event);
80        Ok(())
81    }
82
83    async fn publish_batch(&self, events: Vec<MemoryGraphEvent>) -> Result<()> {
84        self.events.write().await.extend(events);
85        Ok(())
86    }
87}
88
/// No-op publisher that discards all events.
///
/// The type carries no state, so it is `Copy`, can be built with
/// `NoOpPublisher` or `NoOpPublisher::default()`, and prints as
/// `NoOpPublisher` in debug output.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoOpPublisher;
92
93#[async_trait]
94impl EventPublisher for NoOpPublisher {
95    async fn publish(&self, _event: MemoryGraphEvent) -> Result<()> {
96        Ok(())
97    }
98
99    async fn publish_batch(&self, _events: Vec<MemoryGraphEvent>) -> Result<()> {
100        Ok(())
101    }
102}
103
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{NodeId, NodeType, SessionId};
    use chrono::Utc;
    use std::collections::HashMap;

    /// Builds a `NodeCreated` event with a fresh node id, the current time,
    /// and empty metadata.
    fn node_created(node_type: NodeType, session_id: Option<SessionId>) -> MemoryGraphEvent {
        MemoryGraphEvent::NodeCreated {
            node_id: NodeId::new(),
            node_type,
            session_id,
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        }
    }

    /// A single published event is stored and counted.
    #[tokio::test]
    async fn test_in_memory_publisher() {
        let publisher = InMemoryPublisher::new();
        let event = node_created(NodeType::Prompt, Some(SessionId::new()));

        publisher.publish(event.clone()).await.unwrap();

        let stored = publisher.get_events().await;
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].event_type(), event.event_type());

        assert_eq!(publisher.count().await, 1);
    }

    /// A batch of two events results in two stored events.
    #[tokio::test]
    async fn test_in_memory_publisher_batch() {
        let publisher = InMemoryPublisher::new();
        let batch = vec![
            node_created(NodeType::Prompt, None),
            node_created(NodeType::Response, None),
        ];

        publisher.publish_batch(batch).await.unwrap();

        assert_eq!(publisher.count().await, 2);
    }

    /// `clear` empties the stored event list.
    #[tokio::test]
    async fn test_in_memory_publisher_clear() {
        let publisher = InMemoryPublisher::new();

        publisher
            .publish(node_created(NodeType::Prompt, None))
            .await
            .unwrap();
        assert_eq!(publisher.count().await, 1);

        publisher.clear().await;
        assert_eq!(publisher.count().await, 0);
    }

    /// Filtering by type name returns only the events of that type.
    #[tokio::test]
    async fn test_in_memory_publisher_filter_by_type() {
        let publisher = InMemoryPublisher::new();

        let node_event = node_created(NodeType::Prompt, None);
        let query_event = MemoryGraphEvent::QueryExecuted {
            query_type: "test".to_string(),
            results_count: 10,
            duration_ms: 50,
            timestamp: Utc::now(),
        };

        publisher.publish(node_event).await.unwrap();
        publisher.publish(query_event).await.unwrap();

        let node_matches = publisher.get_events_by_type("node_created").await;
        assert_eq!(node_matches.len(), 1);

        let query_matches = publisher.get_events_by_type("query_executed").await;
        assert_eq!(query_matches.len(), 1);
    }

    /// Every operation on the no-op publisher succeeds.
    #[tokio::test]
    async fn test_noop_publisher() {
        let publisher = NoOpPublisher;
        let event = node_created(NodeType::Prompt, None);

        // Should not panic or error
        publisher.publish(event).await.unwrap();
        publisher.publish_batch(Vec::new()).await.unwrap();
        publisher.flush().await.unwrap();
    }
}