graphrag_core/async_processing/monitoring.rs
1//! Performance monitoring and metrics collection for GraphRAG operations.
2//!
3//! This module provides the [`ProcessingMetrics`] collector for tracking comprehensive
4//! statistics about query execution, document processing, batch operations, rate limiting,
5//! and system resource usage.
6//!
7//! # Main Types
8//!
9//! - [`ProcessingMetrics`]: Thread-safe metrics collector using atomic operations
10//! - [`MetricsSummary`]: Comprehensive snapshot of all collected metrics
11//! - [`QueryMetrics`]: Query-specific statistics
12//! - [`DocumentMetrics`]: Document processing statistics
13//! - [`SystemMetrics`]: System-level performance metrics
14//!
15//! # Features
16//!
17//! - Thread-safe atomic counters for concurrent access
//! - Duration tracking with automatic sliding window (last 1000 query/document entries, last 100 batch entries)
19//! - Success rate calculations for queries and document processing
20//! - Peak memory usage tracking
21//! - Uptime monitoring
22//! - Statistical aggregations (averages, rates)
23//! - Formatted summary reporting
24//!
25//! # Basic Usage
26//!
27//! ```rust,ignore
28//! use graphrag_core::async_processing::ProcessingMetrics;
29//! use std::time::Instant;
30//!
31//! let metrics = ProcessingMetrics::new();
32//!
33//! // Track a query
34//! metrics.increment_query_started();
35//! let start = Instant::now();
36//! // ... perform query ...
37//! metrics.record_query_duration(start.elapsed());
38//! metrics.increment_query_success();
39//!
40//! // Track document processing
41//! metrics.increment_document_processing_started();
42//! // ... process document ...
43//! metrics.increment_document_processing_success();
44//!
45//! // Get summary statistics
46//! let summary = metrics.get_summary();
47//! println!("Query success rate: {:.1}%",
48//! summary.queries.success_rate * 100.0
49//! );
50//! println!("Average document duration: {:?}",
51//! summary.documents.average_duration
52//! );
53//!
54//! // Print full report
55//! metrics.print_summary();
56//! ```
57
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
61
62/// Metrics collector for tracking processing performance and statistics
63///
64/// Thread-safe metrics tracking for queries, document processing, batches,
65/// rate limiting, and system resource usage. Uses atomic operations for
66/// counters and locks for duration collections.
67#[derive(Debug)]
68pub struct ProcessingMetrics {
69 // Query metrics
70 /// Number of queries started
71 queries_started: AtomicUsize,
72 /// Number of queries that completed successfully
73 queries_succeeded: AtomicUsize,
74 /// Number of queries that failed
75 queries_failed: AtomicUsize,
76 /// Collection of query execution durations (capped at 1000 entries)
77 query_durations: Arc<RwLock<Vec<Duration>>>,
78
79 // Document processing metrics
80 /// Number of document processing operations started
81 documents_started: AtomicUsize,
82 /// Number of documents processed successfully
83 documents_succeeded: AtomicUsize,
84 /// Number of document processing operations that failed
85 documents_failed: AtomicUsize,
86 /// Collection of document processing durations (capped at 1000 entries)
87 document_durations: Arc<RwLock<Vec<Duration>>>,
88
89 // Batch processing metrics
90 /// Number of batch processing operations started
91 batches_started: AtomicUsize,
92 /// Collection of batch processing durations (capped at 100 entries)
93 batch_durations: Arc<RwLock<Vec<Duration>>>,
94
95 // Rate limiting metrics
96 /// Number of rate limiting errors encountered
97 rate_limit_errors: AtomicUsize,
98
99 // System metrics
100 /// Peak memory usage observed in bytes
101 peak_memory_usage: AtomicU64,
102 /// Timestamp when metrics tracking started
103 creation_time: Instant,
104}
105
106impl ProcessingMetrics {
107 /// Creates a new metrics collector with all counters initialized to zero
108 pub fn new() -> Self {
109 Self {
110 queries_started: AtomicUsize::new(0),
111 queries_succeeded: AtomicUsize::new(0),
112 queries_failed: AtomicUsize::new(0),
113 query_durations: Arc::new(RwLock::new(Vec::new())),
114
115 documents_started: AtomicUsize::new(0),
116 documents_succeeded: AtomicUsize::new(0),
117 documents_failed: AtomicUsize::new(0),
118 document_durations: Arc::new(RwLock::new(Vec::new())),
119
120 batches_started: AtomicUsize::new(0),
121 batch_durations: Arc::new(RwLock::new(Vec::new())),
122
123 rate_limit_errors: AtomicUsize::new(0),
124
125 peak_memory_usage: AtomicU64::new(0),
126 creation_time: Instant::now(),
127 }
128 }
129
130 // Query metrics
131 /// Increments the counter for queries started
132 pub fn increment_query_started(&self) {
133 self.queries_started.fetch_add(1, Ordering::Relaxed);
134 }
135
136 /// Increments the counter for successfully completed queries
137 pub fn increment_query_success(&self) {
138 self.queries_succeeded.fetch_add(1, Ordering::Relaxed);
139 }
140
141 /// Increments the counter for failed queries
142 pub fn increment_query_error(&self) {
143 self.queries_failed.fetch_add(1, Ordering::Relaxed);
144 }
145
146 /// Records the execution duration of a query
147 ///
148 /// Keeps only the most recent 1000 measurements to prevent unbounded memory growth.
149 ///
150 /// # Parameters
151 /// - `duration`: Time taken to execute the query
152 pub fn record_query_duration(&self, duration: Duration) {
153 let mut durations = self.query_durations.write().expect("Lock poisoned");
154 durations.push(duration);
155 // Keep only last 1000 measurements to prevent memory growth
156 if durations.len() > 1000 {
157 durations.remove(0);
158 }
159 }
160
161 // Document processing metrics
162 /// Increments the counter for document processing operations started
163 pub fn increment_document_processing_started(&self) {
164 self.documents_started.fetch_add(1, Ordering::Relaxed);
165 }
166
167 /// Increments the counter for successfully processed documents
168 pub fn increment_document_processing_success(&self) {
169 self.documents_succeeded.fetch_add(1, Ordering::Relaxed);
170 }
171
172 /// Increments the counter for failed document processing operations
173 pub fn increment_document_processing_error(&self) {
174 self.documents_failed.fetch_add(1, Ordering::Relaxed);
175 }
176
177 /// Records the processing duration of a document
178 ///
179 /// Keeps only the most recent 1000 measurements to prevent unbounded memory growth.
180 ///
181 /// # Parameters
182 /// - `duration`: Time taken to process the document
183 pub fn record_document_processing_duration(&self, duration: Duration) {
184 let mut durations = self.document_durations.write().expect("Lock poisoned");
185 durations.push(duration);
186 // Keep only last 1000 measurements to prevent memory growth
187 if durations.len() > 1000 {
188 durations.remove(0);
189 }
190 }
191
192 // Batch processing metrics
193 /// Increments the counter for batch processing operations started
194 pub fn increment_batch_processing_started(&self) {
195 self.batches_started.fetch_add(1, Ordering::Relaxed);
196 }
197
198 /// Records the processing duration of a batch
199 ///
200 /// Keeps only the most recent 100 measurements to prevent unbounded memory growth.
201 ///
202 /// # Parameters
203 /// - `duration`: Time taken to process the batch
204 pub fn record_batch_processing_duration(&self, duration: Duration) {
205 let mut durations = self.batch_durations.write().expect("Lock poisoned");
206 durations.push(duration);
207 // Keep only last 100 measurements to prevent memory growth
208 if durations.len() > 100 {
209 durations.remove(0);
210 }
211 }
212
213 // Rate limiting metrics
214 /// Increments the counter for rate limiting errors
215 pub fn increment_rate_limit_errors(&self) {
216 self.rate_limit_errors.fetch_add(1, Ordering::Relaxed);
217 }
218
219 // System metrics
220 /// Updates peak memory usage if the new value exceeds the current peak
221 ///
222 /// # Parameters
223 /// - `memory_bytes`: Current memory usage in bytes
224 pub fn update_peak_memory_usage(&self, memory_bytes: u64) {
225 let current = self.peak_memory_usage.load(Ordering::Relaxed);
226 if memory_bytes > current {
227 self.peak_memory_usage
228 .store(memory_bytes, Ordering::Relaxed);
229 }
230 }
231
232 // Getters for current values
233 /// Returns the number of queries started
234 pub fn get_queries_started(&self) -> usize {
235 self.queries_started.load(Ordering::Relaxed)
236 }
237
238 /// Returns the number of queries that succeeded
239 pub fn get_queries_succeeded(&self) -> usize {
240 self.queries_succeeded.load(Ordering::Relaxed)
241 }
242
243 /// Returns the number of queries that failed
244 pub fn get_queries_failed(&self) -> usize {
245 self.queries_failed.load(Ordering::Relaxed)
246 }
247
248 /// Returns the number of document processing operations started
249 pub fn get_documents_started(&self) -> usize {
250 self.documents_started.load(Ordering::Relaxed)
251 }
252
253 /// Returns the number of documents processed successfully
254 pub fn get_documents_succeeded(&self) -> usize {
255 self.documents_succeeded.load(Ordering::Relaxed)
256 }
257
258 /// Returns the number of document processing operations that failed
259 pub fn get_documents_failed(&self) -> usize {
260 self.documents_failed.load(Ordering::Relaxed)
261 }
262
263 /// Returns the number of batch processing operations started
264 pub fn get_batches_started(&self) -> usize {
265 self.batches_started.load(Ordering::Relaxed)
266 }
267
268 /// Returns the number of rate limiting errors encountered
269 pub fn get_rate_limit_errors(&self) -> usize {
270 self.rate_limit_errors.load(Ordering::Relaxed)
271 }
272
273 /// Returns the peak memory usage observed in bytes
274 pub fn get_peak_memory_usage(&self) -> u64 {
275 self.peak_memory_usage.load(Ordering::Relaxed)
276 }
277
278 /// Returns the time elapsed since metrics tracking started
279 pub fn get_uptime(&self) -> Duration {
280 self.creation_time.elapsed()
281 }
282
283 // Statistical methods
284 /// Calculates the average query execution duration
285 ///
286 /// # Returns
287 /// Average duration if queries have been recorded, None otherwise
288 pub fn get_average_query_duration(&self) -> Option<Duration> {
289 let durations = self.query_durations.read().expect("Lock poisoned");
290 if durations.is_empty() {
291 None
292 } else {
293 let total_nanos: u64 = durations.iter().map(|d| d.as_nanos() as u64).sum();
294 Some(Duration::from_nanos(total_nanos / durations.len() as u64))
295 }
296 }
297
298 /// Calculates the average document processing duration
299 ///
300 /// # Returns
301 /// Average duration if documents have been processed, None otherwise
302 pub fn get_average_document_duration(&self) -> Option<Duration> {
303 let durations = self.document_durations.read().expect("Lock poisoned");
304 if durations.is_empty() {
305 None
306 } else {
307 let total_nanos: u64 = durations.iter().map(|d| d.as_nanos() as u64).sum();
308 Some(Duration::from_nanos(total_nanos / durations.len() as u64))
309 }
310 }
311
312 /// Calculates the query success rate
313 ///
314 /// # Returns
315 /// Ratio of successful queries to total queries (0.0-1.0), or 1.0 if no queries
316 pub fn get_query_success_rate(&self) -> f64 {
317 let total = self.get_queries_started();
318 if total == 0 {
319 1.0
320 } else {
321 self.get_queries_succeeded() as f64 / total as f64
322 }
323 }
324
325 /// Calculates the document processing success rate
326 ///
327 /// # Returns
328 /// Ratio of successful documents to total documents (0.0-1.0), or 1.0 if no documents
329 pub fn get_document_success_rate(&self) -> f64 {
330 let total = self.get_documents_started();
331 if total == 0 {
332 1.0
333 } else {
334 self.get_documents_succeeded() as f64 / total as f64
335 }
336 }
337
338 // Summary report
339 /// Generates a comprehensive summary of all metrics
340 ///
341 /// # Returns
342 /// Structured summary containing query, document, and system metrics
343 pub fn get_summary(&self) -> MetricsSummary {
344 MetricsSummary {
345 queries: QueryMetrics {
346 started: self.get_queries_started(),
347 succeeded: self.get_queries_succeeded(),
348 failed: self.get_queries_failed(),
349 success_rate: self.get_query_success_rate(),
350 average_duration: self.get_average_query_duration(),
351 },
352 documents: DocumentMetrics {
353 started: self.get_documents_started(),
354 succeeded: self.get_documents_succeeded(),
355 failed: self.get_documents_failed(),
356 success_rate: self.get_document_success_rate(),
357 average_duration: self.get_average_document_duration(),
358 },
359 system: SystemMetrics {
360 batches_processed: self.get_batches_started(),
361 rate_limit_errors: self.get_rate_limit_errors(),
362 peak_memory_usage: self.get_peak_memory_usage(),
363 uptime: self.get_uptime(),
364 },
365 }
366 }
367
368 /// Prints a formatted summary of all metrics to the log
369 pub fn print_summary(&self) {
370 let summary = self.get_summary();
371 tracing::info!("Processing Metrics Summary");
372
373 tracing::info!(
374 started = summary.queries.started,
375 succeeded = summary.queries.succeeded,
376 failed = summary.queries.failed,
377 success_rate = format!("{:.1}%", summary.queries.success_rate * 100.0),
378 average_duration = ?summary.queries.average_duration,
379 "Query metrics"
380 );
381
382 tracing::info!(
383 started = summary.documents.started,
384 succeeded = summary.documents.succeeded,
385 failed = summary.documents.failed,
386 success_rate = format!("{:.1}%", summary.documents.success_rate * 100.0),
387 average_duration = ?summary.documents.average_duration,
388 "Document metrics"
389 );
390
391 let peak_memory_mb = if summary.system.peak_memory_usage > 0 {
392 Some(summary.system.peak_memory_usage / 1024 / 1024)
393 } else {
394 None
395 };
396
397 tracing::info!(
398 batches_processed = summary.system.batches_processed,
399 rate_limit_errors = summary.system.rate_limit_errors,
400 peak_memory_mb = ?peak_memory_mb,
401 uptime = ?summary.system.uptime,
402 "System metrics"
403 );
404 }
405}
406
407/// Comprehensive metrics summary containing all tracked statistics
408#[derive(Debug, Clone)]
409pub struct MetricsSummary {
410 /// Query-related metrics
411 pub queries: QueryMetrics,
412 /// Document processing metrics
413 pub documents: DocumentMetrics,
414 /// System-level metrics
415 pub system: SystemMetrics,
416}
417
/// Snapshot of query counters and timing.
#[derive(Clone, Debug)]
pub struct QueryMetrics {
    /// How many queries were started
    pub started: usize,
    /// How many queries finished successfully
    pub succeeded: usize,
    /// How many queries ended in failure
    pub failed: usize,
    /// Fraction of queries that succeeded, in the range 0.0-1.0
    pub success_rate: f64,
    /// Mean query execution time, or `None` when no duration was recorded
    pub average_duration: Option<Duration>,
}
432
/// Snapshot of document-processing counters and timing.
#[derive(Clone, Debug)]
pub struct DocumentMetrics {
    /// How many document processing operations were started
    pub started: usize,
    /// How many documents were processed successfully
    pub succeeded: usize,
    /// How many document processing operations ended in failure
    pub failed: usize,
    /// Fraction of documents that succeeded, in the range 0.0-1.0
    pub success_rate: f64,
    /// Mean document processing time, or `None` when no duration was recorded
    pub average_duration: Option<Duration>,
}
447
/// Snapshot of system-wide counters and resource figures.
#[derive(Clone, Debug)]
pub struct SystemMetrics {
    /// Batch processing operations counted so far
    pub batches_processed: usize,
    /// How many rate limiting errors were hit
    pub rate_limit_errors: usize,
    /// Highest memory usage reported, in bytes
    pub peak_memory_usage: u64,
    /// How long metrics tracking has been running
    pub uptime: Duration,
}
460
461impl Default for ProcessingMetrics {
462 fn default() -> Self {
463 Self::new()
464 }
465}