use futures::stream::{FuturesUnordered, StreamExt};
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, Semaphore};

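/// Adaptive parallel executor: bounds in-flight work with a semaphore and
/// retunes the concurrency limit from observed task latency and memory
/// pressure.
///
/// A minimal usage sketch (illustrative only; assumes a Tokio runtime and a
/// `String` error type):
///
/// ```ignore
/// let controller = ParallelController::new(ParallelConfig::default());
/// let items: Vec<WorkItem<u64>> = (0..100).map(WorkItem::new).collect();
/// let results = controller
///     .process_parallel(items, |n| async move { Ok::<u64, String>(n * 2) })
///     .await;
/// assert_eq!(results.len(), 100);
/// ```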
#[derive(Debug)]
pub struct ParallelController {
    /// Target concurrency currently in effect.
    concurrency_limit: Arc<AtomicUsize>,
    /// Gates how many tasks may run at once.
    semaphore: Arc<Semaphore>,
    /// Sliding window of recent per-task latencies.
    io_latency_tracker: Arc<IoLatencyTracker>,
    /// Resident-memory samples used to estimate pressure.
    memory_tracker: Arc<MemoryTracker>,
    /// Aggregate runtime statistics.
    metrics: Arc<Mutex<ParallelMetrics>>,
    /// Static tuning parameters.
    config: ParallelConfig,
}

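/// Tuning parameters for [`ParallelController`]. The `Default` implementation
/// derives its bounds from the host CPU count.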
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Concurrency to start with before any adaptation.
    pub initial_concurrency: usize,
    /// Lower bound for the adaptive limit.
    pub min_concurrency: usize,
    /// Upper bound for the adaptive limit.
    pub max_concurrency: usize,
    /// Per-task latency target, in milliseconds.
    pub target_io_latency_ms: u64,
    /// Soft memory ceiling, in megabytes.
    pub memory_threshold_mb: u64,
    /// Minimum time between concurrency adaptations.
    pub adaptation_interval: Duration,
    /// Queue capacity budgeted per worker thread.
    pub queue_size_per_thread: usize,
    /// Inclusive (min, max) bounds for adaptive batch sizes.
    pub batch_size_range: (usize, usize),
}

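/// Sliding window of recent per-task latencies (milliseconds), plus the
/// timestamp of the last concurrency adaptation.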
#[derive(Debug)]
struct IoLatencyTracker {
    /// Most recent latency samples, in milliseconds.
    recent_latencies: Arc<RwLock<VecDeque<u64>>>,
    /// Maximum number of samples retained.
    window_size: usize,
    /// When the concurrency limit was last adapted.
    last_adaptation: Arc<Mutex<Instant>>,
}

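/// Samples process resident memory to estimate pressure relative to a
/// startup baseline.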
#[derive(Debug)]
struct MemoryTracker {
    /// Resident memory at startup, in bytes.
    baseline_memory: Arc<AtomicU64>,
    /// Most recently sampled resident memory, in bytes.
    current_memory: Arc<AtomicU64>,
    /// Highest resident memory observed, in bytes.
    peak_memory: Arc<AtomicU64>,
    /// Number of samples taken.
    measurements: Arc<AtomicU64>,
}

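/// Point-in-time statistics snapshot, obtained via
/// [`ParallelController::metrics`].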
#[derive(Debug, Default, Clone)]
pub struct ParallelMetrics {
    /// Tasks finished, successfully or not.
    pub tasks_completed: u64,
    /// Tasks accepted for processing.
    pub tasks_queued: u64,
    /// Tasks currently executing.
    pub active_tasks: u64,
    /// Running average task latency, in microseconds.
    pub avg_io_latency_us: u64,
    /// Highest concurrency limit reached so far.
    pub peak_concurrency: usize,
    /// Concurrency limit currently in effect.
    pub current_concurrency: usize,
    /// Last sampled resident memory, in bytes.
    pub memory_usage_bytes: u64,
    /// Completed tasks per second for the current run.
    pub throughput: f64,
    /// Times the concurrency limit was changed.
    pub concurrency_adaptations: u64,
    /// Times concurrency was reduced under latency or memory pressure.
    pub backpressure_events: u64,
    /// Times a queue rejected items because it was full.
    pub queue_overflows: u64,
}

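/// Sizing parameters for one adaptively sized batch.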
#[derive(Debug, Clone)]
pub struct AdaptiveBatch {
    /// Number of items in the batch.
    pub size: usize,
    /// Deadline for processing the batch.
    pub timeout: Duration,
    /// Memory budget for the batch, in bytes.
    pub memory_limit: u64,
}

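/// A unit of work: a payload plus scheduling hints.
///
/// A small sketch of the builder-style constructors (illustrative; `payload`
/// is a placeholder):
///
/// ```ignore
/// let item = WorkItem::new(payload)
///     .with_priority(0) // lower values run earlier
///     .with_estimated_cost(250);
/// ```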
#[derive(Debug, Clone)]
pub struct WorkItem<T> {
    pub data: T,
    /// Scheduling priority; lower values are scheduled earlier.
    pub priority: u8,
    /// Rough relative cost used when ordering work.
    pub estimated_cost: u32,
    /// When the item was created.
    pub enqueued_at: Instant,
}

impl Default for ParallelConfig {
    fn default() -> Self {
        let cpu_count = num_cpus::get();
        Self {
            initial_concurrency: (cpu_count * 2).min(16),
            min_concurrency: 1,
            max_concurrency: (cpu_count * 4).min(32),
            target_io_latency_ms: 50,
            memory_threshold_mb: 512,
            adaptation_interval: Duration::from_secs(5),
            queue_size_per_thread: 100,
            batch_size_range: (10, 1000),
        }
    }
}

impl ParallelController {
    /// Creates a controller that starts at `config.initial_concurrency`.
    pub fn new(config: ParallelConfig) -> Self {
        let concurrency_limit = Arc::new(AtomicUsize::new(config.initial_concurrency));
        let semaphore = Arc::new(Semaphore::new(config.initial_concurrency));

        let initial_metrics = ParallelMetrics {
            current_concurrency: config.initial_concurrency,
            ..Default::default()
        };

        Self {
            concurrency_limit,
            semaphore,
            // 50-sample latency window.
            io_latency_tracker: Arc::new(IoLatencyTracker::new(50)),
            memory_tracker: Arc::new(MemoryTracker::new()),
            metrics: Arc::new(Mutex::new(initial_metrics)),
            config,
        }
    }

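    /// Processes `items` with bounded, adaptive concurrency and returns one
    /// result per item.
    ///
    /// Items are re-ordered into priority-sorted batches; within each batch,
    /// result order matches the batch's input order.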
    pub async fn process_parallel<T, F, Fut, R>(
        &self,
        items: Vec<WorkItem<T>>,
        processor: F,
    ) -> Vec<Result<R, String>>
    where
        T: Send + Sync + Clone + 'static,
        F: Fn(T) -> Fut + Send + Sync + Clone + 'static,
        Fut: std::future::Future<Output = Result<R, String>> + Send + 'static,
        R: Send + 'static,
    {
        if items.is_empty() {
            return Vec::new();
        }

        let total_items = items.len();
        log::info!("Starting parallel processing of {} items", total_items);

        // Record the incoming work before any batch runs.
        {
            let mut metrics = self.metrics.lock();
            metrics.tasks_queued += total_items as u64;
            metrics.current_concurrency = self.concurrency_limit.load(Ordering::Relaxed);
        }

        let batches = self.create_adaptive_batches(items).await;
        log::debug!("Created {} adaptive batches", batches.len());

        let mut all_results = Vec::with_capacity(total_items);
        let start_time = Instant::now();
        let mut completed_tasks = 0u64;

        for batch in batches {
            let batch_results = self
                .process_batch_with_backpressure(batch, processor.clone())
                .await;

            completed_tasks += batch_results.len() as u64;
            all_results.extend(batch_results);

            // Refresh throughput after every batch.
            let elapsed_secs = start_time.elapsed().as_secs_f64();
            if elapsed_secs > 0.0 {
                let mut metrics = self.metrics.lock();
                metrics.throughput = completed_tasks as f64 / elapsed_secs;
                metrics.tasks_completed = completed_tasks;
            }

            // Retune concurrency between batches once the adaptation interval elapses.
            if self.should_adapt_concurrency().await {
                self.adapt_concurrency().await;
            }
        }

        log::info!(
            "Completed parallel processing: {}/{} items in {:.2}s ({:.1} items/sec)",
            completed_tasks,
            total_items,
            start_time.elapsed().as_secs_f64(),
            completed_tasks as f64 / start_time.elapsed().as_secs_f64()
        );

        all_results
    }

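    /// Runs one batch, acquiring a semaphore permit per task so the number of
    /// concurrent tasks never exceeds the current limit.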
    async fn process_batch_with_backpressure<T, F, Fut, R>(
        &self,
        batch: Vec<WorkItem<T>>,
        processor: F,
    ) -> Vec<Result<R, String>>
    where
        T: Send + Sync + 'static,
        F: Fn(T) -> Fut + Send + Sync + Clone + 'static,
        Fut: std::future::Future<Output = Result<R, String>> + Send + 'static,
        R: Send + 'static,
    {
        let batch_size = batch.len();
        let mut futures = FuturesUnordered::new();

        for (index, item) in batch.into_iter().enumerate() {
            let semaphore = Arc::clone(&self.semaphore);
            let processor = processor.clone();
            let io_tracker = Arc::clone(&self.io_latency_tracker);
            let memory_tracker = Arc::clone(&self.memory_tracker);
            let metrics = Arc::clone(&self.metrics);

            let future = async move {
                // Wait for a concurrency permit; this is the backpressure point.
                let _permit = semaphore
                    .acquire()
                    .await
                    .expect("concurrency semaphore closed");

                {
                    let mut m = metrics.lock();
                    m.active_tasks += 1;
                }

                let start_time = Instant::now();

                memory_tracker.sample_memory().await;

                let result = processor(item.data).await;

                let io_latency_us = start_time.elapsed().as_micros() as u64;
                io_tracker.record_latency(io_latency_us / 1000).await;
                memory_tracker.sample_memory().await;

                {
                    let mut m = metrics.lock();
                    m.active_tasks = m.active_tasks.saturating_sub(1);
                    // Cheap exponential moving average (alpha = 0.5).
                    m.avg_io_latency_us = (m.avg_io_latency_us + io_latency_us) / 2;
                }

                // Carry the input index so results can be restored to input order.
                (index, result)
            };

            futures.push(future);
        }

        // FuturesUnordered yields in completion order; sort back to input order.
        let mut indexed: Vec<(usize, Result<R, String>)> = futures.collect().await;
        indexed.sort_unstable_by_key(|(index, _)| *index);

        log::debug!("Processed batch of {} items", batch_size);
        indexed.into_iter().map(|(_, result)| result).collect()
    }

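    /// Splits `items` into priority-ordered batches sized from the current
    /// concurrency limit and the configured batch size range.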
    async fn create_adaptive_batches<T: Clone>(
        &self,
        items: Vec<WorkItem<T>>,
    ) -> Vec<Vec<WorkItem<T>>> {
        let total_items = items.len();
        let current_concurrency = self.concurrency_limit.load(Ordering::Relaxed);

        // Aim for roughly one batch per concurrent slot, clamped to the configured range.
        let base_batch_size =
            (total_items / current_concurrency.max(1)).max(self.config.batch_size_range.0);
        let batch_size = base_batch_size.min(self.config.batch_size_range.1);

        // Stable sort: lower priority values (and then cheaper items) run first.
        let mut sorted_items = items;
        sorted_items.sort_by_key(|item| (item.priority, item.estimated_cost));

        let mut batches = Vec::new();
        for chunk in sorted_items.chunks(batch_size) {
            batches.push(chunk.to_vec());
        }

        log::debug!(
            "Created {} batches with target size {} (concurrency: {})",
            batches.len(),
            batch_size,
            current_concurrency
        );

        batches
    }

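    /// True once `adaptation_interval` has elapsed since the last adaptation.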
    async fn should_adapt_concurrency(&self) -> bool {
        let last_adaptation = *self.io_latency_tracker.last_adaptation.lock();
        last_adaptation.elapsed() > self.config.adaptation_interval
    }

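    /// Multiplicative adjustment: back off ~20% under latency or memory
    /// pressure, grow ~20% when both signals are comfortably low.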
    async fn adapt_concurrency(&self) {
        let avg_latency = self.io_latency_tracker.average_latency().await;
        let memory_pressure = self.memory_tracker.memory_pressure().await;
        let current_concurrency = self.concurrency_limit.load(Ordering::Relaxed);

        let mut new_concurrency = current_concurrency;

        // Back off ~20% when latency misses the target or memory is tight.
        if avg_latency > self.config.target_io_latency_ms || memory_pressure > 0.8 {
            new_concurrency = (current_concurrency * 8 / 10).max(self.config.min_concurrency);

            let mut metrics = self.metrics.lock();
            metrics.backpressure_events += 1;

            log::debug!(
                "Reducing concurrency: {} -> {} (latency: {}ms, memory pressure: {:.1}%)",
                current_concurrency,
                new_concurrency,
                avg_latency,
                memory_pressure * 100.0
            );
        }
        // Grow ~20% (at least +1, so growth is possible from small values)
        // when latency is well under target and memory is free.
        else if avg_latency < self.config.target_io_latency_ms / 2 && memory_pressure < 0.5 {
            new_concurrency = (current_concurrency * 12 / 10)
                .max(current_concurrency + 1)
                .min(self.config.max_concurrency);

            log::debug!(
                "Increasing concurrency: {} -> {} (latency: {}ms, memory pressure: {:.1}%)",
                current_concurrency,
                new_concurrency,
                avg_latency,
                memory_pressure * 100.0
            );
        }

        if new_concurrency != current_concurrency {
            self.concurrency_limit
                .store(new_concurrency, Ordering::Relaxed);

            if new_concurrency > current_concurrency {
                self.semaphore
                    .add_permits(new_concurrency - current_concurrency);
            } else {
                // Best effort: only currently unheld permits can be forgotten
                // (tokio::sync::Semaphore::forget_permits, tokio 1.38+).
                self.semaphore
                    .forget_permits(current_concurrency - new_concurrency);
            }

            {
                let mut metrics = self.metrics.lock();
                metrics.current_concurrency = new_concurrency;
                metrics.peak_concurrency = metrics.peak_concurrency.max(new_concurrency);
                metrics.concurrency_adaptations += 1;
            }
        }

        *self.io_latency_tracker.last_adaptation.lock() = Instant::now();
    }

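    /// Returns a snapshot of the current metrics, with memory usage refreshed
    /// from the tracker.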
    pub fn metrics(&self) -> ParallelMetrics {
        let mut metrics = self.metrics.lock().clone();
        metrics.memory_usage_bytes = self.memory_tracker.current_memory.load(Ordering::Relaxed);
        metrics
    }

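    /// Clears all counters while preserving the current concurrency limit.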
    pub fn reset_metrics(&self) {
        let mut metrics = self.metrics.lock();
        *metrics = ParallelMetrics::default();
        metrics.current_concurrency = self.concurrency_limit.load(Ordering::Relaxed);
    }
}

impl IoLatencyTracker {
    fn new(window_size: usize) -> Self {
        Self {
            recent_latencies: Arc::new(RwLock::new(VecDeque::with_capacity(window_size))),
            window_size,
            last_adaptation: Arc::new(Mutex::new(Instant::now())),
        }
    }

    async fn record_latency(&self, latency_ms: u64) {
        let mut latencies = self.recent_latencies.write().await;
        latencies.push_back(latency_ms);

        // Keep only the most recent `window_size` samples.
        if latencies.len() > self.window_size {
            latencies.pop_front();
        }
    }

    async fn average_latency(&self) -> u64 {
        let latencies = self.recent_latencies.read().await;
        if latencies.is_empty() {
            0
        } else {
            latencies.iter().sum::<u64>() / latencies.len() as u64
        }
    }
}

impl MemoryTracker {
    fn new() -> Self {
        let initial_memory = Self::get_memory_usage();
        Self {
            baseline_memory: Arc::new(AtomicU64::new(initial_memory)),
            current_memory: Arc::new(AtomicU64::new(initial_memory)),
            peak_memory: Arc::new(AtomicU64::new(initial_memory)),
            measurements: Arc::new(AtomicU64::new(0)),
        }
    }

    async fn sample_memory(&self) {
        let current = Self::get_memory_usage();
        self.current_memory.store(current, Ordering::Relaxed);

        self.peak_memory.fetch_max(current, Ordering::Relaxed);
        self.measurements.fetch_add(1, Ordering::Relaxed);
    }

    /// Growth over the startup baseline, scaled so 1.0 means resident memory
    /// grew by four times the baseline.
    async fn memory_pressure(&self) -> f64 {
        let current = self.current_memory.load(Ordering::Relaxed);
        let baseline = self.baseline_memory.load(Ordering::Relaxed);

        if baseline == 0 {
            0.0
        } else {
            // Clamp so shrinking below baseline never reports negative pressure.
            ((current as f64 - baseline as f64) / (baseline as f64 * 4.0)).max(0.0)
        }
    }

    /// Resident set size in bytes; 0 on platforms without /proc support.
    fn get_memory_usage() -> u64 {
        #[cfg(target_os = "linux")]
        {
            // Parse the VmRSS line of /proc/self/status (reported in kB).
            if let Ok(contents) = std::fs::read_to_string("/proc/self/status") {
                for line in contents.lines() {
                    if line.starts_with("VmRSS:") {
                        if let Some(kb_str) = line.split_whitespace().nth(1) {
                            if let Ok(kb) = kb_str.parse::<u64>() {
                                return kb * 1024;
                            }
                        }
                    }
                }
            }
        }

        0
    }
}

impl<T> WorkItem<T> {
    /// Wraps `data` with default priority (128) and cost (100).
    pub fn new(data: T) -> Self {
        Self {
            data,
            priority: 128,
            estimated_cost: 100,
            enqueued_at: Instant::now(),
        }
    }

    /// Sets the scheduling priority; lower values run earlier.
    pub fn with_priority(mut self, priority: u8) -> Self {
        self.priority = priority;
        self
    }

    /// Sets the rough relative cost used when ordering work.
    pub fn with_estimated_cost(mut self, cost: u32) -> Self {
        self.estimated_cost = cost;
        self
    }

    /// Time elapsed since this item was created.
    pub fn queue_time(&self) -> Duration {
        self.enqueued_at.elapsed()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_parallel_controller_creation() {
        let config = ParallelConfig::default();
        let controller = ParallelController::new(config.clone());

        let metrics = controller.metrics();
        assert_eq!(metrics.current_concurrency, config.initial_concurrency);
        assert_eq!(metrics.tasks_completed, 0);
    }

    #[tokio::test]
    async fn test_work_item_creation() {
        let item = WorkItem::new("test_data")
            .with_priority(10)
            .with_estimated_cost(500);

        assert_eq!(item.data, "test_data");
        assert_eq!(item.priority, 10);
        assert_eq!(item.estimated_cost, 500);
        assert!(item.queue_time().as_millis() < 10);
    }

    #[tokio::test]
    async fn test_parallel_processing() {
        let config = ParallelConfig {
            initial_concurrency: 2,
            max_concurrency: 4,
            ..Default::default()
        };
        let controller = ParallelController::new(config);

        // Priorities match insertion order, so results should come back in order.
        let items: Vec<WorkItem<usize>> = (0..10)
            .map(|i| WorkItem::new(i).with_priority(i as u8))
            .collect();

        let processor = |x: usize| async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            Ok(x * 2)
        };

        let results = controller.process_parallel(items, processor).await;

        assert_eq!(results.len(), 10);

        for (i, result) in results.iter().enumerate() {
            match result {
                Ok(value) => assert_eq!(*value, i * 2),
                Err(e) => panic!("Unexpected error: {}", e),
            }
        }

        let metrics = controller.metrics();
        assert_eq!(metrics.tasks_completed, 10);
        assert!(metrics.throughput > 0.0);
    }

    #[tokio::test]
    async fn test_batch_creation() {
        let config = ParallelConfig {
            initial_concurrency: 4,
            batch_size_range: (2, 5),
            ..Default::default()
        };
        let controller = ParallelController::new(config);

        let items: Vec<WorkItem<usize>> = (0..12).map(WorkItem::new).collect();

        let batches = controller.create_adaptive_batches(items).await;

        // 12 items at concurrency 4 with range (2, 5) must split into several batches.
        assert!(batches.len() > 1);

        for batch in &batches {
            assert!(!batch.is_empty());
            assert!(batch.len() <= 5);
        }

        // No items may be lost or duplicated.
        let total_items: usize = batches.iter().map(|b| b.len()).sum();
        assert_eq!(total_items, 12);
    }

    #[tokio::test]
    async fn test_latency_tracking() {
        let tracker = IoLatencyTracker::new(5);

        tracker.record_latency(10).await;
        tracker.record_latency(20).await;
        tracker.record_latency(30).await;

        let avg = tracker.average_latency().await;
        assert_eq!(avg, 20);

        // Overfill the window; only the most recent 5 samples should remain.
        for i in 0..10 {
            tracker.record_latency(100 + i).await;
        }

        let latencies = tracker.recent_latencies.read().await;
        assert_eq!(latencies.len(), 5);
    }

    #[tokio::test]
    async fn test_memory_tracking() {
        let tracker = MemoryTracker::new();

        tracker.sample_memory().await;
        let pressure = tracker.memory_pressure().await;

        // Pressure is clamped at zero and should stay small right after startup.
        assert!(pressure >= 0.0);
        assert!(pressure <= 10.0);
    }

    #[tokio::test]
    async fn test_error_handling() {
        let config = ParallelConfig::default();
        let controller = ParallelController::new(config);

        let items: Vec<WorkItem<usize>> =
            vec![WorkItem::new(1), WorkItem::new(2), WorkItem::new(3)];

        let processor = |x: usize| async move {
            if x % 2 == 0 {
                Err(format!("Error processing {}", x))
            } else {
                Ok(x * 10)
            }
        };

        let results = controller.process_parallel(items, processor).await;

        assert_eq!(results.len(), 3);
        assert!(results[0].is_ok());
        assert!(results[1].is_err());
        assert!(results[2].is_ok());
        assert_eq!(results[0].as_ref().unwrap(), &10);
        assert_eq!(results[2].as_ref().unwrap(), &30);
    }

    #[tokio::test]
    async fn test_metrics_updates() {
        let config = ParallelConfig {
            initial_concurrency: 2,
            ..Default::default()
        };
        let controller = ParallelController::new(config);

        let initial_metrics = controller.metrics();
        assert_eq!(initial_metrics.tasks_completed, 0);
        assert_eq!(initial_metrics.current_concurrency, 2);

        let items: Vec<WorkItem<usize>> = vec![WorkItem::new(1), WorkItem::new(2)];
        let processor = |x: usize| async move { Ok(x) };

        controller.process_parallel(items, processor).await;

        let final_metrics = controller.metrics();
        assert_eq!(final_metrics.tasks_completed, 2);
        assert!(final_metrics.throughput > 0.0);
    }
}