Skip to main content

graphrag_core/async_processing/
monitoring.rs

//! Performance monitoring and metrics collection for GraphRAG operations.
//!
//! This module provides the [`ProcessingMetrics`] collector for tracking comprehensive
//! statistics about query execution, document processing, batch operations, rate limiting,
//! and system resource usage.
//!
//! # Main Types
//!
//! - [`ProcessingMetrics`]: Thread-safe metrics collector using atomic operations
//! - [`MetricsSummary`]: Comprehensive snapshot of all collected metrics
//! - [`QueryMetrics`]: Query-specific statistics
//! - [`DocumentMetrics`]: Document processing statistics
//! - [`SystemMetrics`]: System-level performance metrics
//!
//! # Features
//!
//! - Thread-safe atomic counters for concurrent access
//! - Duration tracking with automatic sliding window (last 1000 entries for queries and
//!   documents, last 100 entries for batches)
//! - Success rate calculations for queries and document processing
//! - Peak memory usage tracking
//! - Uptime monitoring
//! - Statistical aggregations (averages, rates)
//! - Formatted summary reporting
//!
//! # Basic Usage
//!
//! ```rust,ignore
//! use graphrag_core::async_processing::ProcessingMetrics;
//! use std::time::Instant;
//!
//! let metrics = ProcessingMetrics::new();
//!
//! // Track a query
//! metrics.increment_query_started();
//! let start = Instant::now();
//! // ... perform query ...
//! metrics.record_query_duration(start.elapsed());
//! metrics.increment_query_success();
//!
//! // Track document processing
//! metrics.increment_document_processing_started();
//! // ... process document ...
//! metrics.increment_document_processing_success();
//!
//! // Get summary statistics
//! let summary = metrics.get_summary();
//! println!("Query success rate: {:.1}%",
//!     summary.queries.success_rate * 100.0
//! );
//! println!("Average document duration: {:?}",
//!     summary.documents.average_duration
//! );
//!
//! // Print full report
//! metrics.print_summary();
//! ```
57
58use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
59use std::sync::{Arc, RwLock};
60use std::time::{Duration, Instant};
61
62/// Metrics collector for tracking processing performance and statistics
63///
64/// Thread-safe metrics tracking for queries, document processing, batches,
65/// rate limiting, and system resource usage. Uses atomic operations for
66/// counters and locks for duration collections.
67#[derive(Debug)]
68pub struct ProcessingMetrics {
69    // Query metrics
70    /// Number of queries started
71    queries_started: AtomicUsize,
72    /// Number of queries that completed successfully
73    queries_succeeded: AtomicUsize,
74    /// Number of queries that failed
75    queries_failed: AtomicUsize,
76    /// Collection of query execution durations (capped at 1000 entries)
77    query_durations: Arc<RwLock<Vec<Duration>>>,
78
79    // Document processing metrics
80    /// Number of document processing operations started
81    documents_started: AtomicUsize,
82    /// Number of documents processed successfully
83    documents_succeeded: AtomicUsize,
84    /// Number of document processing operations that failed
85    documents_failed: AtomicUsize,
86    /// Collection of document processing durations (capped at 1000 entries)
87    document_durations: Arc<RwLock<Vec<Duration>>>,
88
89    // Batch processing metrics
90    /// Number of batch processing operations started
91    batches_started: AtomicUsize,
92    /// Collection of batch processing durations (capped at 100 entries)
93    batch_durations: Arc<RwLock<Vec<Duration>>>,
94
95    // Rate limiting metrics
96    /// Number of rate limiting errors encountered
97    rate_limit_errors: AtomicUsize,
98
99    // System metrics
100    /// Peak memory usage observed in bytes
101    peak_memory_usage: AtomicU64,
102    /// Timestamp when metrics tracking started
103    creation_time: Instant,
104}
105
106impl ProcessingMetrics {
107    /// Creates a new metrics collector with all counters initialized to zero
108    pub fn new() -> Self {
109        Self {
110            queries_started: AtomicUsize::new(0),
111            queries_succeeded: AtomicUsize::new(0),
112            queries_failed: AtomicUsize::new(0),
113            query_durations: Arc::new(RwLock::new(Vec::new())),
114
115            documents_started: AtomicUsize::new(0),
116            documents_succeeded: AtomicUsize::new(0),
117            documents_failed: AtomicUsize::new(0),
118            document_durations: Arc::new(RwLock::new(Vec::new())),
119
120            batches_started: AtomicUsize::new(0),
121            batch_durations: Arc::new(RwLock::new(Vec::new())),
122
123            rate_limit_errors: AtomicUsize::new(0),
124
125            peak_memory_usage: AtomicU64::new(0),
126            creation_time: Instant::now(),
127        }
128    }
129
130    // Query metrics
131    /// Increments the counter for queries started
132    pub fn increment_query_started(&self) {
133        self.queries_started.fetch_add(1, Ordering::Relaxed);
134    }
135
136    /// Increments the counter for successfully completed queries
137    pub fn increment_query_success(&self) {
138        self.queries_succeeded.fetch_add(1, Ordering::Relaxed);
139    }
140
141    /// Increments the counter for failed queries
142    pub fn increment_query_error(&self) {
143        self.queries_failed.fetch_add(1, Ordering::Relaxed);
144    }
145
146    /// Records the execution duration of a query
147    ///
148    /// Keeps only the most recent 1000 measurements to prevent unbounded memory growth.
149    ///
150    /// # Parameters
151    /// - `duration`: Time taken to execute the query
152    pub fn record_query_duration(&self, duration: Duration) {
153        let mut durations = self.query_durations.write().expect("Lock poisoned");
154        durations.push(duration);
155        // Keep only last 1000 measurements to prevent memory growth
156        if durations.len() > 1000 {
157            durations.remove(0);
158        }
159    }
160
161    // Document processing metrics
162    /// Increments the counter for document processing operations started
163    pub fn increment_document_processing_started(&self) {
164        self.documents_started.fetch_add(1, Ordering::Relaxed);
165    }
166
167    /// Increments the counter for successfully processed documents
168    pub fn increment_document_processing_success(&self) {
169        self.documents_succeeded.fetch_add(1, Ordering::Relaxed);
170    }
171
172    /// Increments the counter for failed document processing operations
173    pub fn increment_document_processing_error(&self) {
174        self.documents_failed.fetch_add(1, Ordering::Relaxed);
175    }
176
177    /// Records the processing duration of a document
178    ///
179    /// Keeps only the most recent 1000 measurements to prevent unbounded memory growth.
180    ///
181    /// # Parameters
182    /// - `duration`: Time taken to process the document
183    pub fn record_document_processing_duration(&self, duration: Duration) {
184        let mut durations = self.document_durations.write().expect("Lock poisoned");
185        durations.push(duration);
186        // Keep only last 1000 measurements to prevent memory growth
187        if durations.len() > 1000 {
188            durations.remove(0);
189        }
190    }
191
192    // Batch processing metrics
193    /// Increments the counter for batch processing operations started
194    pub fn increment_batch_processing_started(&self) {
195        self.batches_started.fetch_add(1, Ordering::Relaxed);
196    }
197
198    /// Records the processing duration of a batch
199    ///
200    /// Keeps only the most recent 100 measurements to prevent unbounded memory growth.
201    ///
202    /// # Parameters
203    /// - `duration`: Time taken to process the batch
204    pub fn record_batch_processing_duration(&self, duration: Duration) {
205        let mut durations = self.batch_durations.write().expect("Lock poisoned");
206        durations.push(duration);
207        // Keep only last 100 measurements to prevent memory growth
208        if durations.len() > 100 {
209            durations.remove(0);
210        }
211    }
212
213    // Rate limiting metrics
214    /// Increments the counter for rate limiting errors
215    pub fn increment_rate_limit_errors(&self) {
216        self.rate_limit_errors.fetch_add(1, Ordering::Relaxed);
217    }
218
219    // System metrics
220    /// Updates peak memory usage if the new value exceeds the current peak
221    ///
222    /// # Parameters
223    /// - `memory_bytes`: Current memory usage in bytes
224    pub fn update_peak_memory_usage(&self, memory_bytes: u64) {
225        let current = self.peak_memory_usage.load(Ordering::Relaxed);
226        if memory_bytes > current {
227            self.peak_memory_usage
228                .store(memory_bytes, Ordering::Relaxed);
229        }
230    }
231
232    // Getters for current values
233    /// Returns the number of queries started
234    pub fn get_queries_started(&self) -> usize {
235        self.queries_started.load(Ordering::Relaxed)
236    }
237
238    /// Returns the number of queries that succeeded
239    pub fn get_queries_succeeded(&self) -> usize {
240        self.queries_succeeded.load(Ordering::Relaxed)
241    }
242
243    /// Returns the number of queries that failed
244    pub fn get_queries_failed(&self) -> usize {
245        self.queries_failed.load(Ordering::Relaxed)
246    }
247
248    /// Returns the number of document processing operations started
249    pub fn get_documents_started(&self) -> usize {
250        self.documents_started.load(Ordering::Relaxed)
251    }
252
253    /// Returns the number of documents processed successfully
254    pub fn get_documents_succeeded(&self) -> usize {
255        self.documents_succeeded.load(Ordering::Relaxed)
256    }
257
258    /// Returns the number of document processing operations that failed
259    pub fn get_documents_failed(&self) -> usize {
260        self.documents_failed.load(Ordering::Relaxed)
261    }
262
263    /// Returns the number of batch processing operations started
264    pub fn get_batches_started(&self) -> usize {
265        self.batches_started.load(Ordering::Relaxed)
266    }
267
268    /// Returns the number of rate limiting errors encountered
269    pub fn get_rate_limit_errors(&self) -> usize {
270        self.rate_limit_errors.load(Ordering::Relaxed)
271    }
272
273    /// Returns the peak memory usage observed in bytes
274    pub fn get_peak_memory_usage(&self) -> u64 {
275        self.peak_memory_usage.load(Ordering::Relaxed)
276    }
277
278    /// Returns the time elapsed since metrics tracking started
279    pub fn get_uptime(&self) -> Duration {
280        self.creation_time.elapsed()
281    }
282
283    // Statistical methods
284    /// Calculates the average query execution duration
285    ///
286    /// # Returns
287    /// Average duration if queries have been recorded, None otherwise
288    pub fn get_average_query_duration(&self) -> Option<Duration> {
289        let durations = self.query_durations.read().expect("Lock poisoned");
290        if durations.is_empty() {
291            None
292        } else {
293            let total_nanos: u64 = durations.iter().map(|d| d.as_nanos() as u64).sum();
294            Some(Duration::from_nanos(total_nanos / durations.len() as u64))
295        }
296    }
297
298    /// Calculates the average document processing duration
299    ///
300    /// # Returns
301    /// Average duration if documents have been processed, None otherwise
302    pub fn get_average_document_duration(&self) -> Option<Duration> {
303        let durations = self.document_durations.read().expect("Lock poisoned");
304        if durations.is_empty() {
305            None
306        } else {
307            let total_nanos: u64 = durations.iter().map(|d| d.as_nanos() as u64).sum();
308            Some(Duration::from_nanos(total_nanos / durations.len() as u64))
309        }
310    }
311
312    /// Calculates the query success rate
313    ///
314    /// # Returns
315    /// Ratio of successful queries to total queries (0.0-1.0), or 1.0 if no queries
316    pub fn get_query_success_rate(&self) -> f64 {
317        let total = self.get_queries_started();
318        if total == 0 {
319            1.0
320        } else {
321            self.get_queries_succeeded() as f64 / total as f64
322        }
323    }
324
325    /// Calculates the document processing success rate
326    ///
327    /// # Returns
328    /// Ratio of successful documents to total documents (0.0-1.0), or 1.0 if no documents
329    pub fn get_document_success_rate(&self) -> f64 {
330        let total = self.get_documents_started();
331        if total == 0 {
332            1.0
333        } else {
334            self.get_documents_succeeded() as f64 / total as f64
335        }
336    }
337
338    // Summary report
339    /// Generates a comprehensive summary of all metrics
340    ///
341    /// # Returns
342    /// Structured summary containing query, document, and system metrics
343    pub fn get_summary(&self) -> MetricsSummary {
344        MetricsSummary {
345            queries: QueryMetrics {
346                started: self.get_queries_started(),
347                succeeded: self.get_queries_succeeded(),
348                failed: self.get_queries_failed(),
349                success_rate: self.get_query_success_rate(),
350                average_duration: self.get_average_query_duration(),
351            },
352            documents: DocumentMetrics {
353                started: self.get_documents_started(),
354                succeeded: self.get_documents_succeeded(),
355                failed: self.get_documents_failed(),
356                success_rate: self.get_document_success_rate(),
357                average_duration: self.get_average_document_duration(),
358            },
359            system: SystemMetrics {
360                batches_processed: self.get_batches_started(),
361                rate_limit_errors: self.get_rate_limit_errors(),
362                peak_memory_usage: self.get_peak_memory_usage(),
363                uptime: self.get_uptime(),
364            },
365        }
366    }
367
368    /// Prints a formatted summary of all metrics to the log
369    pub fn print_summary(&self) {
370        let summary = self.get_summary();
371        tracing::info!("Processing Metrics Summary");
372
373        tracing::info!(
374            started = summary.queries.started,
375            succeeded = summary.queries.succeeded,
376            failed = summary.queries.failed,
377            success_rate = format!("{:.1}%", summary.queries.success_rate * 100.0),
378            average_duration = ?summary.queries.average_duration,
379            "Query metrics"
380        );
381
382        tracing::info!(
383            started = summary.documents.started,
384            succeeded = summary.documents.succeeded,
385            failed = summary.documents.failed,
386            success_rate = format!("{:.1}%", summary.documents.success_rate * 100.0),
387            average_duration = ?summary.documents.average_duration,
388            "Document metrics"
389        );
390
391        let peak_memory_mb = if summary.system.peak_memory_usage > 0 {
392            Some(summary.system.peak_memory_usage / 1024 / 1024)
393        } else {
394            None
395        };
396
397        tracing::info!(
398            batches_processed = summary.system.batches_processed,
399            rate_limit_errors = summary.system.rate_limit_errors,
400            peak_memory_mb = ?peak_memory_mb,
401            uptime = ?summary.system.uptime,
402            "System metrics"
403        );
404    }
405}
406
/// Comprehensive metrics summary containing all tracked statistics
///
/// A plain value snapshot: it holds copies of the numbers taken at the time it was
/// built and does not update afterwards. Two snapshots can be compared with `==`.
#[derive(Debug, Clone, PartialEq)]
pub struct MetricsSummary {
    /// Query-related metrics
    pub queries: QueryMetrics,
    /// Document processing metrics
    pub documents: DocumentMetrics,
    /// System-level metrics
    pub system: SystemMetrics,
}

