Skip to main content

thread_flow/
batch.rs

1// SPDX-FileCopyrightText: 2025 Knitli Inc. <knitli@knit.li>
2// SPDX-License-Identifier: AGPL-3.0-or-later
3
4//! Batch file processing with optional parallel execution
5//!
6//! This module provides utilities for processing multiple files efficiently:
7//! - **CLI builds** (default): Uses rayon for CPU parallelism across cores
//! - **Worker builds**: Falls back to sequential processing, because edge runtimes provide no threads
9//!
10//! ## Feature Gating
11//!
12//! Parallel processing is controlled by the `parallel` feature flag:
13//! - **Enabled** (default): Multi-core parallel processing via rayon
14//! - **Disabled** (worker): Single-threaded sequential processing
15//!
16//! ## Usage
17//!
18//! ```rust,ignore
19//! use thread_flow::batch::process_files_batch;
20//!
21//! let results = process_files_batch(&file_paths, |path| {
22//!     // Process each file
23//!     analyze_file(path)
24//! });
25//! ```
26//!
27//! ## Performance Characteristics
28//!
29//! | Target | Concurrency | 100 Files | 1000 Files |
30//! |--------|-------------|-----------|------------|
31//! | CLI (4 cores) | Parallel | ~0.4s | ~4s |
32//! | CLI (1 core) | Sequential | ~1.6s | ~16s |
33//! | Worker | Sequential | ~1.6s | ~16s |
34//!
//! **Speedup**: typically 2-4x on multi-core systems; at best roughly linear in core count (≈4x with 4 cores, as in the table above)
36
37use std::path::Path;
38
/// Run `processor` over every path in `paths` and gather the outputs.
///
/// The returned vector holds exactly one entry per input path, in the same
/// order as `paths`, whether or not the work ran in parallel.
///
/// # Execution strategy
///
/// - **`parallel` feature enabled (default, CLI builds):** paths are spread
///   over rayon's thread pool, which sizes itself from the available cores.
/// - **`parallel` feature disabled (e.g. Cloudflare Workers):** paths are
///   handled one after another on the calling thread. This keeps the code free
///   of threads and `SharedArrayBuffer`, which edge runtimes do not offer.
///
/// # Panics
///
/// Panics raised by `processor` are not caught here and propagate to the caller.
///
/// # Example
///
/// ```rust,ignore
/// use std::path::PathBuf;
///
/// let paths = vec![PathBuf::from("src/main.rs"), PathBuf::from("src/lib.rs")];
///
/// // Infallible per-file work. Use `try_process_files_batch` when it can fail.
/// let shown = process_files_batch(&paths, |path| path.display().to_string());
/// ```
pub fn process_files_batch<P, F, R>(paths: &[P], processor: F) -> Vec<R>
where
    P: AsRef<Path> + Sync,
    F: Fn(&Path) -> R + Sync + Send,
    R: Send,
{
    // Borrow each element as a `&Path` before handing it to the caller's closure.
    let handle = |entry: &P| processor(entry.as_ref());

    #[cfg(feature = "parallel")]
    {
        // CLI builds: fan the work out across rayon's thread pool.
        use rayon::prelude::*;
        paths.par_iter().map(handle).collect()
    }

    #[cfg(not(feature = "parallel"))]
    {
        // Worker builds: no threads available, so walk the slice in order.
        paths.iter().map(handle).collect()
    }
}
84
/// Process multiple items in batch with optional parallelism.
///
/// Generic counterpart of [`process_files_batch`] that works with any slice of
/// items. `processor` is called once per element, and the outputs are returned
/// in the same order as `items`.
///
/// When the `parallel` feature is enabled, the work runs on rayon's thread pool.
/// Otherwise the items are processed sequentially on the calling thread.
///
/// # Panics
///
/// Panics raised by `processor` are not caught here and propagate to the caller.
///
/// # Example
///
/// ```rust,ignore
/// let fingerprints = vec!["abc123", "def456", "ghi789"];
///
/// let results = process_batch(&fingerprints, |fp| {
///     database.query_by_fingerprint(fp)
/// });
/// ```
pub fn process_batch<T, F, R>(items: &[T], processor: F) -> Vec<R>
where
    T: Sync,
    F: Fn(&T) -> R + Sync + Send,
    R: Send,
{
    #[cfg(feature = "parallel")]
    {
        use rayon::prelude::*;
        items.par_iter().map(processor).collect()
    }

    #[cfg(not(feature = "parallel"))]
    {
        // `processor` already has the shape `Fn(&T) -> R`, so it can be passed to
        // `map` directly. This mirrors the parallel branch and avoids a redundant
        // wrapper closure (clippy::redundant_closure).
        items.iter().map(processor).collect()
    }
}
115
/// Run a fallible `processor` over every path and keep each outcome.
///
/// Unlike a short-circuiting `collect::<Result<Vec<_>, _>>()`, one failure does
/// not stop the rest of the batch. Every path is processed, and its `Ok` or
/// `Err` is recorded, so partial success is visible to the caller.
///
/// # Returns
///
/// A `Vec<Result<R, E>>` aligned with `paths`: element `i` is the outcome of
/// calling `processor` on `paths[i]`.
///
/// # Execution strategy
///
/// Uses rayon when the `parallel` feature is enabled. Otherwise it runs
/// sequentially on the calling thread.
pub fn try_process_files_batch<P, F, R, E>(paths: &[P], processor: F) -> Vec<Result<R, E>>
where
    P: AsRef<Path> + Sync,
    F: Fn(&Path) -> Result<R, E> + Sync + Send,
    R: Send,
    E: Send,
{
    #[cfg(feature = "parallel")]
    use rayon::prelude::*;

    // Choose the iterator flavour once. The mapping step below is shared by both builds.
    #[cfg(feature = "parallel")]
    let entries = paths.par_iter();
    #[cfg(not(feature = "parallel"))]
    let entries = paths.iter();

    entries.map(|entry| processor(entry.as_ref())).collect()
}
143
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn test_process_batch_simple() {
        let numbers = [1, 2, 3, 4, 5];
        let results = process_batch(&numbers, |n| n * 2);
        assert_eq!(results, vec![2, 4, 6, 8, 10]);
    }

    #[test]
    fn test_process_batch_empty() {
        // An empty batch must yield an empty result, not panic.
        let empty: [i32; 0] = [];
        let results = process_batch(&empty, |n| n * 2);
        assert!(results.is_empty());
    }

    #[test]
    fn test_process_files_batch() {
        let paths = [
            PathBuf::from("file1.txt"),
            PathBuf::from("file2.txt"),
            PathBuf::from("file3.txt"),
        ];

        let results = process_files_batch(&paths, |path| {
            path.file_name()
                .and_then(|s| s.to_str())
                .unwrap_or("unknown")
                .to_string()
        });

        assert_eq!(results, vec!["file1.txt", "file2.txt", "file3.txt"]);
    }

    #[test]
    fn test_process_files_batch_accepts_str_paths() {
        // `P: AsRef<Path>` means plain string slices are valid inputs too.
        let paths = ["dir/a.rs", "dir/b.rs"];
        let results = process_files_batch(&paths, |path| path.to_path_buf());
        assert_eq!(
            results,
            vec![PathBuf::from("dir/a.rs"), PathBuf::from("dir/b.rs")]
        );
    }

    #[test]
    fn test_try_process_files_batch_with_errors() {
        let paths = [
            PathBuf::from("good1.txt"),
            PathBuf::from("bad.txt"),
            PathBuf::from("good2.txt"),
        ];

        let results = try_process_files_batch(&paths, |path| {
            let name = path
                .file_name()
                .and_then(|s| s.to_str())
                .ok_or("invalid path")?;

            if name.starts_with("bad") {
                Err("processing failed")
            } else {
                Ok(name.to_string())
            }
        });

        // Each result must line up with the input at the same index, and a
        // failure in the middle must not prevent later files from being processed.
        assert_eq!(
            results,
            vec![
                Ok("good1.txt".to_string()),
                Err("processing failed"),
                Ok("good2.txt".to_string()),
            ]
        );
    }

    #[cfg(feature = "parallel")]
    #[test]
    fn test_parallel_feature_enabled() {
        // Only compiled with the `parallel` feature. It checks that the rayon path
        // returns every result in input order, not just the right count.
        let items: Vec<i32> = (0..100).collect();
        let results = process_batch(&items, |n| n * n);
        let expected: Vec<i32> = (0..100).map(|n| n * n).collect();
        assert_eq!(results, expected);
    }
}
209}