// graphrag_core/async_processing/mod.rs
//! Async processing utilities for GraphRAG operations
//!
//! This module provides async processing capabilities including:
//! - Concurrent document processing
//! - Rate limiting for API calls
//! - Performance monitoring and metrics
//! - Thread pool management
//! - Task scheduling and coordination

use std::sync::Arc;
use std::time::{Duration, Instant};

use indexmap::IndexMap;
use tokio::sync::RwLock;

use crate::core::{Document, GraphRAGError, KnowledgeGraph};

pub mod concurrent_pipeline;
pub mod monitoring;
pub mod rate_limiting;

pub use concurrent_pipeline::ConcurrentProcessor;
pub use monitoring::ProcessingMetrics;
pub use rate_limiting::RateLimiter;

25/// Result of processing a single document
26#[derive(Debug, Clone)]
27pub struct ProcessingResult {
28 /// Unique identifier of the processed document
29 pub document_id: crate::core::DocumentId,
30 /// Number of entities successfully extracted from the document
31 pub entities_extracted: usize,
32 /// Number of text chunks processed
33 pub chunks_processed: usize,
34 /// Total time taken to process the document
35 pub processing_time: Duration,
36 /// Whether the processing completed successfully
37 pub success: bool,
38}
39
/// Tunable settings that bound concurrency and API throughput for async
/// processing operations.
#[derive(Debug, Clone)]
pub struct AsyncConfig {
    /// Upper bound on simultaneous LLM API calls
    pub max_concurrent_llm_calls: usize,
    /// Upper bound on simultaneous embedding API calls
    pub max_concurrent_embeddings: usize,
    /// Upper bound on documents processed at the same time
    pub max_concurrent_documents: usize,
    /// LLM API throttle, in requests per second
    pub llm_rate_limit_per_second: f64,
    /// Embedding API throttle, in requests per second
    pub embedding_rate_limit_per_second: f64,
}

impl Default for AsyncConfig {
    /// Conservative defaults suitable for typical hosted-API rate limits.
    fn default() -> Self {
        AsyncConfig {
            max_concurrent_llm_calls: 3,
            max_concurrent_embeddings: 5,
            max_concurrent_documents: 10,
            llm_rate_limit_per_second: 2.0,
            embedding_rate_limit_per_second: 10.0,
        }
    }
}

