graphrag_core/async_processing/mod.rs
1//! Async processing utilities for GraphRAG operations
2//!
3//! This module provides async processing capabilities including:
4//! - Concurrent document processing
5//! - Rate limiting for API calls
6//! - Performance monitoring and metrics
7//! - Thread pool management
8//! - Task scheduling and coordination
9
10use indexmap::IndexMap;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::sync::RwLock;
14
15use crate::core::{Document, GraphRAGError, KnowledgeGraph};
16
17pub mod concurrent_pipeline;
18pub mod monitoring;
19pub mod rate_limiting;
20
21pub use concurrent_pipeline::ConcurrentProcessor;
22pub use monitoring::ProcessingMetrics;
23pub use rate_limiting::RateLimiter;
24
/// Result of processing a single document
///
/// Produced by [`AsyncGraphRAGCore::process_single_document_async`]; the
/// `success` flag records whether extraction completed for this document.
#[derive(Debug, Clone)]
pub struct ProcessingResult {
    /// Unique identifier of the processed document
    pub document_id: crate::core::DocumentId,
    /// Number of entities successfully extracted from the document
    pub entities_extracted: usize,
    /// Number of text chunks processed (taken from `document.chunks`)
    pub chunks_processed: usize,
    /// Total wall-clock time taken to process the document
    pub processing_time: Duration,
    /// Whether the processing completed successfully
    pub success: bool,
}
39
/// Configuration for async processing operations
///
/// Controls the concurrency caps and per-second rate limits applied to
/// LLM calls, embedding calls, and document-level processing.
#[derive(Debug, Clone)]
pub struct AsyncConfig {
    /// Maximum number of concurrent LLM API calls allowed
    pub max_concurrent_llm_calls: usize,
    /// Maximum number of concurrent embedding API calls allowed
    pub max_concurrent_embeddings: usize,
    /// Maximum number of documents to process concurrently
    pub max_concurrent_documents: usize,
    /// Rate limit for LLM API calls (requests per second)
    pub llm_rate_limit_per_second: f64,
    /// Rate limit for embedding API calls (requests per second)
    pub embedding_rate_limit_per_second: f64,
}

