// thread_flow/incremental/concurrency.rs
// SPDX-FileCopyrightText: 2025 Knitli Inc. <knitli@knit.li>
// SPDX-License-Identifier: AGPL-3.0-or-later

//! Concurrency abstraction layer for incremental analysis.
//!
//! Provides unified interface for parallel execution across different deployment targets:
//! - **RayonExecutor**: CPU-bound parallelism for CLI (multi-core)
//! - **TokioExecutor**: Async I/O concurrency for all deployments
//! - **SequentialExecutor**: Fallback for single-threaded execution
//!
//! ## Architecture
//!
//! The concurrency layer adapts to deployment context via feature flags:
//! - CLI with `parallel` feature: Rayon for CPU-bound work
//! - All deployments: tokio for async I/O operations
//! - Fallback: Sequential execution when parallelism unavailable
//!
//! ## Examples
//!
//! ### Basic Usage
//!
//! ```rust
//! use thread_flow::incremental::concurrency::{
//!     create_executor, ConcurrencyMode, ExecutionError,
//! };
//!
//! # async fn example() -> Result<(), ExecutionError> {
//! // Create executor for current deployment
//! let executor = create_executor(ConcurrencyMode::Tokio { max_concurrent: 10 });
//!
//! // Process batch of items
//! let items = vec![1, 2, 3, 4, 5];
//! let results = executor.execute_batch(items, |_n| {
//!     // Your work here
//!     Ok(())
//! }).await?;
//!
//! assert_eq!(results.len(), 5);
//! # Ok(())
//! # }
//! ```
//!
//! ### Feature-Aware Execution
//!
//! ```rust
//! use thread_flow::incremental::concurrency::{
//!     create_executor, ConcurrencyMode,
//! };
//!
//! # async fn example() {
//! // Automatically uses best executor for current build
//! #[cfg(feature = "parallel")]
//! let executor = create_executor(ConcurrencyMode::Rayon { num_threads: None });
//!
//! #[cfg(not(feature = "parallel"))]
//! let executor = create_executor(ConcurrencyMode::Tokio { max_concurrent: 10 });
//! # }
//! ```
use async_trait::async_trait;
use std::sync::Arc;
use thiserror::Error;
63
64/// Errors that can occur during batch execution.
65#[derive(Debug, Error)]
66pub enum ExecutionError {
67 /// Generic execution failure with description.
68 #[error("Execution failed: {0}")]
69 Failed(String),
70
71 /// Thread pool creation or management error.
72 #[error("Thread pool error: {0}")]
73 ThreadPool(String),
74
75 /// Task join or coordination error.
76 #[error("Task join error: {0}")]
77 Join(String),
78}
79
80/// Unified interface for concurrent batch execution.
81///
82/// Implementations provide different parallelism strategies:
83/// - **Rayon**: CPU-bound parallelism (multi-threaded)
84/// - **Tokio**: I/O-bound concurrency (async tasks)
85/// - **Sequential**: Single-threaded fallback
86#[async_trait]
87pub trait ConcurrencyExecutor: Send + Sync {
88 /// Execute operation on batch of items concurrently.
89 ///
90 /// Returns vector of results in same order as input items.
91 /// Individual item failures don't stop processing of other items.
92 ///
93 /// # Arguments
94 ///
95 /// * `items` - Batch of items to process
96 /// * `op` - Operation to apply to each item
97 ///
98 /// # Returns
99 ///
100 /// Vector of results for each item. Length matches input items.
101 ///
102 /// # Errors
103 ///
104 /// Returns error if batch execution infrastructure fails.
105 /// Individual item failures are captured in result vector.
106 async fn execute_batch<F, T>(
107 &self,
108 items: Vec<T>,
109 op: F,
110 ) -> Result<Vec<Result<(), ExecutionError>>, ExecutionError>
111 where
112 F: Fn(T) -> Result<(), ExecutionError> + Send + Sync + 'static,
113 T: Send + 'static;
114
115 /// Get executor implementation name for debugging.
116 fn name(&self) -> &str;
117}
118
// ============================================================================
// Rayon Executor (CPU-bound parallelism, CLI only)
// ============================================================================
122
/// CPU-bound parallel executor using Rayon thread pool.
///
/// Optimized for multi-core CLI deployments processing independent items.
/// Not available in edge deployments (no `parallel` feature).
#[cfg(feature = "parallel")]
#[derive(Debug)]
pub struct RayonExecutor {
    // Dedicated pool so batch work never competes with rayon's global pool.
    thread_pool: rayon::ThreadPool,
}
132
#[cfg(feature = "parallel")]
impl RayonExecutor {
    /// Create new Rayon executor with optional thread count.
    ///
    /// # Arguments
    ///
    /// * `num_threads` - Optional thread count (None = use all cores)
    ///
    /// # Errors
    ///
    /// Returns [`ExecutionError::ThreadPool`] if `num_threads` is `Some(0)`
    /// or if pool creation fails.
    pub fn new(num_threads: Option<usize>) -> Result<Self, ExecutionError> {
        let mut builder = rayon::ThreadPoolBuilder::new();

        if let Some(threads) = num_threads {
            // Rayon treats 0 as "use default"; reject it explicitly so the
            // caller's intent is unambiguous.
            if threads == 0 {
                return Err(ExecutionError::ThreadPool(
                    "Thread count must be > 0".to_string(),
                ));
            }
            builder = builder.num_threads(threads);
        }

        let thread_pool = builder.build().map_err(|e| {
            ExecutionError::ThreadPool(format!("Failed to create thread pool: {}", e))
        })?;

        Ok(Self { thread_pool })
    }
}
163
#[cfg(feature = "parallel")]
#[async_trait]
impl ConcurrencyExecutor for RayonExecutor {
    async fn execute_batch<F, T>(
        &self,
        items: Vec<T>,
        op: F,
    ) -> Result<Vec<Result<(), ExecutionError>>, ExecutionError>
    where
        F: Fn(T) -> Result<(), ExecutionError> + Send + Sync + 'static,
        T: Send + 'static,
    {
        use rayon::prelude::*;
        use tokio::sync::oneshot;

        // Share the operation across Rayon worker threads.
        let op = Arc::new(op);

        // Submit the batch to the Rayon pool and receive the results over a
        // oneshot channel. Using `spawn` + channel (instead of
        // `thread_pool.install`, which runs the batch synchronously on the
        // calling thread) keeps the async runtime thread free while the
        // CPU-bound work executes.
        let (tx, rx) = oneshot::channel();
        self.thread_pool.spawn(move || {
            let results = items
                .into_par_iter()
                .map(|item| op(item))
                .collect::<Vec<_>>();
            // Receiver may have been dropped if the caller was cancelled;
            // there is nothing useful to do with the results in that case.
            let _ = tx.send(results);
        });

        rx.await
            .map_err(|e| ExecutionError::Join(format!("Rayon batch result dropped: {}", e)))
    }