67/// Core async GraphRAG processor with concurrency control and monitoring
68#[derive(Debug)]
69pub struct AsyncGraphRAGCore {
70 /// Shared knowledge graph for storing entities and relationships
71 graph: Arc<RwLock<KnowledgeGraph>>,
72 /// Rate limiter for throttling API calls
73 rate_limiter: Arc<RateLimiter>,
74 /// Concurrent processor for batch document processing
75 concurrent_processor: Arc<ConcurrentProcessor>,
76 /// Metrics collector for tracking processing statistics
77 metrics: Arc<ProcessingMetrics>,
78 /// Configuration settings for async operations
79 config: AsyncConfig,
80}
81
82impl AsyncGraphRAGCore {
83 /// Creates a new async GraphRAG core instance with configuration
84 ///
85 /// Initializes the knowledge graph, rate limiter, concurrent processor,
86 /// and metrics tracking system.
87 ///
88 /// # Parameters
89 /// - `graph`: Knowledge graph for entity storage
90 /// - `config`: Configuration for async processing behavior
91 ///
92 /// # Returns
93 /// Configured async GraphRAG instance, or an error if initialization fails
94 pub async fn new(graph: KnowledgeGraph, config: AsyncConfig) -> Result<Self, GraphRAGError> {
95 let rate_limiter = Arc::new(RateLimiter::new(&config));
96 let concurrent_processor = Arc::new(ConcurrentProcessor::new(config.max_concurrent_documents));
97 let metrics = Arc::new(ProcessingMetrics::new());
98
99 Ok(Self {
100 graph: Arc::new(RwLock::new(graph)),
101 rate_limiter,
102 concurrent_processor,
103 metrics,
104 config,
105 })
106 }
107
108 /// Processes multiple documents concurrently with rate limiting
109 ///
110 /// Distributes documents across concurrent workers while respecting
111 /// rate limits and collecting metrics.
112 ///
113 /// # Parameters
114 /// - `documents`: Collection of documents to process
115 ///
116 /// # Returns
117 /// Vector of processing results for all documents, or an error
118 pub async fn process_documents_async(
119 &self,
120 documents: Vec<Document>,
121 ) -> Result<Vec<ProcessingResult>, GraphRAGError> {
122 let start_time = Instant::now();
123 self.metrics.increment_batch_processing_started();
124
125 tracing::info!(document_count = documents.len(), "Processing documents concurrently");
126
127 let results = self
128 .concurrent_processor
129 .process_batch(
130 documents,
131 Arc::clone(&self.graph),
132 Arc::clone(&self.rate_limiter),
133 Arc::clone(&self.metrics),
134 )
135 .await?;
136
137 let duration = start_time.elapsed();
138 self.metrics.record_batch_processing_duration(duration);
139
140 tracing::info!(
141 duration_ms = duration.as_millis(),
142 successes = results.len(),
143 "Batch processing completed"
144 );
145
146 Ok(results)
147 }
148
149 /// Processes a single document asynchronously with rate limiting
150 ///
151 /// Applies entity extraction and updates the knowledge graph for one document.
152 /// Automatically handles rate limiting and metrics collection.
153 ///
154 /// # Parameters
155 /// - `document`: Document to process
156 ///
157 /// # Returns
158 /// Processing result containing extraction statistics, or an error
159 pub async fn process_single_document_async(
160 &self,
161 document: Document,
162 ) -> Result<ProcessingResult, GraphRAGError> {
163 let start_time = Instant::now();
164 self.metrics.increment_document_processing_started();
165
166 // Acquire rate limiting permits
167 let _llm_permit = self.rate_limiter.acquire_llm_permit().await?;
168
169 let result = {
170 let _graph = self.graph.read().await;
171 // For now, create a simple processing result
172 // In a full implementation, this would use proper entity extraction
173 ProcessingResult {
174 document_id: document.id.clone(),
175 entities_extracted: 0,
176 chunks_processed: document.chunks.len(),
177 processing_time: start_time.elapsed(),
178 success: true,
179 }
180 };
181
182 let duration = start_time.elapsed();
183
184 if result.success {
185 self.metrics.increment_document_processing_success();
186 self.metrics.record_document_processing_duration(duration);
187 } else {
188 self.metrics.increment_document_processing_error();
189 tracing::warn!(document_id = %result.document_id, "Document processing failed");
190 }
191
192 Ok(result)
193 }
194
195 /// Executes a query against the knowledge graph asynchronously
196 ///
197 /// Processes a user query by searching the knowledge graph and generating
198 /// a response using retrieved entities and relationships.
199 ///
200 /// # Parameters
201 /// - `query`: User's query string
202 ///
203 /// # Returns
204 /// Generated response string, or an error if processing fails
205 pub async fn query_async(
206 &self,
207 query: &str,
208 ) -> Result<String, GraphRAGError> {
209 let start_time = Instant::now();
210 self.metrics.increment_query_started();
211
212 // Acquire rate limiting permits
213 let _llm_permit = self.rate_limiter.acquire_llm_permit().await?;
214
215 // Basic implementation - in production this would use proper query processing
216 let result = {
217 let graph = self.graph.read().await;
218 let entity_count = graph.entities().count();
219
220 if entity_count == 0 {
221 Err(GraphRAGError::Unsupported {
222 operation: "query processing".to_string(),
223 reason: "No entities in knowledge graph".to_string(),
224 })
225 } else {
226 // Simple placeholder response
227 Ok(format!(
228 "Query processed: '{query}'. Found {entity_count} entities in graph. This is a basic implementation."
229 ))
230 }
231 };
232
233 let duration = start_time.elapsed();
234
235 match &result {
236 Ok(_) => {
237 self.metrics.increment_query_success();
238 self.metrics.record_query_duration(duration);
239 tracing::info!(duration_ms = duration.as_millis(), "Query completed successfully");
240 }
241 Err(e) => {
242 self.metrics.increment_query_error();
243 tracing::error!(error = %e, "Query processing error");
244 }
245 }
246
247 result
248 }
249
250 /// Retrieves current processing metrics
251 ///
252 /// # Returns
253 /// Reference to the metrics collector
254 pub fn get_metrics(&self) -> &ProcessingMetrics {
255 &self.metrics
256 }
257
258 /// Retrieves current configuration
259 ///
260 /// # Returns
261 /// Reference to the async processing configuration
262 pub fn get_config(&self) -> &AsyncConfig {
263 &self.config
264 }
265
266 /// Performs health check on all system components
267 ///
268 /// Checks the status of the knowledge graph and rate limiter to determine
269 /// overall system health.
270 ///
271 /// # Returns
272 /// Health status summary for all components
273 pub async fn health_check(&self) -> HealthStatus {
274 let graph_status = {
275 let graph = self.graph.read().await;
276 if graph.entities().count() > 0 {
277 ComponentStatus::Healthy
278 } else {
279 ComponentStatus::Warning("No entities in graph".to_string())
280 }
281 };
282
283 let rate_limiter_status = self.rate_limiter.health_check();
284
285 HealthStatus {
286 overall: if matches!(graph_status, ComponentStatus::Healthy)
287 && matches!(rate_limiter_status, ComponentStatus::Healthy)
288 {
289 ComponentStatus::Healthy
290 } else {
291 ComponentStatus::Warning("Some components have issues".to_string())
292 },
293 components: indexmap::indexmap! {
294 "graph".to_string() => graph_status,
295 "rate_limiter".to_string() => rate_limiter_status,
296 },
297 }
298 }
299
300 /// Shuts down the async processor gracefully
301 ///
302 /// Allows current operations to complete before shutting down. In a full
303 /// implementation, this cancels pending tasks and cleans up resources.
304 ///
305 /// # Returns
306 /// Ok on successful shutdown, or an error if cleanup fails
307 pub async fn shutdown(&self) -> Result<(), GraphRAGError> {
308 tracing::info!("Shutting down async GraphRAG processor");
309
310 // In a full implementation, this would:
311 // - Cancel running tasks
312 // - Wait for current operations to complete
313 // - Clean up resources
314
315 tracing::info!("Async processor shutdown complete");
316 Ok(())
317 }
318}
319
/// Health state reported for one system component.
#[derive(Debug, Clone)]
pub enum ComponentStatus {
    /// Operating normally
    Healthy,
    /// Operational but degraded; carries a human-readable warning
    Warning(String),
    /// Not operational; carries a human-readable error description
    Error(String),
}

