// graphrag_core/async_processing/concurrent_pipeline.rs

//! Concurrent document processing pipeline for parallel GraphRAG operations.
//!
//! This module provides the [`ConcurrentProcessor`] for processing multiple documents
//! in parallel while respecting concurrency limits and coordinating with rate limiters
//! and metrics tracking.
//!
//! # Main Types
//!
//! - [`ConcurrentProcessor`]: Manages concurrent document processing with configurable
//!   parallelism limits
//!
//! # Features
//!
//! - Concurrent processing of document batches with configurable limits
//! - Automatic chunking to respect concurrency constraints
//! - Integration with rate limiting to prevent API throttling
//! - Comprehensive metrics tracking for all operations
//! - Error isolation: failures in one document don't affect others
//! - Task spawning with tokio for true parallel execution
//!
//! # Basic Usage
//!
//! ```rust,ignore
//! use graphrag_core::async_processing::ConcurrentProcessor;
//! use std::sync::Arc;
//!
//! let processor = ConcurrentProcessor::new(10); // Max 10 concurrent documents
//!
//! let results = processor.process_batch(
//!     documents,
//!     graph,
//!     rate_limiter,
//!     metrics
//! ).await?;
//!
//! for result in results {
//!     println!("Processed document {} in {:?}",
//!         result.document_id,
//!         result.processing_time
//!     );
//! }
//! ```
use std::sync::Arc;
use std::time::Instant;

use futures::future::join_all;
use tokio::sync::RwLock;

use super::{ProcessingMetrics, ProcessingResult, RateLimiter};
use crate::core::{Document, GraphRAGError, KnowledgeGraph};

/// Concurrent document processor for parallel GraphRAG operations.
///
/// Coordinates parallel processing of document batches, capping the number of
/// in-flight documents and cooperating with a rate limiter and a metrics
/// collector supplied at call time.
#[derive(Debug)]
pub struct ConcurrentProcessor {
    /// Upper bound on how many documents are processed in parallel.
    max_concurrent_documents: usize,
}