    fn name(&self) -> &str {
        "rayon"
    }
}
195
// ============================================================================
// Tokio Executor (I/O-bound concurrency, always available)
// ============================================================================
199
/// Async I/O executor using tokio tasks with concurrency limit.
///
/// Optimized for I/O-bound operations (network, disk, async operations).
/// Available in all deployments (tokio is standard dependency).
#[derive(Debug)]
pub struct TokioExecutor {
    // Upper bound on simultaneously in-flight tasks; enforced via semaphore.
    max_concurrent: usize,
}
208
209impl TokioExecutor {
210 /// Create new Tokio executor with concurrency limit.
211 ///
212 /// # Arguments
213 ///
214 /// * `max_concurrent` - Maximum number of concurrent async tasks
215 pub fn new(max_concurrent: usize) -> Self {
216 Self { max_concurrent }
217 }
218}
219
220#[async_trait]
221impl ConcurrencyExecutor for TokioExecutor {
222 async fn execute_batch<F, T>(
223 &self,
224 items: Vec<T>,
225 op: F,
226 ) -> Result<Vec<Result<(), ExecutionError>>, ExecutionError>
227 where
228 F: Fn(T) -> Result<(), ExecutionError> + Send + Sync + 'static,
229 T: Send + 'static,
230 {
231 use tokio::sync::Semaphore;
232 use tokio::task;
233
234 // Semaphore for concurrency control
235 let semaphore = Arc::new(Semaphore::new(self.max_concurrent));
236 let op = Arc::new(op);
237
238 // Spawn tasks with concurrency limit
239 let mut handles = Vec::with_capacity(items.len());
240 for item in items {
241 let permit = semaphore.clone().acquire_owned().await.map_err(|e| {
242 ExecutionError::Join(format!("Semaphore acquisition failed: {}", e))
243 })?;
244
245 let op = Arc::clone(&op);
246 let handle = task::spawn_blocking(move || {
247 let result = op(item);
248 drop(permit); // Release permit
249 result
250 });
251
252 handles.push(handle);
253 }
254
255 // Collect results in order
256 let mut results = Vec::with_capacity(handles.len());
257 for handle in handles {
258 let result = handle
259 .await
260 .map_err(|e| ExecutionError::Join(format!("Task join failed: {}", e)))?;
261 results.push(result);
262 }
263
264 Ok(results)
265 }
266
267 fn name(&self) -> &str {
268 "tokio"
269 }
270}
271
// ============================================================================
// Sequential Executor (Single-threaded fallback)
// ============================================================================
275
276/// Sequential executor processing items one at a time.
277///
278/// Fallback executor when parallelism is unavailable or undesired.
279/// Always available regardless of feature flags.
280#[derive(Debug)]
281pub struct SequentialExecutor;
282
283#[async_trait]
284impl ConcurrencyExecutor for SequentialExecutor {
285 async fn execute_batch<F, T>(
286 &self,
287 items: Vec<T>,
288 op: F,
289 ) -> Result<Vec<Result<(), ExecutionError>>, ExecutionError>
290 where
291 F: Fn(T) -> Result<(), ExecutionError> + Send + Sync + 'static,
292 T: Send + 'static,
293 {
294 // Process items sequentially
295 let results = items.into_iter().map(op).collect();
296 Ok(results)
297 }
298
299 fn name(&self) -> &str {
300 "sequential"
301 }
302}
303
// ============================================================================
// Factory Pattern
// ============================================================================
307
308/// Unified executor enum combining all concurrency strategies.
309///
310/// Wraps different executor implementations in a single enum for type-safe usage.
311/// Automatically routes to appropriate implementation based on configuration.
312#[derive(Debug)]
313pub enum Executor {
314 /// Sequential executor (always available).
315 Sequential(SequentialExecutor),
316
317 /// Tokio async executor (always available).
318 Tokio(TokioExecutor),
319
320 /// Rayon parallel executor (requires `parallel` feature).
321 #[cfg(feature = "parallel")]
322 Rayon(RayonExecutor),
323}
324
325impl Executor {
326 /// Create Sequential executor.
327 pub fn sequential() -> Self {
328 Self::Sequential(SequentialExecutor)
329 }
330
331 /// Create Tokio executor with concurrency limit.
332 pub fn tokio(max_concurrent: usize) -> Self {
333 Self::Tokio(TokioExecutor::new(max_concurrent))
334 }
335
336 /// Create Rayon executor with optional thread count (requires `parallel` feature).
337 #[cfg(feature = "parallel")]
338 pub fn rayon(num_threads: Option<usize>) -> Result<Self, ExecutionError> {
339 RayonExecutor::new(num_threads).map(Self::Rayon)
340 }
341
342 /// Get executor implementation name for debugging.
343 pub fn name(&self) -> &str {
344 match self {
345 Self::Sequential(_) => "sequential",
346 Self::Tokio(_) => "tokio",
347 #[cfg(feature = "parallel")]
348 Self::Rayon(_) => "rayon",
349 }
350 }
351
352 /// Execute operation on batch of items concurrently.
353 ///
354 /// Returns vector of results in same order as input items.
355 /// Individual item failures don't stop processing of other items.
356 pub async fn execute_batch<F, T>(
357 &self,
358 items: Vec<T>,
359 op: F,
360 ) -> Result<Vec<Result<(), ExecutionError>>, ExecutionError>
361 where
362 F: Fn(T) -> Result<(), ExecutionError> + Send + Sync + 'static,
363 T: Send + 'static,
364 {
365 match self {
366 Self::Sequential(exec) => exec.execute_batch(items, op).await,
367 Self::Tokio(exec) => exec.execute_batch(items, op).await,
368 #[cfg(feature = "parallel")]
369 Self::Rayon(exec) => exec.execute_batch(items, op).await,
370 }
371 }
372}
373
/// Concurrency mode selection for executor factory.
#[derive(Debug, Clone)]
pub enum ConcurrencyMode {
    /// Rayon parallel executor (requires `parallel` feature).
    Rayon {
        /// Optional worker thread count (`None` = use all cores).
        num_threads: Option<usize>,
    },

