llm_memory_graph/observatory/metrics.rs

//! Metrics collection for memory graph operations

use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

/// Metrics collector for memory graph operations
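///
/// Counters are shared behind `Arc`s, so cloning the collector is cheap and
/// all clones update the same underlying values.
///
/// # Examples
///
/// Illustrative usage sketch (kept as an `ignore` block; the `use` path is
/// assumed from this file's location):
///
/// ```ignore
/// use llm_memory_graph::observatory::metrics::MemoryGraphMetrics;
///
/// let metrics = MemoryGraphMetrics::new();
/// metrics.record_node_created();
/// metrics.record_write_latency_us(1_500);
///
/// let snapshot = metrics.snapshot();
/// assert_eq!(snapshot.nodes_created, 1);
/// assert_eq!(snapshot.avg_write_latency_ms, 1.5);
/// ```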
#[derive(Clone)]
pub struct MemoryGraphMetrics {
    // Counters
    nodes_created: Arc<AtomicUsize>,
    edges_created: Arc<AtomicUsize>,
    prompts_submitted: Arc<AtomicUsize>,
    responses_generated: Arc<AtomicUsize>,
    tools_invoked: Arc<AtomicUsize>,
    queries_executed: Arc<AtomicUsize>,

    // Latency tracking (in microseconds for precision)
    total_write_latency_us: Arc<AtomicU64>,
    write_count: Arc<AtomicUsize>,
    total_read_latency_us: Arc<AtomicU64>,
    read_count: Arc<AtomicUsize>,
}

impl MemoryGraphMetrics {
    /// Create a new metrics collector
    pub fn new() -> Self {
        Self {
            nodes_created: Arc::new(AtomicUsize::new(0)),
            edges_created: Arc::new(AtomicUsize::new(0)),
            prompts_submitted: Arc::new(AtomicUsize::new(0)),
            responses_generated: Arc::new(AtomicUsize::new(0)),
            tools_invoked: Arc::new(AtomicUsize::new(0)),
            queries_executed: Arc::new(AtomicUsize::new(0)),
            total_write_latency_us: Arc::new(AtomicU64::new(0)),
            write_count: Arc::new(AtomicUsize::new(0)),
            total_read_latency_us: Arc::new(AtomicU64::new(0)),
            read_count: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Record a node creation
    pub fn record_node_created(&self) {
        self.nodes_created.fetch_add(1, Ordering::Relaxed);
    }

    /// Record an edge creation
    pub fn record_edge_created(&self) {
        self.edges_created.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a prompt submission
    pub fn record_prompt_submitted(&self) {
        self.prompts_submitted.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a response generation
    pub fn record_response_generated(&self) {
        self.responses_generated.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a tool invocation
    pub fn record_tool_invoked(&self) {
        self.tools_invoked.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a query execution
    pub fn record_query_executed(&self) {
        self.queries_executed.fetch_add(1, Ordering::Relaxed);
    }

    /// Record write latency in microseconds
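    ///
    /// Illustrative sketch of timing an operation with `std::time::Instant`
    /// and recording the elapsed microseconds (kept as an `ignore` block, as
    /// the crate path needed by a doc-test is not assumed here):
    ///
    /// ```ignore
    /// use std::time::Instant;
    ///
    /// let metrics = MemoryGraphMetrics::new();
    /// let start = Instant::now();
    /// // ... perform the graph write being timed ...
    /// metrics.record_write_latency_us(start.elapsed().as_micros() as u64);
    /// ```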
    pub fn record_write_latency_us(&self, latency_us: u64) {
        self.total_write_latency_us
            .fetch_add(latency_us, Ordering::Relaxed);
        self.write_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Record read latency in microseconds
    pub fn record_read_latency_us(&self, latency_us: u64) {
        self.total_read_latency_us
            .fetch_add(latency_us, Ordering::Relaxed);
        self.read_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Get a snapshot of current metrics
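    ///
    /// Counters are loaded individually with `Ordering::Relaxed`, so under
    /// concurrent updates the snapshot is approximate rather than a single
    /// atomic view of all fields.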
    pub fn snapshot(&self) -> MetricsSnapshot {
        let write_count = self.write_count.load(Ordering::Relaxed);
        let read_count = self.read_count.load(Ordering::Relaxed);

        let avg_write_latency_us = if write_count > 0 {
            self.total_write_latency_us.load(Ordering::Relaxed) as f64 / write_count as f64
        } else {
            0.0
        };

        let avg_read_latency_us = if read_count > 0 {
            self.total_read_latency_us.load(Ordering::Relaxed) as f64 / read_count as f64
        } else {
            0.0
        };

        MetricsSnapshot {
            nodes_created: self.nodes_created.load(Ordering::Relaxed),
            edges_created: self.edges_created.load(Ordering::Relaxed),
            prompts_submitted: self.prompts_submitted.load(Ordering::Relaxed),
            responses_generated: self.responses_generated.load(Ordering::Relaxed),
            tools_invoked: self.tools_invoked.load(Ordering::Relaxed),
            queries_executed: self.queries_executed.load(Ordering::Relaxed),
            avg_write_latency_ms: avg_write_latency_us / 1000.0,
            avg_read_latency_ms: avg_read_latency_us / 1000.0,
        }
    }

    /// Reset all metrics
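    ///
    /// Fields are cleared one at a time with relaxed stores, so a concurrent
    /// `snapshot` may observe a partially reset state.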
    pub fn reset(&self) {
        self.nodes_created.store(0, Ordering::Relaxed);
        self.edges_created.store(0, Ordering::Relaxed);
        self.prompts_submitted.store(0, Ordering::Relaxed);
        self.responses_generated.store(0, Ordering::Relaxed);
        self.tools_invoked.store(0, Ordering::Relaxed);
        self.queries_executed.store(0, Ordering::Relaxed);
        self.total_write_latency_us.store(0, Ordering::Relaxed);
        self.write_count.store(0, Ordering::Relaxed);
        self.total_read_latency_us.store(0, Ordering::Relaxed);
        self.read_count.store(0, Ordering::Relaxed);
    }
}

impl Default for MemoryGraphMetrics {
    fn default() -> Self {
        Self::new()
    }
}

/// Snapshot of metrics at a point in time
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    /// Total nodes created
    pub nodes_created: usize,
    /// Total edges created
    pub edges_created: usize,
    /// Total prompts submitted
    pub prompts_submitted: usize,
    /// Total responses generated
    pub responses_generated: usize,
    /// Total tools invoked
    pub tools_invoked: usize,
    /// Total queries executed
    pub queries_executed: usize,
    /// Average write latency in milliseconds
    pub avg_write_latency_ms: f64,
    /// Average read latency in milliseconds
    pub avg_read_latency_ms: f64,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_creation() {
        let metrics = MemoryGraphMetrics::new();
        let snapshot = metrics.snapshot();

        assert_eq!(snapshot.nodes_created, 0);
        assert_eq!(snapshot.edges_created, 0);
        assert_eq!(snapshot.prompts_submitted, 0);
    }

    #[test]
    fn test_metrics_recording() {
        let metrics = MemoryGraphMetrics::new();

        metrics.record_node_created();
        metrics.record_node_created();
        metrics.record_edge_created();
        metrics.record_prompt_submitted();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.nodes_created, 2);
        assert_eq!(snapshot.edges_created, 1);
        assert_eq!(snapshot.prompts_submitted, 1);
    }

    #[test]
    fn test_latency_tracking() {
        let metrics = MemoryGraphMetrics::new();

        // Record some write latencies (in microseconds)
        metrics.record_write_latency_us(1000); // 1ms
        metrics.record_write_latency_us(2000); // 2ms
        metrics.record_write_latency_us(3000); // 3ms

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.avg_write_latency_ms, 2.0); // Average of 1, 2, 3 ms
    }

    #[test]
    fn test_metrics_reset() {
        let metrics = MemoryGraphMetrics::new();

        metrics.record_node_created();
        metrics.record_edge_created();
        metrics.record_prompt_submitted();

        let snapshot_before = metrics.snapshot();
        assert_eq!(snapshot_before.nodes_created, 1);

        metrics.reset();

        let snapshot_after = metrics.snapshot();
        assert_eq!(snapshot_after.nodes_created, 0);
        assert_eq!(snapshot_after.edges_created, 0);
        assert_eq!(snapshot_after.prompts_submitted, 0);
    }

    #[test]
    fn test_concurrent_metrics_update() {
        use std::sync::Arc;
        use std::thread;

        let metrics = Arc::new(MemoryGraphMetrics::new());
        let mut handles = vec![];

        // Spawn 10 threads, each incrementing counters 100 times
        for _ in 0..10 {
            let metrics_clone = Arc::clone(&metrics);
            let handle = thread::spawn(move || {
                for _ in 0..100 {
                    metrics_clone.record_node_created();
                }
            });
            handles.push(handle);
        }

        // Wait for all threads to complete
        for handle in handles {
            handle.join().unwrap();
        }

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.nodes_created, 1000);
    }
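
    #[test]
    fn test_cloned_metrics_share_counters() {
        // Sanity check: `Clone` copies the `Arc` handles, so a clone records
        // into the same counters as the original collector.
        let metrics = MemoryGraphMetrics::new();
        let clone = metrics.clone();

        clone.record_node_created();
        clone.record_query_executed();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.nodes_created, 1);
        assert_eq!(snapshot.queries_executed, 1);
    }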
}