graphrag_core/async_processing/mod.rs
//! Async processing utilities for GraphRAG operations
//!
//! This module provides async processing capabilities including:
//! - Concurrent document processing
//! - Rate limiting for API calls
//! - Performance monitoring and metrics
//! - Thread pool management
//! - Task scheduling and coordination

use indexmap::IndexMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;

use crate::core::{Document, GraphRAGError, KnowledgeGraph};

// Submodules implementing the individual async-processing concerns.
pub mod concurrent_pipeline;
pub mod monitoring;
pub mod rate_limiting;

// Re-export the primary types so callers can use `async_processing::X`.
pub use concurrent_pipeline::ConcurrentProcessor;
pub use monitoring::ProcessingMetrics;
pub use rate_limiting::RateLimiter;
/// Result of processing a single document
///
/// Returned by the async processing entry points to summarize what
/// happened to one document.
#[derive(Debug, Clone)]
pub struct ProcessingResult {
    /// Unique identifier of the processed document
    pub document_id: crate::core::DocumentId,
    /// Number of entities successfully extracted from the document
    pub entities_extracted: usize,
    /// Number of text chunks processed
    pub chunks_processed: usize,
    /// Total time taken to process the document
    pub processing_time: Duration,
    /// Whether the processing completed successfully
    pub success: bool,
}

/// Configuration for async processing operations
///
/// Controls concurrency ceilings and request-rate limits for the async
/// pipeline. Sensible defaults are available via [`Default`].
#[derive(Debug, Clone)]
pub struct AsyncConfig {
    /// Maximum number of concurrent LLM API calls allowed
    pub max_concurrent_llm_calls: usize,
    /// Maximum number of concurrent embedding API calls allowed
    pub max_concurrent_embeddings: usize,
    /// Maximum number of documents to process concurrently
    pub max_concurrent_documents: usize,
    /// Rate limit for LLM API calls (requests per second)
    pub llm_rate_limit_per_second: f64,
    /// Rate limit for embedding API calls (requests per second)
    pub embedding_rate_limit_per_second: f64,
}

55impl Default for AsyncConfig {
56    fn default() -> Self {
57        Self {
58            max_concurrent_llm_calls: 3,
59            max_concurrent_embeddings: 5,
60            max_concurrent_documents: 10,
61            llm_rate_limit_per_second: 2.0,
62            embedding_rate_limit_per_second: 10.0,
63        }
64    }
65}
66
/// Core async GraphRAG processor with concurrency control and monitoring
///
/// All shared state is held behind `Arc` so it can be handed to concurrent
/// workers cheaply (reference-count bumps rather than deep copies).
#[derive(Debug)]
pub struct AsyncGraphRAGCore {
    /// Shared knowledge graph for storing entities and relationships
    graph: Arc<RwLock<KnowledgeGraph>>,
    /// Rate limiter for throttling API calls
    rate_limiter: Arc<RateLimiter>,
    /// Concurrent processor for batch document processing
    concurrent_processor: Arc<ConcurrentProcessor>,
    /// Metrics collector for tracking processing statistics
    metrics: Arc<ProcessingMetrics>,
    /// Configuration settings for async operations
    config: AsyncConfig,
}

