scribe_scanner/
parallel.rs

//! Bounded parallelism with backpressure control and adaptive batching.
//!
//! This module implements intelligent parallelism that adapts to system load,
//! I/O latency, and memory pressure to prevent thrashing while maximizing
//! throughput for file scanning operations.
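//!
//! # Example
//!
//! A minimal usage sketch (marked `ignore`; `paths` and `scan_file` below are
//! placeholders): wrap inputs in [`WorkItem`]s, build a controller from the
//! default config, and run an async processor over the items in parallel.
//!
//! ```ignore
//! let controller = ParallelController::new(ParallelConfig::default());
//! let items: Vec<WorkItem<String>> = paths
//!     .into_iter()
//!     .map(|p| WorkItem::new(p).with_priority(10))
//!     .collect();
//! let results = controller
//!     .process_parallel(items, |path| async move {
//!         // Hypothetical per-file scan; any async computation returning
//!         // Result<_, String> fits the processor signature.
//!         scan_file(&path).await.map_err(|e| e.to_string())
//!     })
//!     .await;
//! ```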

use futures::stream::{FuturesUnordered, StreamExt};
use parking_lot::Mutex;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, Semaphore};

/// Adaptive parallelism controller with backpressure
#[derive(Debug)]
pub struct ParallelController {
    /// Current concurrency limit (adaptive)
    concurrency_limit: Arc<AtomicUsize>,
    /// Semaphore for concurrency control
    semaphore: Arc<Semaphore>,
    /// I/O latency tracker for adaptation
    io_latency_tracker: Arc<IoLatencyTracker>,
    /// Memory pressure detector
    memory_tracker: Arc<MemoryTracker>,
    /// Performance metrics
    metrics: Arc<Mutex<ParallelMetrics>>,
    /// Configuration
    config: ParallelConfig,
}

/// Configuration for parallel processing
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Initial concurrency level
    pub initial_concurrency: usize,
    /// Minimum concurrency (never go below this)
    pub min_concurrency: usize,
    /// Maximum concurrency (never exceed this)
    pub max_concurrency: usize,
    /// Target I/O latency (ms) for backpressure
    pub target_io_latency_ms: u64,
    /// Memory usage threshold (MB) for backpressure
    pub memory_threshold_mb: u64,
    /// Adaptation interval (how often to adjust)
    pub adaptation_interval: Duration,
    /// Work queue size per thread
    pub queue_size_per_thread: usize,
    /// Batch size adaptation range
    pub batch_size_range: (usize, usize),
}

/// I/O latency tracking for adaptive concurrency
#[derive(Debug)]
struct IoLatencyTracker {
    recent_latencies: Arc<RwLock<Vec<u64>>>,
    window_size: usize,
    last_adaptation: Arc<Mutex<Instant>>,
}

/// Memory usage tracking for backpressure
#[derive(Debug)]
struct MemoryTracker {
    baseline_memory: Arc<AtomicU64>,
    current_memory: Arc<AtomicU64>,
    peak_memory: Arc<AtomicU64>,
    measurements: Arc<AtomicU64>,
}

/// Performance metrics for parallel processing
#[derive(Debug, Default, Clone)]
pub struct ParallelMetrics {
    /// Tasks completed
    pub tasks_completed: u64,
    /// Tasks queued
    pub tasks_queued: u64,
    /// Current active tasks
    pub active_tasks: u64,
    /// Average I/O latency (microseconds)
    pub avg_io_latency_us: u64,
    /// Peak concurrency reached
    pub peak_concurrency: usize,
    /// Current concurrency level
    pub current_concurrency: usize,
    /// Memory usage (bytes)
    pub memory_usage_bytes: u64,
    /// Throughput (tasks/second)
    pub throughput: f64,
    /// Adaptation events
    pub concurrency_adaptations: u64,
    /// Backpressure events
    pub backpressure_events: u64,
    /// Queue overflow events
    pub queue_overflows: u64,
}

/// Adaptive batch configuration
#[derive(Debug, Clone)]
pub struct AdaptiveBatch {
    pub size: usize,
    pub timeout: Duration,
    pub memory_limit: u64,
}

/// Work item for parallel processing
#[derive(Debug, Clone)]
pub struct WorkItem<T> {
    pub data: T,
    pub priority: u8,        // 0 = highest priority
    pub estimated_cost: u32, // relative processing cost
    pub enqueued_at: Instant,
}

impl Default for ParallelConfig {
    fn default() -> Self {
        let cpu_count = num_cpus::get();
        Self {
            initial_concurrency: (cpu_count * 2).min(16),
            min_concurrency: 1,
            max_concurrency: (cpu_count * 4).min(32),
            target_io_latency_ms: 50,
            memory_threshold_mb: 512,
            adaptation_interval: Duration::from_secs(5),
            queue_size_per_thread: 100,
            batch_size_range: (10, 1000),
        }
    }
}

impl ParallelController {
    /// Create a new parallel controller
    pub fn new(config: ParallelConfig) -> Self {
        let concurrency_limit = Arc::new(AtomicUsize::new(config.initial_concurrency));
        let semaphore = Arc::new(Semaphore::new(config.initial_concurrency));

        let initial_metrics = ParallelMetrics {
            current_concurrency: config.initial_concurrency,
            ..ParallelMetrics::default()
        };

        Self {
            concurrency_limit,
            semaphore,
            io_latency_tracker: Arc::new(IoLatencyTracker::new(50)),
            memory_tracker: Arc::new(MemoryTracker::new()),
            metrics: Arc::new(Mutex::new(initial_metrics)),
            config,
        }
    }

    /// Process work items with adaptive parallelism and backpressure.
    ///
    /// Items are batched in priority order (lowest `priority` value first) and
    /// results are returned in that same order.
    pub async fn process_parallel<T, F, Fut, R>(
        &self,
        items: Vec<WorkItem<T>>,
        processor: F,
    ) -> Vec<Result<R, String>>
    where
        T: Send + Sync + Clone + 'static,
        F: Fn(T) -> Fut + Send + Sync + Clone + 'static,
        Fut: std::future::Future<Output = Result<R, String>> + Send + 'static,
        R: Send + 'static,
    {
        if items.is_empty() {
            return Vec::new();
        }

        let total_items = items.len();
        log::info!("Starting parallel processing of {} items", total_items);

        // Update metrics
        {
            let mut metrics = self.metrics.lock();
            metrics.tasks_queued += total_items as u64;
            metrics.current_concurrency = self.concurrency_limit.load(Ordering::Relaxed);
        }

        // Create adaptive batches
        let batches = self.create_adaptive_batches(items).await;
        log::debug!("Created {} adaptive batches", batches.len());

        let mut all_results = Vec::with_capacity(total_items);
        let start_time = Instant::now();
        let mut completed_tasks = 0u64;

        // Process batches with bounded parallelism
        for batch in batches {
            let batch_results = self
                .process_batch_with_backpressure(batch, processor.clone())
                .await;

            completed_tasks += batch_results.len() as u64;
            all_results.extend(batch_results);

            // Update throughput metric
            let elapsed_secs = start_time.elapsed().as_secs_f64();
            if elapsed_secs > 0.0 {
                let mut metrics = self.metrics.lock();
                metrics.throughput = completed_tasks as f64 / elapsed_secs;
                metrics.tasks_completed = completed_tasks;
            }

            // Adaptive concurrency adjustment
            if self.should_adapt_concurrency().await {
                self.adapt_concurrency().await;
            }
        }

        log::info!(
            "Completed parallel processing: {}/{} items in {:.2}s ({:.1} items/sec)",
            completed_tasks,
            total_items,
            start_time.elapsed().as_secs_f64(),
            completed_tasks as f64 / start_time.elapsed().as_secs_f64()
        );

        all_results
    }

    /// Process a single batch with backpressure control, preserving the
    /// original order of the batch in the returned results.
    async fn process_batch_with_backpressure<T, F, Fut, R>(
        &self,
        batch: Vec<WorkItem<T>>,
        processor: F,
    ) -> Vec<Result<R, String>>
    where
        T: Send + Sync + 'static,
        F: Fn(T) -> Fut + Send + Sync + Clone + 'static,
        Fut: std::future::Future<Output = Result<R, String>> + Send + 'static,
        R: Send + 'static,
    {
        let batch_size = batch.len();
        let futures = FuturesUnordered::new();

        // Submit items with concurrency control, tagging each future with its
        // index so results can be restored to input order after completion.
        for (index, item) in batch.into_iter().enumerate() {
            let semaphore = Arc::clone(&self.semaphore);
            let processor = processor.clone();
            let io_tracker = Arc::clone(&self.io_latency_tracker);
            let memory_tracker = Arc::clone(&self.memory_tracker);
            let metrics = Arc::clone(&self.metrics);

            let future = async move {
                // Acquire permit (waits if at the concurrency limit)
                let _permit = semaphore.acquire().await.expect("semaphore closed");

                // Update active task count
                {
                    let mut m = metrics.lock();
                    m.active_tasks += 1;
                }

                // Sample memory before processing
                memory_tracker.sample_memory().await;

                // Process the item and measure its latency
                let start_time = Instant::now();
                let result = processor(item.data).await;
                let io_latency_us = start_time.elapsed().as_micros() as u64;
                io_tracker.record_latency(io_latency_us / 1000).await; // convert to ms

                // Sample memory after processing
                memory_tracker.sample_memory().await;

                // Update metrics (crude running average for latency)
                {
                    let mut m = metrics.lock();
                    m.active_tasks = m.active_tasks.saturating_sub(1);
                    m.avg_io_latency_us = (m.avg_io_latency_us + io_latency_us) / 2;
                }

                (index, result)
            };

            futures.push(future);
        }

        // FuturesUnordered yields results in completion order; sort them back
        // into input order before returning.
        let mut indexed: Vec<(usize, Result<R, String>)> = futures.collect().await;
        indexed.sort_unstable_by_key(|(index, _)| *index);

        log::debug!("Processed batch of {} items", batch_size);
        indexed.into_iter().map(|(_, result)| result).collect()
    }

    /// Create adaptive batches based on system conditions
    async fn create_adaptive_batches<T: Clone>(
        &self,
        items: Vec<WorkItem<T>>,
    ) -> Vec<Vec<WorkItem<T>>> {
        let total_items = items.len();
        let current_concurrency = self.concurrency_limit.load(Ordering::Relaxed);

        // Calculate a target batch size based on concurrency and item count
        let base_batch_size =
            (total_items / current_concurrency).max(self.config.batch_size_range.0);
        let batch_size = base_batch_size.min(self.config.batch_size_range.1);

        // Sort items by priority (lower number = higher priority = first),
        // breaking ties by estimated cost
        let mut sorted_items = items;
        sorted_items.sort_by_key(|item| (item.priority, item.estimated_cost));

        // Create batches
        let mut batches = Vec::new();
        for chunk in sorted_items.chunks(batch_size) {
            batches.push(chunk.to_vec());
        }

        log::debug!(
            "Created {} batches with target size {} (concurrency: {})",
            batches.len(),
            batch_size,
            current_concurrency
        );

        batches
    }

    /// Check if concurrency should be adapted
    async fn should_adapt_concurrency(&self) -> bool {
        let last_adaptation = *self.io_latency_tracker.last_adaptation.lock();
        last_adaptation.elapsed() > self.config.adaptation_interval
    }

    /// Adapt concurrency based on system conditions
    async fn adapt_concurrency(&self) {
        let avg_latency = self.io_latency_tracker.average_latency().await;
        let memory_pressure = self.memory_tracker.memory_pressure().await;
        let current_concurrency = self.concurrency_limit.load(Ordering::Relaxed);

        let mut new_concurrency = current_concurrency;

        // Reduce concurrency on high I/O latency or high memory pressure
        if avg_latency > self.config.target_io_latency_ms || memory_pressure > 0.8 {
            new_concurrency = (current_concurrency * 8 / 10).max(self.config.min_concurrency);

            let mut metrics = self.metrics.lock();
            metrics.backpressure_events += 1;

            log::debug!(
                "Reducing concurrency: {} -> {} (latency: {}ms, memory pressure: {:.1}%)",
                current_concurrency,
                new_concurrency,
                avg_latency,
                memory_pressure * 100.0
            );
        }
        // Increase concurrency when both latency and memory pressure are low
        else if avg_latency < self.config.target_io_latency_ms / 2 && memory_pressure < 0.5 {
            // Grow by ~20%, but by at least one so small values can still increase
            new_concurrency = (current_concurrency * 12 / 10)
                .max(current_concurrency + 1)
                .min(self.config.max_concurrency);

            log::debug!(
                "Increasing concurrency: {} -> {} (latency: {}ms, memory pressure: {:.1}%)",
                current_concurrency,
                new_concurrency,
                avg_latency,
                memory_pressure * 100.0
            );
        }

        // Apply new concurrency limit
        if new_concurrency != current_concurrency {
            self.concurrency_limit
                .store(new_concurrency, Ordering::Relaxed);

            // Update semaphore permits
            if new_concurrency > current_concurrency {
                self.semaphore
                    .add_permits(new_concurrency - current_concurrency);
            }
            // Note: We can't reduce semaphore permits without draining tasks
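            //
            // A possible alternative (sketch only, not applied here): shrink the
            // pool by acquiring the surplus permits and forgetting them, e.g.
            //
            //     if let Ok(surplus) = self
            //         .semaphore
            //         .acquire_many((current_concurrency - new_concurrency) as u32)
            //         .await
            //     {
            //         surplus.forget();
            //     }
            //
            // This waits until enough in-flight tasks release their permits, so it
            // trades immediacy for strict enforcement of the lower limit.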

            // Update metrics
            {
                let mut metrics = self.metrics.lock();
                metrics.current_concurrency = new_concurrency;
                metrics.peak_concurrency = metrics.peak_concurrency.max(new_concurrency);
                metrics.concurrency_adaptations += 1;
            }
        }

        // Update adaptation timestamp
        *self.io_latency_tracker.last_adaptation.lock() = Instant::now();
    }

    /// Get current performance metrics
    pub fn metrics(&self) -> ParallelMetrics {
        let mut metrics = self.metrics.lock().clone();
        metrics.memory_usage_bytes = self.memory_tracker.current_memory.load(Ordering::Relaxed);
        metrics
    }

    /// Reset metrics
    pub fn reset_metrics(&self) {
        let mut metrics = self.metrics.lock();
        *metrics = ParallelMetrics::default();
        metrics.current_concurrency = self.concurrency_limit.load(Ordering::Relaxed);
    }
}

impl IoLatencyTracker {
    fn new(window_size: usize) -> Self {
        Self {
            recent_latencies: Arc::new(RwLock::new(Vec::with_capacity(window_size))),
            window_size,
            last_adaptation: Arc::new(Mutex::new(Instant::now())),
        }
    }

    async fn record_latency(&self, latency_ms: u64) {
        let mut latencies = self.recent_latencies.write().await;
        latencies.push(latency_ms);

        if latencies.len() > self.window_size {
            latencies.remove(0);
        }
    }

    async fn average_latency(&self) -> u64 {
        let latencies = self.recent_latencies.read().await;
        if latencies.is_empty() {
            0
        } else {
            latencies.iter().sum::<u64>() / latencies.len() as u64
        }
    }
}

impl MemoryTracker {
    fn new() -> Self {
        let initial_memory = Self::get_memory_usage();
        Self {
            baseline_memory: Arc::new(AtomicU64::new(initial_memory)),
            current_memory: Arc::new(AtomicU64::new(initial_memory)),
            peak_memory: Arc::new(AtomicU64::new(initial_memory)),
            measurements: Arc::new(AtomicU64::new(0)),
        }
    }

    async fn sample_memory(&self) {
        let current = Self::get_memory_usage();
        self.current_memory.store(current, Ordering::Relaxed);

        // Update peak
        self.peak_memory.fetch_max(current, Ordering::Relaxed);
        self.measurements.fetch_add(1, Ordering::Relaxed);
    }

    async fn memory_pressure(&self) -> f64 {
        let current = self.current_memory.load(Ordering::Relaxed);
        let baseline = self.baseline_memory.load(Ordering::Relaxed);

        if baseline == 0 {
            0.0
        } else {
            // Growth over the baseline, normalized so that 4x the baseline
            // counts as full pressure, clamped to the 0.0 - 1.0 range
            let growth = current.saturating_sub(baseline) as f64;
            (growth / (baseline as f64 * 4.0)).min(1.0)
        }
    }

    fn get_memory_usage() -> u64 {
        // Platform-specific memory usage (simplified)
        #[cfg(target_os = "linux")]
        {
            if let Ok(contents) = std::fs::read_to_string("/proc/self/status") {
                for line in contents.lines() {
                    if line.starts_with("VmRSS:") {
                        if let Some(kb_str) = line.split_whitespace().nth(1) {
                            if let Ok(kb) = kb_str.parse::<u64>() {
                                return kb * 1024; // Convert KB to bytes
                            }
                        }
                    }
                }
            }
        }

        // Fallback: usage is unknown on other platforms, so report zero
        0
    }
}

impl<T> WorkItem<T> {
    pub fn new(data: T) -> Self {
        Self {
            data,
            priority: 128,       // Medium priority
            estimated_cost: 100, // Default cost
            enqueued_at: Instant::now(),
        }
    }

    pub fn with_priority(mut self, priority: u8) -> Self {
        self.priority = priority;
        self
    }

    pub fn with_estimated_cost(mut self, cost: u32) -> Self {
        self.estimated_cost = cost;
        self
    }

    pub fn queue_time(&self) -> Duration {
        self.enqueued_at.elapsed()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_parallel_controller_creation() {
        let config = ParallelConfig::default();
        let controller = ParallelController::new(config.clone());

        let metrics = controller.metrics();
        assert_eq!(metrics.current_concurrency, config.initial_concurrency);
        assert_eq!(metrics.tasks_completed, 0);
    }

    #[tokio::test]
    async fn test_work_item_creation() {
        let item = WorkItem::new("test_data")
            .with_priority(10)
            .with_estimated_cost(500);

        assert_eq!(item.data, "test_data");
        assert_eq!(item.priority, 10);
        assert_eq!(item.estimated_cost, 500);
        assert!(item.queue_time().as_millis() < 10);
    }

    #[tokio::test]
    async fn test_parallel_processing() {
        let config = ParallelConfig {
            initial_concurrency: 2,
            max_concurrency: 4,
            ..Default::default()
        };
        let controller = ParallelController::new(config);

        // Create test work items
        let items: Vec<WorkItem<usize>> = (0..10)
            .map(|i| WorkItem::new(i).with_priority(i as u8))
            .collect();

        // Simple processor that just doubles the input
        let processor = |x: usize| async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            Ok(x * 2)
        };

        let results = controller.process_parallel(items, processor).await;

        assert_eq!(results.len(), 10);

        // Check that all results are correct (doubled values)
        for (i, result) in results.iter().enumerate() {
            match result {
                Ok(value) => assert_eq!(*value, i * 2),
                Err(e) => panic!("Unexpected error: {}", e),
            }
        }

        let metrics = controller.metrics();
        assert_eq!(metrics.tasks_completed, 10);
        assert!(metrics.throughput > 0.0);
    }

    #[tokio::test]
    async fn test_batch_creation() {
        let config = ParallelConfig {
            initial_concurrency: 4,
            batch_size_range: (2, 5),
            ..Default::default()
        };
        let controller = ParallelController::new(config);

        let items: Vec<WorkItem<usize>> = (0..12).map(|i| WorkItem::new(i)).collect();

        let batches = controller.create_adaptive_batches(items).await;

        // Should create multiple batches
        assert!(batches.len() > 1);

        // Each batch should be within size limits
        for batch in &batches {
            assert!(batch.len() >= 1);
            assert!(batch.len() <= 5);
        }

        // Total items should be preserved
        let total_items: usize = batches.iter().map(|b| b.len()).sum();
        assert_eq!(total_items, 12);
    }

    #[tokio::test]
    async fn test_latency_tracking() {
        let tracker = IoLatencyTracker::new(5);

        // Record some latencies
        tracker.record_latency(10).await;
        tracker.record_latency(20).await;
        tracker.record_latency(30).await;

        let avg = tracker.average_latency().await;
        assert_eq!(avg, 20); // (10 + 20 + 30) / 3 = 20

        // Test window limit
        for i in 0..10 {
            tracker.record_latency(100 + i).await;
        }

        let latencies = tracker.recent_latencies.read().await;
        assert_eq!(latencies.len(), 5); // Should be capped at window size
    }

    #[tokio::test]
    async fn test_memory_tracking() {
        let tracker = MemoryTracker::new();

        tracker.sample_memory().await;
        let pressure = tracker.memory_pressure().await;

        // Memory pressure is clamped to the 0.0 - 1.0 range
        assert!(pressure >= 0.0);
        assert!(pressure <= 1.0);
    }

    #[tokio::test]
    async fn test_error_handling() {
        let config = ParallelConfig::default();
        let controller = ParallelController::new(config);

        let items: Vec<WorkItem<usize>> =
            vec![WorkItem::new(1), WorkItem::new(2), WorkItem::new(3)];

        // Processor that fails on even numbers
        let processor = |x: usize| async move {
            if x % 2 == 0 {
                Err(format!("Error processing {}", x))
            } else {
                Ok(x * 10)
            }
        };

        let results = controller.process_parallel(items, processor).await;

        assert_eq!(results.len(), 3);
        assert!(results[0].is_ok()); // 1 -> success
        assert!(results[1].is_err()); // 2 -> error
        assert!(results[2].is_ok()); // 3 -> success

        // Check specific values
        assert_eq!(results[0].as_ref().unwrap(), &10);
        assert_eq!(results[2].as_ref().unwrap(), &30);
    }

    #[tokio::test]
    async fn test_metrics_updates() {
        let config = ParallelConfig {
            initial_concurrency: 2,
            ..Default::default()
        };
        let controller = ParallelController::new(config);

        // Initial metrics
        let initial_metrics = controller.metrics();
        assert_eq!(initial_metrics.tasks_completed, 0);
        assert_eq!(initial_metrics.current_concurrency, 2);

        // Process some items
        let items: Vec<WorkItem<usize>> = vec![WorkItem::new(1), WorkItem::new(2)];
        let processor = |x: usize| async move { Ok(x) };

        controller.process_parallel(items, processor).await;

        // Check updated metrics
        let final_metrics = controller.metrics();
        assert_eq!(final_metrics.tasks_completed, 2);
        assert!(final_metrics.throughput > 0.0);
    }
}