// graphrag_core/async_processing/mod.rs
//! Async processing utilities for GraphRAG operations
//!
//! This module provides async processing capabilities including:
//! - Concurrent document processing
//! - Rate limiting for API calls
//! - Performance monitoring and metrics
//! - Thread pool management
//! - Task scheduling and coordination

use std::sync::Arc;
use std::time::{Duration, Instant};

use indexmap::IndexMap;
use tokio::sync::RwLock;

use crate::core::{Document, GraphRAGError, KnowledgeGraph};

pub mod concurrent_pipeline;
pub mod monitoring;
pub mod rate_limiting;

pub use concurrent_pipeline::ConcurrentProcessor;
pub use monitoring::ProcessingMetrics;
pub use rate_limiting::RateLimiter;

25/// Result of processing a single document
26#[derive(Debug, Clone)]
27pub struct ProcessingResult {
28    /// Unique identifier of the processed document
29    pub document_id: crate::core::DocumentId,
30    /// Number of entities successfully extracted from the document
31    pub entities_extracted: usize,
32    /// Number of text chunks processed
33    pub chunks_processed: usize,
34    /// Total time taken to process the document
35    pub processing_time: Duration,
36    /// Whether the processing completed successfully
37    pub success: bool,
38}
39
/// Tunable settings that govern concurrency and throttling behavior
#[derive(Debug, Clone)]
pub struct AsyncConfig {
    /// Ceiling on simultaneous LLM API calls
    pub max_concurrent_llm_calls: usize,
    /// Ceiling on simultaneous embedding API calls
    pub max_concurrent_embeddings: usize,
    /// Ceiling on documents processed at the same time
    pub max_concurrent_documents: usize,
    /// Throttle for LLM API calls, in requests per second
    pub llm_rate_limit_per_second: f64,
    /// Throttle for embedding API calls, in requests per second
    pub embedding_rate_limit_per_second: f64,
}

impl Default for AsyncConfig {
    /// Conservative defaults sized for typical hosted-API rate limits
    fn default() -> Self {
        AsyncConfig {
            max_concurrent_llm_calls: 3,
            max_concurrent_embeddings: 5,
            max_concurrent_documents: 10,
            llm_rate_limit_per_second: 2.0,
            embedding_rate_limit_per_second: 10.0,
        }
    }
}

