Skip to main content

thread_flow/incremental/
concurrency.rs

1// SPDX-FileCopyrightText: 2025 Knitli Inc. <knitli@knit.li>
2// SPDX-License-Identifier: AGPL-3.0-or-later
3
4//! Concurrency abstraction layer for incremental analysis.
5//!
6//! Provides unified interface for parallel execution across different deployment targets:
7//! - **RayonExecutor**: CPU-bound parallelism for CLI (multi-core)
8//! - **TokioExecutor**: Async I/O concurrency for all deployments
9//! - **SequentialExecutor**: Fallback for single-threaded execution
10//!
11//! ## Architecture
12//!
13//! The concurrency layer adapts to deployment context via feature flags:
14//! - CLI with `parallel` feature: Rayon for CPU-bound work
15//! - All deployments: tokio for async I/O operations
16//! - Fallback: Sequential execution when parallelism unavailable
17//!
18//! ## Examples
19//!
20//! ### Basic Usage
21//!
22//! ```rust
23//! use thread_flow::incremental::concurrency::{
24//!     create_executor, ConcurrencyMode, ExecutionError,
25//! };
26//!
27//! # async fn example() -> Result<(), ExecutionError> {
28//! // Create executor for current deployment
29//! let executor = create_executor(ConcurrencyMode::Tokio { max_concurrent: 10 });
30//!
31//! // Process batch of items
32//! let items = vec![1, 2, 3, 4, 5];
33//! let results = executor.execute_batch(items, |n| {
34//!     // Your work here
35//!     Ok(())
36//! }).await?;
37//!
38//! assert_eq!(results.len(), 5);
39//! # Ok(())
40//! # }
41//! ```
42//!
43//! ### Feature-Aware Execution
44//!
45//! ```rust
46//! use thread_flow::incremental::concurrency::{
47//!     create_executor, ConcurrencyMode,
48//! };
49//!
50//! # async fn example() {
51//! // Automatically uses best executor for current build
52//! #[cfg(feature = "parallel")]
53//! let executor = create_executor(ConcurrencyMode::Rayon { num_threads: None });
54//!
55//! #[cfg(not(feature = "parallel"))]
56//! let executor = create_executor(ConcurrencyMode::Tokio { max_concurrent: 10 });
57//! # }
58//! ```
59
60use async_trait::async_trait;
61use std::sync::Arc;
62use thiserror::Error;
63
64/// Errors that can occur during batch execution.
65#[derive(Debug, Error)]
66pub enum ExecutionError {
67    /// Generic execution failure with description.
68    #[error("Execution failed: {0}")]
69    Failed(String),
70
71    /// Thread pool creation or management error.
72    #[error("Thread pool error: {0}")]
73    ThreadPool(String),
74
75    /// Task join or coordination error.
76    #[error("Task join error: {0}")]
77    Join(String),
78}
79
80/// Unified interface for concurrent batch execution.
81///
82/// Implementations provide different parallelism strategies:
83/// - **Rayon**: CPU-bound parallelism (multi-threaded)
84/// - **Tokio**: I/O-bound concurrency (async tasks)
85/// - **Sequential**: Single-threaded fallback
86#[async_trait]
87pub trait ConcurrencyExecutor: Send + Sync {
88    /// Execute operation on batch of items concurrently.
89    ///
90    /// Returns vector of results in same order as input items.
91    /// Individual item failures don't stop processing of other items.
92    ///
93    /// # Arguments
94    ///
95    /// * `items` - Batch of items to process
96    /// * `op` - Operation to apply to each item
97    ///
98    /// # Returns
99    ///
100    /// Vector of results for each item. Length matches input items.
101    ///
102    /// # Errors
103    ///
104    /// Returns error if batch execution infrastructure fails.
105    /// Individual item failures are captured in result vector.
106    async fn execute_batch<F, T>(
107        &self,
108        items: Vec<T>,
109        op: F,
110    ) -> Result<Vec<Result<(), ExecutionError>>, ExecutionError>
111    where
112        F: Fn(T) -> Result<(), ExecutionError> + Send + Sync + 'static,
113        T: Send + 'static;
114
115    /// Get executor implementation name for debugging.
116    fn name(&self) -> &str;
117}
118
119// ============================================================================
120// Rayon Executor (CPU-bound parallelism, CLI only)
121// ============================================================================
122
123#[cfg(feature = "parallel")]
124/// CPU-bound parallel executor using Rayon thread pool.
125///
126/// Optimized for multi-core CLI deployments processing independent items.
127/// Not available in edge deployments (no `parallel` feature).
128#[derive(Debug)]
129pub struct RayonExecutor {
130    thread_pool: rayon::ThreadPool,
131}
132
133#[cfg(feature = "parallel")]
134impl RayonExecutor {
135    /// Create new Rayon executor with optional thread count.
136    ///
137    /// # Arguments
138    ///
139    /// * `num_threads` - Optional thread count (None = use all cores)
140    ///
141    /// # Errors
142    ///
143    /// Returns [`ExecutionError::ThreadPool`] if pool creation fails.
144    pub fn new(num_threads: Option<usize>) -> Result<Self, ExecutionError> {
145        let mut builder = rayon::ThreadPoolBuilder::new();
146
147        if let Some(threads) = num_threads {
148            if threads == 0 {
149                return Err(ExecutionError::ThreadPool(
150                    "Thread count must be > 0".to_string(),
151                ));
152            }
153            builder = builder.num_threads(threads);
154        }
155
156        let thread_pool = builder.build().map_err(|e| {
157            ExecutionError::ThreadPool(format!("Failed to create thread pool: {}", e))
158        })?;
159
160        Ok(Self { thread_pool })
161    }
162}
163
164#[cfg(feature = "parallel")]
165#[async_trait]
166impl ConcurrencyExecutor for RayonExecutor {
167    async fn execute_batch<F, T>(
168        &self,
169        items: Vec<T>,
170        op: F,
171    ) -> Result<Vec<Result<(), ExecutionError>>, ExecutionError>
172    where
173        F: Fn(T) -> Result<(), ExecutionError> + Send + Sync + 'static,
174        T: Send + 'static,
175    {
176        // Wrap operation for thread safety
177        let op = Arc::new(op);
178
179        // Process items in parallel using Rayon
180        let results = self.thread_pool.install(|| {
181            use rayon::prelude::*;
182            items
183                .into_par_iter()
184                .map(|item| op(item))
185                .collect::<Vec<_>>()
186        });
187
188        Ok(results)
189    }
190
191    fn name(&self) -> &str {
192        "rayon"
193    }
194}
195
196// ============================================================================
197// Tokio Executor (I/O-bound concurrency, always available)
198// ============================================================================
199
200/// Async I/O executor using tokio tasks with concurrency limit.
201///
202/// Optimized for I/O-bound operations (network, disk, async operations).
203/// Available in all deployments (tokio is standard dependency).
204#[derive(Debug)]
205pub struct TokioExecutor {
206    max_concurrent: usize,
207}
208
209impl TokioExecutor {
210    /// Create new Tokio executor with concurrency limit.
211    ///
212    /// # Arguments
213    ///
214    /// * `max_concurrent` - Maximum number of concurrent async tasks
215    pub fn new(max_concurrent: usize) -> Self {
216        Self { max_concurrent }
217    }
218}
219
220#[async_trait]
221impl ConcurrencyExecutor for TokioExecutor {
222    async fn execute_batch<F, T>(
223        &self,
224        items: Vec<T>,
225        op: F,
226    ) -> Result<Vec<Result<(), ExecutionError>>, ExecutionError>
227    where
228        F: Fn(T) -> Result<(), ExecutionError> + Send + Sync + 'static,
229        T: Send + 'static,
230    {
231        use tokio::sync::Semaphore;
232        use tokio::task;
233
234        // Semaphore for concurrency control
235        let semaphore = Arc::new(Semaphore::new(self.max_concurrent));
236        let op = Arc::new(op);
237
238        // Spawn tasks with concurrency limit
239        let mut handles = Vec::with_capacity(items.len());
240        for item in items {
241            let permit = semaphore.clone().acquire_owned().await.map_err(|e| {
242                ExecutionError::Join(format!("Semaphore acquisition failed: {}", e))
243            })?;
244
245            let op = Arc::clone(&op);
246            let handle = task::spawn_blocking(move || {
247                let result = op(item);
248                drop(permit); // Release permit
249                result
250            });
251
252            handles.push(handle);
253        }
254
255        // Collect results in order
256        let mut results = Vec::with_capacity(handles.len());
257        for handle in handles {
258            let result = handle
259                .await
260                .map_err(|e| ExecutionError::Join(format!("Task join failed: {}", e)))?;
261            results.push(result);
262        }
263
264        Ok(results)
265    }
266
267    fn name(&self) -> &str {
268        "tokio"
269    }
270}
271
272// ============================================================================
273// Sequential Executor (Single-threaded fallback)
274// ============================================================================
275
276/// Sequential executor processing items one at a time.
277///
278/// Fallback executor when parallelism is unavailable or undesired.
279/// Always available regardless of feature flags.
280#[derive(Debug)]
281pub struct SequentialExecutor;
282
283#[async_trait]
284impl ConcurrencyExecutor for SequentialExecutor {
285    async fn execute_batch<F, T>(
286        &self,
287        items: Vec<T>,
288        op: F,
289    ) -> Result<Vec<Result<(), ExecutionError>>, ExecutionError>
290    where
291        F: Fn(T) -> Result<(), ExecutionError> + Send + Sync + 'static,
292        T: Send + 'static,
293    {
294        // Process items sequentially
295        let results = items.into_iter().map(op).collect();
296        Ok(results)
297    }
298
299    fn name(&self) -> &str {
300        "sequential"
301    }
302}
303
304// ============================================================================
305// Factory Pattern
306// ============================================================================
307
308/// Unified executor enum combining all concurrency strategies.
309///
310/// Wraps different executor implementations in a single enum for type-safe usage.
311/// Automatically routes to appropriate implementation based on configuration.
312#[derive(Debug)]
313pub enum Executor {
314    /// Sequential executor (always available).
315    Sequential(SequentialExecutor),
316
317    /// Tokio async executor (always available).
318    Tokio(TokioExecutor),
319
320    /// Rayon parallel executor (requires `parallel` feature).
321    #[cfg(feature = "parallel")]
322    Rayon(RayonExecutor),
323}
324
325impl Executor {
326    /// Create Sequential executor.
327    pub fn sequential() -> Self {
328        Self::Sequential(SequentialExecutor)
329    }
330
331    /// Create Tokio executor with concurrency limit.
332    pub fn tokio(max_concurrent: usize) -> Self {
333        Self::Tokio(TokioExecutor::new(max_concurrent))
334    }
335
336    /// Create Rayon executor with optional thread count (requires `parallel` feature).
337    #[cfg(feature = "parallel")]
338    pub fn rayon(num_threads: Option<usize>) -> Result<Self, ExecutionError> {
339        RayonExecutor::new(num_threads).map(Self::Rayon)
340    }
341
342    /// Get executor implementation name for debugging.
343    pub fn name(&self) -> &str {
344        match self {
345            Self::Sequential(_) => "sequential",
346            Self::Tokio(_) => "tokio",
347            #[cfg(feature = "parallel")]
348            Self::Rayon(_) => "rayon",
349        }
350    }
351
352    /// Execute operation on batch of items concurrently.
353    ///
354    /// Returns vector of results in same order as input items.
355    /// Individual item failures don't stop processing of other items.
356    pub async fn execute_batch<F, T>(
357        &self,
358        items: Vec<T>,
359        op: F,
360    ) -> Result<Vec<Result<(), ExecutionError>>, ExecutionError>
361    where
362        F: Fn(T) -> Result<(), ExecutionError> + Send + Sync + 'static,
363        T: Send + 'static,
364    {
365        match self {
366            Self::Sequential(exec) => exec.execute_batch(items, op).await,
367            Self::Tokio(exec) => exec.execute_batch(items, op).await,
368            #[cfg(feature = "parallel")]
369            Self::Rayon(exec) => exec.execute_batch(items, op).await,
370        }
371    }
372}
373
374/// Concurrency mode selection for executor factory.
375#[derive(Debug, Clone)]
376pub enum ConcurrencyMode {
377    /// Rayon parallel executor (requires `parallel` feature).
378    Rayon { num_threads: Option<usize> },
379
380    /// Tokio async executor (always available).
381    Tokio { max_concurrent: usize },
382
383    /// Sequential fallback executor.
384    Sequential,
385}
386
387/// Create executor instance based on mode and available features.
388///
389/// Automatically falls back to Sequential when requested mode unavailable.
390///
391/// # Arguments
392///
393/// * `mode` - Desired concurrency mode
394///
395/// # Returns
396///
397/// Executor enum instance ready for use.
398///
399/// # Examples
400///
401/// ```rust
402/// use thread_flow::incremental::concurrency::{
403///     create_executor, ConcurrencyMode,
404/// };
405///
406/// # async fn example() {
407/// // Request Rayon (falls back to Sequential if `parallel` feature disabled)
408/// let executor = create_executor(ConcurrencyMode::Rayon { num_threads: Some(4) });
409///
410/// // Tokio always available
411/// let executor = create_executor(ConcurrencyMode::Tokio { max_concurrent: 10 });
412/// # }
413/// ```
414pub fn create_executor(mode: ConcurrencyMode) -> Executor {
415    match mode {
416        #[cfg(feature = "parallel")]
417        ConcurrencyMode::Rayon { num_threads } => {
418            match Executor::rayon(num_threads) {
419                Ok(executor) => executor,
420                Err(_) => {
421                    // Fall back to Sequential on Rayon initialization failure
422                    Executor::sequential()
423                }
424            }
425        }
426
427        #[cfg(not(feature = "parallel"))]
428        ConcurrencyMode::Rayon { .. } => {
429            // Graceful degradation when `parallel` feature disabled
430            Executor::sequential()
431        }
432
433        ConcurrencyMode::Tokio { max_concurrent } => Executor::tokio(max_concurrent),
434
435        ConcurrencyMode::Sequential => Executor::sequential(),
436    }
437}
438
439#[cfg(test)]
440mod tests {
441    use super::*;
442
443    #[tokio::test]
444    async fn test_sequential_basic() {
445        let executor = SequentialExecutor;
446        let items = vec![1, 2, 3];
447        let results = executor.execute_batch(items, |_| Ok(())).await.unwrap();
448
449        assert_eq!(results.len(), 3);
450        assert!(results.iter().all(|r| r.is_ok()));
451    }
452
453    #[tokio::test]
454    async fn test_tokio_basic() {
455        let executor = TokioExecutor::new(2);
456        let items = vec![1, 2, 3];
457        let results = executor.execute_batch(items, |_| Ok(())).await.unwrap();
458
459        assert_eq!(results.len(), 3);
460        assert!(results.iter().all(|r| r.is_ok()));
461    }
462
463    #[cfg(feature = "parallel")]
464    #[tokio::test]
465    async fn test_rayon_basic() {
466        let executor = RayonExecutor::new(None).unwrap();
467        let items = vec![1, 2, 3];
468        let results = executor.execute_batch(items, |_| Ok(())).await.unwrap();
469
470        assert_eq!(results.len(), 3);
471        assert!(results.iter().all(|r| r.is_ok()));
472    }
473
474    #[test]
475    fn test_factory_sequential() {
476        let executor = create_executor(ConcurrencyMode::Sequential);
477        assert_eq!(executor.name(), "sequential");
478    }
479
480    #[test]
481    fn test_factory_tokio() {
482        let executor = create_executor(ConcurrencyMode::Tokio { max_concurrent: 5 });
483        assert_eq!(executor.name(), "tokio");
484    }
485
486    #[cfg(feature = "parallel")]
487    #[test]
488    fn test_factory_rayon() {
489        let executor = create_executor(ConcurrencyMode::Rayon { num_threads: None });
490        assert_eq!(executor.name(), "rayon");
491    }
492
493    #[cfg(not(feature = "parallel"))]
494    #[test]
495    fn test_factory_rayon_fallback() {
496        let executor = create_executor(ConcurrencyMode::Rayon { num_threads: None });
497        // Falls back to sequential when parallel feature disabled
498        assert_eq!(executor.name(), "sequential");
499    }
500}