331/// Overall health status including all system components
332#[derive(Debug, Clone)]
333pub struct HealthStatus {
334 /// Aggregate health status across all components
335 pub overall: ComponentStatus,
336 /// Individual status for each named component
337 pub components: IndexMap<String, ComponentStatus>,
338}
339
/// Minimal scheduler that bounds how many async tasks may run at once.
#[derive(Debug)]
pub struct TaskScheduler {
    /// Upper bound on simultaneously running tasks
    max_concurrent_tasks: usize,
}

347impl TaskScheduler {
348 /// Creates a new task scheduler with specified concurrency limit
349 ///
350 /// # Parameters
351 /// - `max_concurrent_tasks`: Maximum number of concurrent tasks
352 pub fn new(max_concurrent_tasks: usize) -> Self {
353 Self {
354 max_concurrent_tasks,
355 }
356 }
357
358 /// Schedules and executes an async task
359 ///
360 /// In production, this would implement proper task queuing and scheduling.
361 /// Currently executes tasks immediately.
362 ///
363 /// # Parameters
364 /// - `task`: Future to execute
365 ///
366 /// # Returns
367 /// Result of the task execution
368 pub async fn schedule_task<F, T>(&self, task: F) -> Result<T, GraphRAGError>
369 where
370 F: std::future::Future<Output = Result<T, GraphRAGError>>,
371 {
372 // Basic implementation - in production this would use proper task scheduling
373 task.await
374 }
375
376 /// Returns the maximum number of concurrent tasks allowed
377 pub fn max_concurrent_tasks(&self) -> usize {
378 self.max_concurrent_tasks
379 }
380}
381
/// Performance tracker for monitoring async operation timing.
///
/// The counter is atomic and the accumulated duration is mutex-guarded, so a
/// single tracker can be shared across tasks behind an `Arc`.
#[derive(Debug, Default)]
pub struct PerformanceTracker {
    /// Total number of operations recorded
    total_operations: std::sync::atomic::AtomicU64,
    /// Cumulative duration of all operations
    total_duration: std::sync::Mutex<Duration>,
}

impl PerformanceTracker {
    /// Creates a new tracker with zeroed counters.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records the duration of one completed operation.
    ///
    /// # Parameters
    /// - `duration`: time taken by the operation
    pub fn record_operation(&self, duration: Duration) {
        self.total_operations
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let mut total = self
            .total_duration
            .lock()
            .expect("performance tracker mutex poisoned");
        *total += duration;
    }

    /// Calculates the average duration per recorded operation.
    ///
    /// The division is performed on nanoseconds with 128-bit arithmetic, so
    /// operation counts above `u32::MAX` no longer truncate (the previous
    /// `total_ops as u32` cast silently did). A result too large for
    /// `Duration::from_nanos` saturates instead of wrapping.
    ///
    /// # Returns
    /// Average duration, or zero if no operations have been recorded.
    pub fn get_average_duration(&self) -> Duration {
        let ops = self.total_operations.load(std::sync::atomic::Ordering::Relaxed);
        if ops == 0 {
            return Duration::from_secs(0);
        }

        let total = *self
            .total_duration
            .lock()
            .expect("performance tracker mutex poisoned");
        let avg_nanos = total.as_nanos() / u128::from(ops);
        // Saturate rather than truncate if the average exceeds u64 nanoseconds
        // (only possible after centuries of accumulated time, but cheap to guard).
        Duration::from_nanos(u64::try_from(avg_nanos).unwrap_or(u64::MAX))
    }

    /// Returns the total number of operations recorded so far.
    pub fn get_total_operations(&self) -> u64 {
        self.total_operations.load(std::sync::atomic::Ordering::Relaxed)
    }
}
426}