// graphrag_core/async_processing/concurrent_pipeline.rs
1//! Concurrent document processing pipeline for parallel GraphRAG operations.
2//!
3//! This module provides the [`ConcurrentProcessor`] for processing multiple documents
4//! in parallel while respecting concurrency limits and coordinating with rate limiters
5//! and metrics tracking.
6//!
7//! # Main Types
8//!
9//! - [`ConcurrentProcessor`]: Manages concurrent document processing with configurable
10//! parallelism limits
11//!
12//! # Features
13//!
14//! - Concurrent processing of document batches with configurable limits
15//! - Automatic chunking to respect concurrency constraints
16//! - Integration with rate limiting to prevent API throttling
17//! - Comprehensive metrics tracking for all operations
18//! - Error isolation: failures in one document don't affect others
19//! - Task spawning with tokio for true parallel execution
20//!
21//! # Basic Usage
22//!
23//! ```rust,ignore
24//! use graphrag_core::async_processing::ConcurrentProcessor;
25//! use std::sync::Arc;
26//!
27//! let processor = ConcurrentProcessor::new(10); // Max 10 concurrent documents
28//!
29//! let results = processor.process_batch(
30//! documents,
31//! graph,
32//! rate_limiter,
33//! metrics
34//! ).await?;
35//!
36//! for result in results {
37//! println!("Processed document {} in {:?}",
38//! result.document_id,
39//! result.processing_time
40//! );
41//! }
42//! ```
43
44use futures::future::join_all;
45use std::sync::Arc;
46use std::time::Instant;
47use tokio::sync::RwLock;
48
49use super::ProcessingResult;
50use super::{ProcessingMetrics, RateLimiter};
51use crate::core::{Document, GraphRAGError, KnowledgeGraph};
52
/// Concurrent document processor for parallel GraphRAG operations
///
/// Manages concurrent processing of multiple documents while respecting
/// concurrency limits and coordinating with rate limiters and metrics tracking.
///
/// Construct via [`ConcurrentProcessor::new`]; the processor itself is
/// stateless apart from its configured limit, so it can be shared freely.
#[derive(Debug)]
pub struct ConcurrentProcessor {
    /// Maximum number of documents to process concurrently.
    /// Each batch is split into chunks of at most this many documents,
    /// and one tokio task is spawned per document within a chunk.
    max_concurrent_documents: usize,
}
62
63impl ConcurrentProcessor {
64 /// Creates a new concurrent processor with specified concurrency limit
65 ///
66 /// # Parameters
67 /// - `max_concurrent_documents`: Maximum number of documents to process in parallel
68 pub fn new(max_concurrent_documents: usize) -> Self {
69 Self {
70 max_concurrent_documents,
71 }
72 }
73
74 /// Processes a batch of documents concurrently with rate limiting and metrics tracking
75 ///
76 /// Documents are processed in chunks according to the concurrency limit. Each chunk
77 /// is fully processed before moving to the next, with a small delay between chunks
78 /// to prevent system overload.
79 ///
80 /// # Parameters
81 /// - `documents`: Collection of documents to process
82 /// - `graph`: Shared knowledge graph for storing extracted entities
83 /// - `rate_limiter`: Rate limiter for API call throttling
84 /// - `metrics`: Metrics collector for tracking processing statistics
85 ///
86 /// # Returns
87 /// Vector of processing results for successfully processed documents, or an error
88 pub async fn process_batch(
89 &self,
90 documents: Vec<Document>,
91 graph: Arc<RwLock<KnowledgeGraph>>,
92 rate_limiter: Arc<RateLimiter>,
93 metrics: Arc<ProcessingMetrics>,
94 ) -> Result<Vec<ProcessingResult>, GraphRAGError> {
95 if documents.is_empty() {
96 return Ok(Vec::new());
97 }
98
99 tracing::info!(
100 document_count = documents.len(),
101 max_concurrency = self.max_concurrent_documents,
102 "Processing documents batch"
103 );
104
105 // Process documents in chunks to respect concurrency limits
106 let chunk_size = self.max_concurrent_documents;
107 let mut all_results = Vec::new();
108 let mut total_errors = 0;
109
110 for (chunk_idx, chunk) in documents.chunks(chunk_size).enumerate() {
111 tracing::debug!(
112 chunk_number = chunk_idx + 1,
113 chunk_size = chunk.len(),
114 "Processing chunk"
115 );
116
117 let chunk_start = Instant::now();
118
119 // Create tasks for this chunk
120 let tasks: Vec<_> = chunk
121 .iter()
122 .cloned()
123 .map(|document| {
124 let graph = Arc::clone(&graph);
125 let rate_limiter = Arc::clone(&rate_limiter);
126 let metrics = Arc::clone(&metrics);
127 let doc_id = document.id.clone();
128
129 tokio::spawn(async move {
130 let doc_start = Instant::now();
131
132 // Acquire rate limiting permit
133 let _permit = match rate_limiter.acquire_llm_permit().await {
134 Ok(permit) => permit,
135 Err(e) => {
136 metrics.increment_rate_limit_errors();
137 return Err(e);
138 }
139 };
140
141 // Process the document
142 let result =
143 Self::process_single_document_internal(&graph, document, &metrics)
144 .await;
145
146 let duration = doc_start.elapsed();
147
148 match &result {
149 Ok(_) => {
150 tracing::debug!(document_id = %doc_id, duration_ms = duration.as_millis(), "Document completed");
151 metrics.record_document_processing_duration(duration);
152 }
153 Err(e) => {
154 tracing::warn!(document_id = %doc_id, duration_ms = duration.as_millis(), error = %e, "Document failed");
155 metrics.increment_document_processing_error();
156 }
157 }
158
159 result
160 })
161 })
162 .collect();
163
164 // Wait for all tasks in this chunk to complete
165 let chunk_results = join_all(tasks).await;
166
167 // Collect results and handle errors
168 for (task_idx, task_result) in chunk_results.into_iter().enumerate() {
169 match task_result {
170 Ok(Ok(processing_result)) => {
171 all_results.push(processing_result);
172 metrics.increment_document_processing_success();
173 }
174 Ok(Err(processing_error)) => {
175 total_errors += 1;
176 tracing::error!(
177 chunk_number = chunk_idx + 1,
178 task_number = task_idx + 1,
179 error = %processing_error,
180 "Processing error"
181 );
182 }
183 Err(join_error) => {
184 total_errors += 1;
185 tracing::error!(
186 chunk_number = chunk_idx + 1,
187 task_number = task_idx + 1,
188 error = %join_error,
189 "Task join error"
190 );
191 }
192 }
193 }
194
195 let chunk_duration = chunk_start.elapsed();
196 tracing::debug!(
197 chunk_number = chunk_idx + 1,
198 duration_ms = chunk_duration.as_millis(),
199 successes = chunk.len() - total_errors.min(chunk.len()),
200 errors = total_errors.min(chunk.len()),
201 "Chunk completed"
202 );
203
204 // Small delay between chunks to prevent overwhelming the system
205 if chunk_idx + 1 < documents.chunks(chunk_size).len() {
206 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
207 }
208 }
209
210 if total_errors > 0 {
211 tracing::warn!(
212 total_errors = total_errors,
213 total_documents = documents.len(),
214 "Batch processing completed with errors"
215 );
216 }
217
218 Ok(all_results)
219 }
220
221 /// Processes a single document internally with graph access
222 ///
223 /// This is the core document processing logic executed within concurrent tasks.
224 /// Currently implements a basic placeholder that will be replaced with full
225 /// entity extraction in production.
226 ///
227 /// # Parameters
228 /// - `graph`: Shared knowledge graph for entity storage
229 /// - `document`: Document to process
230 /// - `_metrics`: Metrics collector (currently unused in implementation)
231 ///
232 /// # Returns
233 /// Processing result containing extraction statistics and timing
234 async fn process_single_document_internal(
235 graph: &Arc<RwLock<KnowledgeGraph>>,
236 document: Document,
237 _metrics: &ProcessingMetrics,
238 ) -> Result<ProcessingResult, GraphRAGError> {
239 let start_time = Instant::now();
240
241 // For now, create a simple processing result
242 // In a full async implementation, this would use async entity extraction
243 let result = {
244 let _graph_read = graph.read().await;
245 ProcessingResult {
246 document_id: document.id.clone(),
247 entities_extracted: 0, // Would be actual count from extraction
248 chunks_processed: document.chunks.len(),
249 processing_time: start_time.elapsed(),
250 success: true,
251 }
252 };
253
254 Ok(result)
255 }
256}