67/// Core async GraphRAG processor with concurrency control and monitoring
68#[derive(Debug)]
69pub struct AsyncGraphRAGCore {
70    /// Shared knowledge graph for storing entities and relationships
71    graph: Arc<RwLock<KnowledgeGraph>>,
72    /// Rate limiter for throttling API calls
73    rate_limiter: Arc<RateLimiter>,
74    /// Concurrent processor for batch document processing
75    concurrent_processor: Arc<ConcurrentProcessor>,
76    /// Metrics collector for tracking processing statistics
77    metrics: Arc<ProcessingMetrics>,
78    /// Configuration settings for async operations
79    config: AsyncConfig,
80}
81
82impl AsyncGraphRAGCore {
83    /// Creates a new async GraphRAG core instance with configuration
84    ///
85    /// Initializes the knowledge graph, rate limiter, concurrent processor,
86    /// and metrics tracking system.
87    ///
88    /// # Parameters
89    /// - `graph`: Knowledge graph for entity storage
90    /// - `config`: Configuration for async processing behavior
91    ///
92    /// # Returns
93    /// Configured async GraphRAG instance, or an error if initialization fails
94    pub async fn new(graph: KnowledgeGraph, config: AsyncConfig) -> Result<Self, GraphRAGError> {
95        let rate_limiter = Arc::new(RateLimiter::new(&config));
96        let concurrent_processor = Arc::new(ConcurrentProcessor::new(config.max_concurrent_documents));
97        let metrics = Arc::new(ProcessingMetrics::new());
98
99        Ok(Self {
100            graph: Arc::new(RwLock::new(graph)),
101            rate_limiter,
102            concurrent_processor,
103            metrics,
104            config,
105        })
106    }
107
108    /// Processes multiple documents concurrently with rate limiting
109    ///
110    /// Distributes documents across concurrent workers while respecting
111    /// rate limits and collecting metrics.
112    ///
113    /// # Parameters
114    /// - `documents`: Collection of documents to process
115    ///
116    /// # Returns
117    /// Vector of processing results for all documents, or an error
118    pub async fn process_documents_async(
119        &self,
120        documents: Vec<Document>,
121    ) -> Result<Vec<ProcessingResult>, GraphRAGError> {
122        let start_time = Instant::now();
123        self.metrics.increment_batch_processing_started();
124
125        tracing::info!(document_count = documents.len(), "Processing documents concurrently");
126
127        let results = self
128            .concurrent_processor
129            .process_batch(
130                documents,
131                Arc::clone(&self.graph),
132                Arc::clone(&self.rate_limiter),
133                Arc::clone(&self.metrics),
134            )
135            .await?;
136
137        let duration = start_time.elapsed();
138        self.metrics.record_batch_processing_duration(duration);
139
140        tracing::info!(
141            duration_ms = duration.as_millis(),
142            successes = results.len(),
143            "Batch processing completed"
144        );
145
146        Ok(results)
147    }
148
149    /// Processes a single document asynchronously with rate limiting
150    ///
151    /// Applies entity extraction and updates the knowledge graph for one document.
152    /// Automatically handles rate limiting and metrics collection.
153    ///
154    /// # Parameters
155    /// - `document`: Document to process
156    ///
157    /// # Returns
158    /// Processing result containing extraction statistics, or an error
159    pub async fn process_single_document_async(
160        &self,
161        document: Document,
162    ) -> Result<ProcessingResult, GraphRAGError> {
163        let start_time = Instant::now();
164        self.metrics.increment_document_processing_started();
165
166        // Acquire rate limiting permits
167        let _llm_permit = self.rate_limiter.acquire_llm_permit().await?;
168
169        let result = {
170            let _graph = self.graph.read().await;
171            // For now, create a simple processing result
172            // In a full implementation, this would use proper entity extraction
173            ProcessingResult {
174                document_id: document.id.clone(),
175                entities_extracted: 0,
176                chunks_processed: document.chunks.len(),
177                processing_time: start_time.elapsed(),
178                success: true,
179            }
180        };
181
182        let duration = start_time.elapsed();
183
184        if result.success {
185            self.metrics.increment_document_processing_success();
186            self.metrics.record_document_processing_duration(duration);
187        } else {
188            self.metrics.increment_document_processing_error();
189            tracing::warn!(document_id = %result.document_id, "Document processing failed");
190        }
191
192        Ok(result)
193    }
194
195    /// Executes a query against the knowledge graph asynchronously
196    ///
197    /// Processes a user query by searching the knowledge graph and generating
198    /// a response using retrieved entities and relationships.
199    ///
200    /// # Parameters
201    /// - `query`: User's query string
202    ///
203    /// # Returns
204    /// Generated response string, or an error if processing fails
205    pub async fn query_async(
206        &self,
207        query: &str,
208    ) -> Result<String, GraphRAGError> {
209        let start_time = Instant::now();
210        self.metrics.increment_query_started();
211
212        // Acquire rate limiting permits
213        let _llm_permit = self.rate_limiter.acquire_llm_permit().await?;
214
215        // Basic implementation - in production this would use proper query processing
216        let result = {
217            let graph = self.graph.read().await;
218            let entity_count = graph.entities().count();
219
220            if entity_count == 0 {
221                Err(GraphRAGError::Unsupported {
222                    operation: "query processing".to_string(),
223                    reason: "No entities in knowledge graph".to_string(),
224                })
225            } else {
226                // Simple placeholder response
227                Ok(format!(
228                    "Query processed: '{query}'. Found {entity_count} entities in graph. This is a basic implementation."
229                ))
230            }
231        };
232
233        let duration = start_time.elapsed();
234
235        match &result {
236            Ok(_) => {
237                self.metrics.increment_query_success();
238                self.metrics.record_query_duration(duration);
239                tracing::info!(duration_ms = duration.as_millis(), "Query completed successfully");
240            }
241            Err(e) => {
242                self.metrics.increment_query_error();
243                tracing::error!(error = %e, "Query processing error");
244            }
245        }
246
247        result
248    }
249
250    /// Retrieves current processing metrics
251    ///
252    /// # Returns
253    /// Reference to the metrics collector
254    pub fn get_metrics(&self) -> &ProcessingMetrics {
255        &self.metrics
256    }
257
258    /// Retrieves current configuration
259    ///
260    /// # Returns
261    /// Reference to the async processing configuration
262    pub fn get_config(&self) -> &AsyncConfig {
263        &self.config
264    }
265
266    /// Performs health check on all system components
267    ///
268    /// Checks the status of the knowledge graph and rate limiter to determine
269    /// overall system health.
270    ///
271    /// # Returns
272    /// Health status summary for all components
273    pub async fn health_check(&self) -> HealthStatus {
274        let graph_status = {
275            let graph = self.graph.read().await;
276            if graph.entities().count() > 0 {
277                ComponentStatus::Healthy
278            } else {
279                ComponentStatus::Warning("No entities in graph".to_string())
280            }
281        };
282
283        let rate_limiter_status = self.rate_limiter.health_check();
284
285        HealthStatus {
286            overall: if matches!(graph_status, ComponentStatus::Healthy)
287                && matches!(rate_limiter_status, ComponentStatus::Healthy)
288            {
289                ComponentStatus::Healthy
290            } else {
291                ComponentStatus::Warning("Some components have issues".to_string())
292            },
293            components: indexmap::indexmap! {
294                "graph".to_string() => graph_status,
295                "rate_limiter".to_string() => rate_limiter_status,
296            },
297        }
298    }
299
300    /// Shuts down the async processor gracefully
301    ///
302    /// Allows current operations to complete before shutting down. In a full
303    /// implementation, this cancels pending tasks and cleans up resources.
304    ///
305    /// # Returns
306    /// Ok on successful shutdown, or an error if cleanup fails
307    pub async fn shutdown(&self) -> Result<(), GraphRAGError> {
308        tracing::info!("Shutting down async GraphRAG processor");
309
310        // In a full implementation, this would:
311        // - Cancel running tasks
312        // - Wait for current operations to complete
313        // - Clean up resources
314
315        tracing::info!("Async processor shutdown complete");
316        Ok(())
317    }
318}
319
/// Health state reported for a single subsystem
#[derive(Debug, Clone)]
pub enum ComponentStatus {
    /// Operating normally
    Healthy,
    /// Degraded but usable; carries a human-readable warning
    Warning(String),
    /// Broken; carries a human-readable error description
    Error(String),
}