    /// Tokio async executor (always available).
    Tokio {
        /// Maximum number of concurrent async tasks.
        max_concurrent: usize,
    },

    /// Sequential fallback executor.
    Sequential,
}
386
387/// Create executor instance based on mode and available features.
388///
389/// Automatically falls back to Sequential when requested mode unavailable.
390///
391/// # Arguments
392///
393/// * `mode` - Desired concurrency mode
394///
395/// # Returns
396///
397/// Executor enum instance ready for use.
398///
399/// # Examples
400///
401/// ```rust
402/// use thread_flow::incremental::concurrency::{
403/// create_executor, ConcurrencyMode,
404/// };
405///
406/// # async fn example() {
407/// // Request Rayon (falls back to Sequential if `parallel` feature disabled)
408/// let executor = create_executor(ConcurrencyMode::Rayon { num_threads: Some(4) });
409///
410/// // Tokio always available
411/// let executor = create_executor(ConcurrencyMode::Tokio { max_concurrent: 10 });
412/// # }
413/// ```
414pub fn create_executor(mode: ConcurrencyMode) -> Executor {
415 match mode {
416 #[cfg(feature = "parallel")]
417 ConcurrencyMode::Rayon { num_threads } => {
418 match Executor::rayon(num_threads) {
419 Ok(executor) => executor,
420 Err(_) => {
421 // Fall back to Sequential on Rayon initialization failure
422 Executor::sequential()
423 }
424 }
425 }
426
427 #[cfg(not(feature = "parallel"))]
428 ConcurrencyMode::Rayon { .. } => {
429 // Graceful degradation when `parallel` feature disabled
430 Executor::sequential()
431 }
432
433 ConcurrencyMode::Tokio { max_concurrent } => Executor::tokio(max_concurrent),
434
435 ConcurrencyMode::Sequential => Executor::sequential(),
436 }
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that a batch run produced one Ok result per input item.
    fn assert_all_ok(results: &[Result<(), ExecutionError>], expected_len: usize) {
        assert_eq!(results.len(), expected_len);
        assert!(results.iter().all(|r| r.is_ok()));
    }

    #[tokio::test]
    async fn test_sequential_basic() {
        let results = SequentialExecutor
            .execute_batch(vec![1, 2, 3], |_| Ok(()))
            .await
            .unwrap();
        assert_all_ok(&results, 3);
    }

    #[tokio::test]
    async fn test_tokio_basic() {
        let results = TokioExecutor::new(2)
            .execute_batch(vec![1, 2, 3], |_| Ok(()))
            .await
            .unwrap();
        assert_all_ok(&results, 3);
    }

    #[cfg(feature = "parallel")]
    #[tokio::test]
    async fn test_rayon_basic() {
        let results = RayonExecutor::new(None)
            .unwrap()
            .execute_batch(vec![1, 2, 3], |_| Ok(()))
            .await
            .unwrap();
        assert_all_ok(&results, 3);
    }

    #[test]
    fn test_factory_sequential() {
        assert_eq!(create_executor(ConcurrencyMode::Sequential).name(), "sequential");
    }

    #[test]
    fn test_factory_tokio() {
        let executor = create_executor(ConcurrencyMode::Tokio { max_concurrent: 5 });
        assert_eq!(executor.name(), "tokio");
    }

    #[cfg(feature = "parallel")]
    #[test]
    fn test_factory_rayon() {
        let executor = create_executor(ConcurrencyMode::Rayon { num_threads: None });
        assert_eq!(executor.name(), "rayon");
    }

    #[cfg(not(feature = "parallel"))]
    #[test]
    fn test_factory_rayon_fallback() {
        // Falls back to sequential when the `parallel` feature is disabled.
        let executor = create_executor(ConcurrencyMode::Rayon { num_threads: None });
        assert_eq!(executor.name(), "sequential");
    }
}