82impl AsyncGraphRAGCore {
83    /// Creates a new async GraphRAG core instance with configuration
84    ///
85    /// Initializes the knowledge graph, rate limiter, concurrent processor,
86    /// and metrics tracking system.
87    ///
88    /// # Parameters
89    /// - `graph`: Knowledge graph for entity storage
90    /// - `config`: Configuration for async processing behavior
91    ///
92    /// # Returns
93    /// Configured async GraphRAG instance, or an error if initialization fails
94    pub async fn new(graph: KnowledgeGraph, config: AsyncConfig) -> Result<Self, GraphRAGError> {
95        let rate_limiter = Arc::new(RateLimiter::new(&config));
96        let concurrent_processor =
97            Arc::new(ConcurrentProcessor::new(config.max_concurrent_documents));
98        let metrics = Arc::new(ProcessingMetrics::new());
99
100        Ok(Self {
101            graph: Arc::new(RwLock::new(graph)),
102            rate_limiter,
103            concurrent_processor,
104            metrics,
105            config,
106        })
107    }
108
109    /// Processes multiple documents concurrently with rate limiting
110    ///
111    /// Distributes documents across concurrent workers while respecting
112    /// rate limits and collecting metrics.
113    ///
114    /// # Parameters
115    /// - `documents`: Collection of documents to process
116    ///
117    /// # Returns
118    /// Vector of processing results for all documents, or an error
119    pub async fn process_documents_async(
120        &self,
121        documents: Vec<Document>,
122    ) -> Result<Vec<ProcessingResult>, GraphRAGError> {
123        let start_time = Instant::now();
124        self.metrics.increment_batch_processing_started();
125
126        tracing::info!(
127            document_count = documents.len(),
128            "Processing documents concurrently"
129        );
130
131        let results = self
132            .concurrent_processor
133            .process_batch(
134                documents,
135                Arc::clone(&self.graph),
136                Arc::clone(&self.rate_limiter),
137                Arc::clone(&self.metrics),
138            )
139            .await?;
140
141        let duration = start_time.elapsed();
142        self.metrics.record_batch_processing_duration(duration);
143
144        tracing::info!(
145            duration_ms = duration.as_millis(),
146            successes = results.len(),
147            "Batch processing completed"
148        );
149
150        Ok(results)
151    }
152
153    /// Processes a single document asynchronously with rate limiting
154    ///
155    /// Applies entity extraction and updates the knowledge graph for one document.
156    /// Automatically handles rate limiting and metrics collection.
157    ///
158    /// # Parameters
159    /// - `document`: Document to process
160    ///
161    /// # Returns
162    /// Processing result containing extraction statistics, or an error
163    pub async fn process_single_document_async(
164        &self,
165        document: Document,
166    ) -> Result<ProcessingResult, GraphRAGError> {
167        let start_time = Instant::now();
168        self.metrics.increment_document_processing_started();
169
170        // Acquire rate limiting permits
171        let _llm_permit = self.rate_limiter.acquire_llm_permit().await?;
172
173        let result = {
174            let _graph = self.graph.read().await;
175            // For now, create a simple processing result
176            // In a full implementation, this would use proper entity extraction
177            ProcessingResult {
178                document_id: document.id.clone(),
179                entities_extracted: 0,
180                chunks_processed: document.chunks.len(),
181                processing_time: start_time.elapsed(),
182                success: true,
183            }
184        };
185
186        let duration = start_time.elapsed();
187
188        if result.success {
189            self.metrics.increment_document_processing_success();
190            self.metrics.record_document_processing_duration(duration);
191        } else {
192            self.metrics.increment_document_processing_error();
193            tracing::warn!(document_id = %result.document_id, "Document processing failed");
194        }
195
196        Ok(result)
197    }
198
199    /// Executes a query against the knowledge graph asynchronously
200    ///
201    /// Processes a user query by searching the knowledge graph and generating
202    /// a response using retrieved entities and relationships.
203    ///
204    /// # Parameters
205    /// - `query`: User's query string
206    ///
207    /// # Returns
208    /// Generated response string, or an error if processing fails
209    pub async fn query_async(&self, query: &str) -> Result<String, GraphRAGError> {
210        let start_time = Instant::now();
211        self.metrics.increment_query_started();
212
213        // Acquire rate limiting permits
214        let _llm_permit = self.rate_limiter.acquire_llm_permit().await?;
215
216        // Basic implementation - in production this would use proper query processing
217        let result = {
218            let graph = self.graph.read().await;
219            let entity_count = graph.entities().count();
220
221            if entity_count == 0 {
222                Err(GraphRAGError::Unsupported {
223                    operation: "query processing".to_string(),
224                    reason: "No entities in knowledge graph".to_string(),
225                })
226            } else {
227                // Simple placeholder response
228                Ok(format!(
229                    "Query processed: '{query}'. Found {entity_count} entities in graph. This is a basic implementation."
230                ))
231            }
232        };
233
234        let duration = start_time.elapsed();
235
236        match &result {
237            Ok(_) => {
238                self.metrics.increment_query_success();
239                self.metrics.record_query_duration(duration);
240                tracing::info!(
241                    duration_ms = duration.as_millis(),
242                    "Query completed successfully"
243                );
244            },
245            Err(e) => {
246                self.metrics.increment_query_error();
247                tracing::error!(error = %e, "Query processing error");
248            },
249        }
250
251        result
252    }
253
254    /// Retrieves current processing metrics
255    ///
256    /// # Returns
257    /// Reference to the metrics collector
258    pub fn get_metrics(&self) -> &ProcessingMetrics {
259        &self.metrics
260    }
261
262    /// Retrieves current configuration
263    ///
264    /// # Returns
265    /// Reference to the async processing configuration
266    pub fn get_config(&self) -> &AsyncConfig {
267        &self.config
268    }
269
270    /// Performs health check on all system components
271    ///
272    /// Checks the status of the knowledge graph and rate limiter to determine
273    /// overall system health.
274    ///
275    /// # Returns
276    /// Health status summary for all components
277    pub async fn health_check(&self) -> HealthStatus {
278        let graph_status = {
279            let graph = self.graph.read().await;
280            if graph.entities().count() > 0 {
281                ComponentStatus::Healthy
282            } else {
283                ComponentStatus::Warning("No entities in graph".to_string())
284            }
285        };
286
287        let rate_limiter_status = self.rate_limiter.health_check();
288
289        HealthStatus {
290            overall: if matches!(graph_status, ComponentStatus::Healthy)
291                && matches!(rate_limiter_status, ComponentStatus::Healthy)
292            {
293                ComponentStatus::Healthy
294            } else {
295                ComponentStatus::Warning("Some components have issues".to_string())
296            },
297            components: indexmap::indexmap! {
298                "graph".to_string() => graph_status,
299                "rate_limiter".to_string() => rate_limiter_status,
300            },
301        }
302    }
303
304    /// Shuts down the async processor gracefully
305    ///
306    /// Allows current operations to complete before shutting down. In a full
307    /// implementation, this cancels pending tasks and cleans up resources.
308    ///
309    /// # Returns
310    /// Ok on successful shutdown, or an error if cleanup fails
311    pub async fn shutdown(&self) -> Result<(), GraphRAGError> {
312        tracing::info!("Shutting down async GraphRAG processor");
313
314        // In a full implementation, this would:
315        // - Cancel running tasks
316        // - Wait for current operations to complete
317        // - Clean up resources
318
319        tracing::info!("Async processor shutdown complete");
320        Ok(())
321    }
322}
323
/// Status of individual system components
#[derive(Debug, Clone)]
pub enum ComponentStatus {
    /// Component is functioning normally
    Healthy,
    /// Component is operational but has issues (includes warning message)
    Warning(String),
    /// Component has failed (includes error message)
    Error(String),
}