331/// Overall health status including all system components
332#[derive(Debug, Clone)]
333pub struct HealthStatus {
334    /// Aggregate health status across all components
335    pub overall: ComponentStatus,
336    /// Individual status for each named component
337    pub components: IndexMap<String, ComponentStatus>,
338}
339
340/// Simple task scheduler for managing async operations
341#[derive(Debug)]
342pub struct TaskScheduler {
343    /// Maximum number of tasks allowed to run concurrently
344    max_concurrent_tasks: usize,
345}
346
347impl TaskScheduler {
348    /// Creates a new task scheduler with specified concurrency limit
349    ///
350    /// # Parameters
351    /// - `max_concurrent_tasks`: Maximum number of concurrent tasks
352    pub fn new(max_concurrent_tasks: usize) -> Self {
353        Self {
354            max_concurrent_tasks,
355        }
356    }
357
358    /// Schedules and executes an async task
359    ///
360    /// In production, this would implement proper task queuing and scheduling.
361    /// Currently executes tasks immediately.
362    ///
363    /// # Parameters
364    /// - `task`: Future to execute
365    ///
366    /// # Returns
367    /// Result of the task execution
368    pub async fn schedule_task<F, T>(&self, task: F) -> Result<T, GraphRAGError>
369    where
370        F: std::future::Future<Output = Result<T, GraphRAGError>>,
371    {
372        // Basic implementation - in production this would use proper task scheduling
373        task.await
374    }
375
376    /// Returns the maximum number of concurrent tasks allowed
377    pub fn max_concurrent_tasks(&self) -> usize {
378        self.max_concurrent_tasks
379    }
380}
381
/// Performance tracker for monitoring async operation timing
///
/// Thread-safe: the operation counter is atomic and the accumulated
/// duration is mutex-guarded, so `record_operation` may be called from
/// multiple threads concurrently.
#[derive(Debug, Default)]
pub struct PerformanceTracker {
    /// Total number of operations recorded
    total_operations: std::sync::atomic::AtomicU64,
    /// Cumulative duration of all recorded operations
    total_duration: std::sync::Mutex<Duration>,
}

impl PerformanceTracker {
    /// Creates a new performance tracker with zeroed counters
    pub fn new() -> Self {
        Self::default()
    }

    /// Records the duration of a completed operation
    ///
    /// # Parameters
    /// - `duration`: Time taken for the operation
    pub fn record_operation(&self, duration: Duration) {
        self.total_operations
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let mut total_duration = self
            .total_duration
            .lock()
            .expect("performance tracker mutex poisoned");
        *total_duration += duration;
    }

    /// Calculates the average duration per operation
    ///
    /// # Returns
    /// Average duration, or `Duration::ZERO` if nothing has been recorded
    pub fn get_average_duration(&self) -> Duration {
        let total_ops = self
            .total_operations
            .load(std::sync::atomic::Ordering::Relaxed);
        if total_ops == 0 {
            return Duration::ZERO;
        }

        let total_duration = *self
            .total_duration
            .lock()
            .expect("performance tracker mutex poisoned");
        // Divide in u128 nanoseconds. The previous `total_ops as u32` cast
        // silently truncated above u32::MAX operations and could even turn
        // the divisor into zero, panicking on the division.
        let avg_nanos = total_duration.as_nanos() / u128::from(total_ops);
        // The average never exceeds the total, which fits in u64 nanoseconds
        // for any realistic accumulated duration (~584 years).
        Duration::from_nanos(avg_nanos as u64)
    }

    /// Returns the total number of operations recorded
    pub fn get_total_operations(&self) -> u64 {
        self.total_operations
            .load(std::sync::atomic::Ordering::Relaxed)
    }
}