impl Default for AsyncConfig {
    /// Default limits used when no explicit configuration is supplied.
    fn default() -> Self {
        AsyncConfig {
            max_concurrent_llm_calls: 3,
            max_concurrent_embeddings: 5,
            max_concurrent_documents: 10,
            llm_rate_limit_per_second: 2.0,
            embedding_rate_limit_per_second: 10.0,
        }
    }
}
66
/// Core async GraphRAG processor with concurrency control and monitoring
///
/// Owns the knowledge graph behind an async `RwLock`; the rate limiter,
/// processor, and metrics are held as `Arc` handles so they can be cloned
/// into batch-processing work.
#[derive(Debug)]
pub struct AsyncGraphRAGCore {
    /// Shared knowledge graph for storing entities and relationships
    graph: Arc<RwLock<KnowledgeGraph>>,
    /// Rate limiter for throttling API calls
    rate_limiter: Arc<RateLimiter>,
    /// Concurrent processor for batch document processing
    concurrent_processor: Arc<ConcurrentProcessor>,
    /// Metrics collector for tracking processing statistics
    metrics: Arc<ProcessingMetrics>,
    /// Configuration settings for async operations (fixed at construction)
    config: AsyncConfig,
}
81
82impl AsyncGraphRAGCore {
83 /// Creates a new async GraphRAG core instance with configuration
84 ///
85 /// Initializes the knowledge graph, rate limiter, concurrent processor,
86 /// and metrics tracking system.
87 ///
88 /// # Parameters
89 /// - `graph`: Knowledge graph for entity storage
90 /// - `config`: Configuration for async processing behavior
91 ///
92 /// # Returns
93 /// Configured async GraphRAG instance, or an error if initialization fails
94 pub async fn new(graph: KnowledgeGraph, config: AsyncConfig) -> Result<Self, GraphRAGError> {
95 let rate_limiter = Arc::new(RateLimiter::new(&config));
96 let concurrent_processor =
97 Arc::new(ConcurrentProcessor::new(config.max_concurrent_documents));
98 let metrics = Arc::new(ProcessingMetrics::new());
99
100 Ok(Self {
101 graph: Arc::new(RwLock::new(graph)),
102 rate_limiter,
103 concurrent_processor,
104 metrics,
105 config,
106 })
107 }
108
109 /// Processes multiple documents concurrently with rate limiting
110 ///
111 /// Distributes documents across concurrent workers while respecting
112 /// rate limits and collecting metrics.
113 ///
114 /// # Parameters
115 /// - `documents`: Collection of documents to process
116 ///
117 /// # Returns
118 /// Vector of processing results for all documents, or an error
119 pub async fn process_documents_async(
120 &self,
121 documents: Vec<Document>,
122 ) -> Result<Vec<ProcessingResult>, GraphRAGError> {
123 let start_time = Instant::now();
124 self.metrics.increment_batch_processing_started();
125
126 tracing::info!(
127 document_count = documents.len(),
128 "Processing documents concurrently"
129 );
130
131 let results = self
132 .concurrent_processor
133 .process_batch(
134 documents,
135 Arc::clone(&self.graph),
136 Arc::clone(&self.rate_limiter),
137 Arc::clone(&self.metrics),
138 )
139 .await?;
140
141 let duration = start_time.elapsed();
142 self.metrics.record_batch_processing_duration(duration);
143
144 tracing::info!(
145 duration_ms = duration.as_millis(),
146 successes = results.len(),
147 "Batch processing completed"
148 );
149
150 Ok(results)
151 }
152
153 /// Processes a single document asynchronously with rate limiting
154 ///
155 /// Applies entity extraction and updates the knowledge graph for one document.
156 /// Automatically handles rate limiting and metrics collection.
157 ///
158 /// # Parameters
159 /// - `document`: Document to process
160 ///
161 /// # Returns
162 /// Processing result containing extraction statistics, or an error
163 pub async fn process_single_document_async(
164 &self,
165 document: Document,
166 ) -> Result<ProcessingResult, GraphRAGError> {
167 let start_time = Instant::now();
168 self.metrics.increment_document_processing_started();
169
170 // Acquire rate limiting permits
171 let _llm_permit = self.rate_limiter.acquire_llm_permit().await?;
172
173 let result = {
174 let _graph = self.graph.read().await;
175 // For now, create a simple processing result
176 // In a full implementation, this would use proper entity extraction
177 ProcessingResult {
178 document_id: document.id.clone(),
179 entities_extracted: 0,
180 chunks_processed: document.chunks.len(),
181 processing_time: start_time.elapsed(),
182 success: true,
183 }
184 };
185
186 let duration = start_time.elapsed();
187
188 if result.success {
189 self.metrics.increment_document_processing_success();
190 self.metrics.record_document_processing_duration(duration);
191 } else {
192 self.metrics.increment_document_processing_error();
193 tracing::warn!(document_id = %result.document_id, "Document processing failed");
194 }
195
196 Ok(result)
197 }
198
199 /// Executes a query against the knowledge graph asynchronously
200 ///
201 /// Processes a user query by searching the knowledge graph and generating
202 /// a response using retrieved entities and relationships.
203 ///
204 /// # Parameters
205 /// - `query`: User's query string
206 ///
207 /// # Returns
208 /// Generated response string, or an error if processing fails
209 pub async fn query_async(&self, query: &str) -> Result<String, GraphRAGError> {
210 let start_time = Instant::now();
211 self.metrics.increment_query_started();
212
213 // Acquire rate limiting permits
214 let _llm_permit = self.rate_limiter.acquire_llm_permit().await?;
215
216 // Basic implementation - in production this would use proper query processing
217 let result = {
218 let graph = self.graph.read().await;
219 let entity_count = graph.entities().count();
220
221 if entity_count == 0 {
222 Err(GraphRAGError::Unsupported {
223 operation: "query processing".to_string(),
224 reason: "No entities in knowledge graph".to_string(),
225 })
226 } else {
227 // Simple placeholder response
228 Ok(format!(
229 "Query processed: '{query}'. Found {entity_count} entities in graph. This is a basic implementation."
230 ))
231 }
232 };
233
234 let duration = start_time.elapsed();
235
236 match &result {
237 Ok(_) => {
238 self.metrics.increment_query_success();
239 self.metrics.record_query_duration(duration);
240 tracing::info!(
241 duration_ms = duration.as_millis(),
242 "Query completed successfully"
243 );
244 },
245 Err(e) => {
246 self.metrics.increment_query_error();
247 tracing::error!(error = %e, "Query processing error");
248 },
249 }
250
251 result
252 }
253
    /// Retrieves current processing metrics
    ///
    /// The collector is shared with in-flight work, so counters may keep
    /// advancing while the reference is held.
    ///
    /// # Returns
    /// Reference to the metrics collector
    pub fn get_metrics(&self) -> &ProcessingMetrics {
        &self.metrics
    }
261
    /// Retrieves current configuration
    ///
    /// The configuration is set at construction; no method in this module
    /// mutates it afterwards.
    ///
    /// # Returns
    /// Reference to the async processing configuration
    pub fn get_config(&self) -> &AsyncConfig {
        &self.config
    }
269
270 /// Performs health check on all system components
271 ///
272 /// Checks the status of the knowledge graph and rate limiter to determine
273 /// overall system health.
274 ///
275 /// # Returns
276 /// Health status summary for all components
277 pub async fn health_check(&self) -> HealthStatus {
278 let graph_status = {
279 let graph = self.graph.read().await;
280 if graph.entities().count() > 0 {
281 ComponentStatus::Healthy
282 } else {
283 ComponentStatus::Warning("No entities in graph".to_string())
284 }
285 };
286
287 let rate_limiter_status = self.rate_limiter.health_check();
288
289 HealthStatus {
290 overall: if matches!(graph_status, ComponentStatus::Healthy)
291 && matches!(rate_limiter_status, ComponentStatus::Healthy)
292 {
293 ComponentStatus::Healthy
294 } else {
295 ComponentStatus::Warning("Some components have issues".to_string())
296 },
297 components: indexmap::indexmap! {
298 "graph".to_string() => graph_status,
299 "rate_limiter".to_string() => rate_limiter_status,
300 },
301 }
302 }
303
304 /// Shuts down the async processor gracefully
305 ///
306 /// Allows current operations to complete before shutting down. In a full
307 /// implementation, this cancels pending tasks and cleans up resources.
308 ///
309 /// # Returns
310 /// Ok on successful shutdown, or an error if cleanup fails
311 pub async fn shutdown(&self) -> Result<(), GraphRAGError> {
312 tracing::info!("Shutting down async GraphRAG processor");
313
314 // In a full implementation, this would:
315 // - Cancel running tasks
316 // - Wait for current operations to complete
317 // - Clean up resources
318
319 tracing::info!("Async processor shutdown complete");
320 Ok(())
321 }
322}
323
/// Status of individual system components
///
/// Derives `PartialEq`/`Eq` so statuses can be compared directly (e.g. in
/// tests or aggregation logic) instead of only via `matches!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ComponentStatus {
    /// Component is functioning normally
    Healthy,
    /// Component is operational but has issues (includes warning message)
    Warning(String),
    /// Component has failed (includes error message)
    Error(String),
}
334
/// Overall health status including all system components
#[derive(Debug, Clone)]
pub struct HealthStatus {
    /// Aggregate health status across all components
    pub overall: ComponentStatus,
    /// Individual status for each named component, keyed by component name
    /// (e.g. "graph", "rate_limiter"); `IndexMap` preserves insertion order
    pub components: IndexMap<String, ComponentStatus>,
}
343
344/// Simple task scheduler for managing async operations
345#[derive(Debug)]
346pub struct TaskScheduler {
347 /// Maximum number of tasks allowed to run concurrently
348 max_concurrent_tasks: usize,
349}
350
351impl TaskScheduler {
352 /// Creates a new task scheduler with specified concurrency limit
353 ///
354 /// # Parameters
355 /// - `max_concurrent_tasks`: Maximum number of concurrent tasks
356 pub fn new(max_concurrent_tasks: usize) -> Self {
357 Self {
358 max_concurrent_tasks,
359 }
360 }
361
362 /// Schedules and executes an async task
363 ///
364 /// In production, this would implement proper task queuing and scheduling.
365 /// Currently executes tasks immediately.
366 ///
367 /// # Parameters
368 /// - `task`: Future to execute
369 ///
370 /// # Returns
371 /// Result of the task execution
372 pub async fn schedule_task<F, T>(&self, task: F) -> Result<T, GraphRAGError>
373 where
374 F: std::future::Future<Output = Result<T, GraphRAGError>>,
375 {
376 // Basic implementation - in production this would use proper task scheduling
377 task.await
378 }
379
380 /// Returns the maximum number of concurrent tasks allowed
381 pub fn max_concurrent_tasks(&self) -> usize {
382 self.max_concurrent_tasks
383 }
384}
385
/// Performance tracker for monitoring async operation timing
///
/// Thread-safe: the operation count is an atomic and the cumulative
/// duration sits behind a mutex.
#[derive(Debug, Default)]
pub struct PerformanceTracker {
    /// Total number of operations recorded
    total_operations: std::sync::atomic::AtomicU64,
    /// Cumulative duration of all operations
    total_duration: std::sync::Mutex<Duration>,
}