/// Overall health status including all system components
///
/// Produced by `AsyncGraphRAGCore::health_check`; `components` preserves
/// insertion order so reports render deterministically.
#[derive(Debug, Clone)]
pub struct HealthStatus {
    /// Aggregate health status across all components
    pub overall: ComponentStatus,
    /// Individual status for each named component
    pub components: IndexMap<String, ComponentStatus>,
}

/// Simple task scheduler for managing async operations
///
/// Currently only stores the concurrency limit; enforcement is left to a
/// future implementation of `schedule_task`.
#[derive(Debug)]
pub struct TaskScheduler {
    /// Maximum number of tasks allowed to run concurrently
    max_concurrent_tasks: usize,
}

351impl TaskScheduler {
352    /// Creates a new task scheduler with specified concurrency limit
353    ///
354    /// # Parameters
355    /// - `max_concurrent_tasks`: Maximum number of concurrent tasks
356    pub fn new(max_concurrent_tasks: usize) -> Self {
357        Self {
358            max_concurrent_tasks,
359        }
360    }
361
362    /// Schedules and executes an async task
363    ///
364    /// In production, this would implement proper task queuing and scheduling.
365    /// Currently executes tasks immediately.
366    ///
367    /// # Parameters
368    /// - `task`: Future to execute
369    ///
370    /// # Returns
371    /// Result of the task execution
372    pub async fn schedule_task<F, T>(&self, task: F) -> Result<T, GraphRAGError>
373    where
374        F: std::future::Future<Output = Result<T, GraphRAGError>>,
375    {
376        // Basic implementation - in production this would use proper task scheduling
377        task.await
378    }
379
380    /// Returns the maximum number of concurrent tasks allowed
381    pub fn max_concurrent_tasks(&self) -> usize {
382        self.max_concurrent_tasks
383    }
384}
385
/// Performance tracker for monitoring async operation timing
///
/// Thread-safe: the counter is atomic and the cumulative duration sits
/// behind a mutex.
#[derive(Debug, Default)]
pub struct PerformanceTracker {
    /// Total number of operations recorded
    total_operations: std::sync::atomic::AtomicU64,
    /// Cumulative duration of all operations
    total_duration: std::sync::Mutex<Duration>,
}

impl PerformanceTracker {
    /// Creates a new performance tracker with zeroed counters
    pub fn new() -> Self {
        Self::default()
    }

    /// Records the duration of a completed operation
    ///
    /// # Parameters
    /// - `duration`: Time taken for the operation
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned (a thread panicked while
    /// recording).
    pub fn record_operation(&self, duration: Duration) {
        // Relaxed ordering suffices: the counter is a statistic, not a
        // synchronization point.
        self.total_operations
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let mut total_duration = self.total_duration.lock().unwrap();
        *total_duration += duration;
    }

    /// Calculates the average duration per operation
    ///
    /// # Returns
    /// Average duration, or zero if no operations have been recorded
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn get_average_duration(&self) -> Duration {
        let total_ops = self
            .total_operations
            .load(std::sync::atomic::Ordering::Relaxed);
        if total_ops == 0 {
            return Duration::from_secs(0);
        }

        let total_duration = *self.total_duration.lock().unwrap();
        // Divide in nanosecond space. The previous `total_duration /
        // total_ops as u32` silently truncated counts above u32::MAX via
        // the `as` cast, producing a wrong average.
        let avg_nanos = total_duration.as_nanos() / u128::from(total_ops);
        // Saturate in the practically impossible case where the average
        // exceeds u64::MAX nanoseconds (~584 years).
        Duration::from_nanos(u64::try_from(avg_nanos).unwrap_or(u64::MAX))
    }

    /// Returns the total number of operations recorded
    pub fn get_total_operations(&self) -> u64 {
        self.total_operations
            .load(std::sync::atomic::Ordering::Relaxed)
    }
}