63impl ConcurrentProcessor {
64    /// Creates a new concurrent processor with specified concurrency limit
65    ///
66    /// # Parameters
67    /// - `max_concurrent_documents`: Maximum number of documents to process in parallel
68    pub fn new(max_concurrent_documents: usize) -> Self {
69        Self {
70            max_concurrent_documents,
71        }
72    }
73
74    /// Processes a batch of documents concurrently with rate limiting and metrics tracking
75    ///
76    /// Documents are processed in chunks according to the concurrency limit. Each chunk
77    /// is fully processed before moving to the next, with a small delay between chunks
78    /// to prevent system overload.
79    ///
80    /// # Parameters
81    /// - `documents`: Collection of documents to process
82    /// - `graph`: Shared knowledge graph for storing extracted entities
83    /// - `rate_limiter`: Rate limiter for API call throttling
84    /// - `metrics`: Metrics collector for tracking processing statistics
85    ///
86    /// # Returns
87    /// Vector of processing results for successfully processed documents, or an error
88    pub async fn process_batch(
89        &self,
90        documents: Vec<Document>,
91        graph: Arc<RwLock<KnowledgeGraph>>,
92        rate_limiter: Arc<RateLimiter>,
93        metrics: Arc<ProcessingMetrics>,
94    ) -> Result<Vec<ProcessingResult>, GraphRAGError> {
95        if documents.is_empty() {
96            return Ok(Vec::new());
97        }
98
99        tracing::info!(
100            document_count = documents.len(),
101            max_concurrency = self.max_concurrent_documents,
102            "Processing documents batch"
103        );
104
105        // Process documents in chunks to respect concurrency limits
106        let chunk_size = self.max_concurrent_documents;
107        let mut all_results = Vec::new();
108        let mut total_errors = 0;
109
110        for (chunk_idx, chunk) in documents.chunks(chunk_size).enumerate() {
111            tracing::debug!(
112                chunk_number = chunk_idx + 1,
113                chunk_size = chunk.len(),
114                "Processing chunk"
115            );
116
117            let chunk_start = Instant::now();
118
119            // Create tasks for this chunk
120            let tasks: Vec<_> = chunk
121                .iter()
122                .cloned()
123                .map(|document| {
124                    let graph = Arc::clone(&graph);
125                    let rate_limiter = Arc::clone(&rate_limiter);
126                    let metrics = Arc::clone(&metrics);
127                    let doc_id = document.id.clone();
128
129                    tokio::spawn(async move {
130                        let doc_start = Instant::now();
131
132                        // Acquire rate limiting permit
133                        let _permit = match rate_limiter.acquire_llm_permit().await {
134                            Ok(permit) => permit,
135                            Err(e) => {
136                                metrics.increment_rate_limit_errors();
137                                return Err(e);
138                            }
139                        };
140
141                        // Process the document
142                        let result =
143                            Self::process_single_document_internal(&graph, document, &metrics)
144                                .await;
145
146                        let duration = doc_start.elapsed();
147
148                        match &result {
149                            Ok(_) => {
150                                tracing::debug!(document_id = %doc_id, duration_ms = duration.as_millis(), "Document completed");
151                                metrics.record_document_processing_duration(duration);
152                            }
153                            Err(e) => {
154                                tracing::warn!(document_id = %doc_id, duration_ms = duration.as_millis(), error = %e, "Document failed");
155                                metrics.increment_document_processing_error();
156                            }
157                        }
158
159                        result
160                    })
161                })
162                .collect();
163
164            // Wait for all tasks in this chunk to complete
165            let chunk_results = join_all(tasks).await;
166
167            // Collect results and handle errors
168            for (task_idx, task_result) in chunk_results.into_iter().enumerate() {
169                match task_result {
170                    Ok(Ok(processing_result)) => {
171                        all_results.push(processing_result);
172                        metrics.increment_document_processing_success();
173                    }
174                    Ok(Err(processing_error)) => {
175                        total_errors += 1;
176                        tracing::error!(
177                            chunk_number = chunk_idx + 1,
178                            task_number = task_idx + 1,
179                            error = %processing_error,
180                            "Processing error"
181                        );
182                    }
183                    Err(join_error) => {
184                        total_errors += 1;
185                        tracing::error!(
186                            chunk_number = chunk_idx + 1,
187                            task_number = task_idx + 1,
188                            error = %join_error,
189                            "Task join error"
190                        );
191                    }
192                }
193            }
194
195            let chunk_duration = chunk_start.elapsed();
196            tracing::debug!(
197                chunk_number = chunk_idx + 1,
198                duration_ms = chunk_duration.as_millis(),
199                successes = chunk.len() - total_errors.min(chunk.len()),
200                errors = total_errors.min(chunk.len()),
201                "Chunk completed"
202            );
203
204            // Small delay between chunks to prevent overwhelming the system
205            if chunk_idx + 1 < documents.chunks(chunk_size).len() {
206                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
207            }
208        }
209
210        if total_errors > 0 {
211            tracing::warn!(
212                total_errors = total_errors,
213                total_documents = documents.len(),
214                "Batch processing completed with errors"
215            );
216        }
217
218        Ok(all_results)
219    }
220
221    /// Processes a single document internally with graph access
222    ///
223    /// This is the core document processing logic executed within concurrent tasks.
224    /// Currently implements a basic placeholder that will be replaced with full
225    /// entity extraction in production.
226    ///
227    /// # Parameters
228    /// - `graph`: Shared knowledge graph for entity storage
229    /// - `document`: Document to process
230    /// - `_metrics`: Metrics collector (currently unused in implementation)
231    ///
232    /// # Returns
233    /// Processing result containing extraction statistics and timing
234    async fn process_single_document_internal(
235        graph: &Arc<RwLock<KnowledgeGraph>>,
236        document: Document,
237        _metrics: &ProcessingMetrics,
238    ) -> Result<ProcessingResult, GraphRAGError> {
239        let start_time = Instant::now();
240
241        // For now, create a simple processing result
242        // In a full async implementation, this would use async entity extraction
243        let result = {
244            let _graph_read = graph.read().await;
245            ProcessingResult {
246                document_id: document.id.clone(),
247                entities_extracted: 0, // Would be actual count from extraction
248                chunks_processed: document.chunks.len(),
249                processing_time: start_time.elapsed(),
250                success: true,
251            }
252        };
253
254        Ok(result)
255    }
256}