impl PerformanceTracker {
    /// Creates a new performance tracker with zeroed counters
    pub fn new() -> Self {
        Self::default()
    }

    /// Records the duration of a completed operation
    ///
    /// # Parameters
    /// - `duration`: Time taken for the operation
    pub fn record_operation(&self, duration: Duration) {
        self.total_operations
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        // Recover from a poisoned lock: the Duration inside remains valid
        // even if another thread panicked while holding the guard.
        let mut total_duration = self
            .total_duration
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *total_duration += duration;
    }

    /// Calculates the average duration per operation
    ///
    /// # Returns
    /// Average duration, or zero if no operations have been recorded
    pub fn get_average_duration(&self) -> Duration {
        let total_ops = self
            .total_operations
            .load(std::sync::atomic::Ordering::Relaxed);
        if total_ops == 0 {
            return Duration::ZERO;
        }

        let total_duration = *self
            .total_duration
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // Divide in u128 nanoseconds. The previous `total_ops as u32` cast
        // truncated the u64 count and could panic with a divide-by-zero
        // whenever the count was an exact multiple of 2^32.
        let avg_nanos = total_duration.as_nanos() / u128::from(total_ops);
        Duration::from_nanos(u64::try_from(avg_nanos).unwrap_or(u64::MAX))
    }

    /// Returns the total number of operations recorded
    pub fn get_total_operations(&self) -> u64 {
        self.total_operations
            .load(std::sync::atomic::Ordering::Relaxed)
    }
}
434}