/// Statistics for query operations
#[derive(Debug, Clone, PartialEq)]
pub struct QueryMetrics {
    /// Number of queries started
    pub started: usize,
    /// Number of queries that succeeded
    pub succeeded: usize,
    /// Number of queries that failed
    pub failed: usize,
    /// Success rate (0.0-1.0)
    pub success_rate: f64,
    /// Average query execution duration, or `None` when no duration was recorded
    pub average_duration: Option<Duration>,
}

/// Statistics for document processing operations
#[derive(Debug, Clone, PartialEq)]
pub struct DocumentMetrics {
    /// Number of document processing operations started
    pub started: usize,
    /// Number of documents processed successfully
    pub succeeded: usize,
    /// Number of document processing operations that failed
    pub failed: usize,
    /// Success rate (0.0-1.0)
    pub success_rate: f64,
    /// Average document processing duration, or `None` when no duration was recorded
    pub average_duration: Option<Duration>,
}

/// System-level performance metrics
#[derive(Debug, Clone, PartialEq)]
pub struct SystemMetrics {
    /// Number of batch processing operations started.
    ///
    /// `ProcessingMetrics::get_summary` fills this from the batches-started
    /// counter; batch completion is not tracked separately.
    pub batches_processed: usize,
    /// Number of rate limiting errors encountered
    pub rate_limit_errors: usize,
    /// Peak memory usage observed in bytes
    pub peak_memory_usage: u64,
    /// Time elapsed since metrics tracking started
    pub uptime: Duration,
}
460
461impl Default for ProcessingMetrics {
462    fn default() -> Self {
463        Self::new()
464    }
465}