// thread-flow 0.1.0
//
// Thread dataflow integration for data processing pipelines, using CocoIndex.
// Documentation
// SPDX-FileCopyrightText: 2025 Knitli Inc. <knitli@knit.li>
// SPDX-License-Identifier: AGPL-3.0-or-later

//! Batch file processing with optional parallel execution
//!
//! This module provides utilities for processing multiple files efficiently:
//! - **CLI builds** (default): Uses rayon for CPU parallelism across cores
//! - **Worker builds**: Falls back to sequential processing (no threads in edge)
//!
//! ## Feature Gating
//!
//! Parallel processing is controlled by the `parallel` feature flag:
//! - **Enabled** (default): Multi-core parallel processing via rayon
//! - **Disabled** (worker): Single-threaded sequential processing
//!
//! ## Usage
//!
//! ```rust,ignore
//! use thread_flow::batch::process_files_batch;
//!
//! let results = process_files_batch(&file_paths, |path| {
//!     // Process each file
//!     analyze_file(path)
//! });
//! ```
//!
//! ## Performance Characteristics
//!
//! | Target | Concurrency | 100 Files | 1000 Files |
//! |--------|-------------|-----------|------------|
//! | CLI (4 cores) | Parallel | ~0.4s | ~4s |
//! | CLI (1 core) | Sequential | ~1.6s | ~16s |
//! | Worker | Sequential | ~1.6s | ~16s |
//!
//! **Speedup**: 2-4x on multi-core systems (linear with core count)

use std::path::Path;

/// Process multiple files in batch with optional parallelism.
///
/// # Parallel Processing (CLI builds)
///
/// With the `parallel` feature enabled (the default), the files are fanned
/// out across CPU cores through rayon's work-stealing pool; rayon sizes the
/// pool from the available core count automatically.
///
/// # Sequential Processing (Worker builds)
///
/// With the `parallel` feature disabled (e.g. for Cloudflare Workers), every
/// file is handled one at a time on the calling thread. This sidesteps
/// SharedArrayBuffer requirements and keeps edge-runtime compatibility.
///
/// # Example
///
/// ```rust,ignore
/// let paths = vec![
///     PathBuf::from("src/main.rs"),
///     PathBuf::from("src/lib.rs"),
/// ];
///
/// let results = process_files_batch(&paths, |path| {
///     std::fs::read_to_string(path).unwrap()
/// });
/// ```
pub fn process_files_batch<P, F, R>(paths: &[P], processor: F) -> Vec<R>
where
    P: AsRef<Path> + Sync,
    F: Fn(&Path) -> R + Sync + Send,
    R: Send,
{
    #[cfg(feature = "parallel")]
    {
        // CLI builds: fan the work out over rayon's thread pool; collect()
        // reassembles the results in input order.
        use rayon::prelude::*;
        paths
            .par_iter()
            .map(|path| processor(path.as_ref()))
            .collect()
    }

    #[cfg(not(feature = "parallel"))]
    {
        // Worker builds: single-threaded pass, preserving input order.
        let mut results = Vec::with_capacity(paths.len());
        for path in paths {
            results.push(processor(path.as_ref()));
        }
        results
    }
}

/// Process multiple items in batch with optional parallelism.
///
/// Generic counterpart of `process_files_batch`: accepts any slice of items
/// rather than paths specifically. Output order always matches input order.
///
/// # Example
///
/// ```rust,ignore
/// let fingerprints = vec!["abc123", "def456", "ghi789"];
///
/// let results = process_batch(&fingerprints, |fp| {
///     database.query_by_fingerprint(fp)
/// });
/// ```
pub fn process_batch<T, F, R>(items: &[T], processor: F) -> Vec<R>
where
    T: Sync,
    F: Fn(&T) -> R + Sync + Send,
    R: Send,
{
    #[cfg(feature = "parallel")]
    {
        // Multi-core path (CLI builds): rayon splits the slice across cores.
        use rayon::prelude::*;
        items.par_iter().map(|item| processor(item)).collect()
    }

    #[cfg(not(feature = "parallel"))]
    {
        // Single-threaded path (Worker builds).
        items.iter().map(processor).collect()
    }
}

/// Try to process multiple files in batch, collecting errors.
///
/// Unlike a short-circuiting variant, this keeps going after failures: every
/// file is attempted, so a partial batch can still succeed when individual
/// files fail.
///
/// # Returns
///
/// A `Vec<Result<R, E>>` in which element `i` holds the outcome for the file
/// at index `i` of `paths`.
pub fn try_process_files_batch<P, F, R, E>(paths: &[P], processor: F) -> Vec<Result<R, E>>
where
    P: AsRef<Path> + Sync,
    F: Fn(&Path) -> Result<R, E> + Sync + Send,
    R: Send,
    E: Send,
{
    #[cfg(feature = "parallel")]
    {
        // Collecting into Vec<Result<_, _>> (not Result<Vec<_>, _>) keeps
        // per-file outcomes instead of aborting on the first error.
        use rayon::prelude::*;
        paths
            .par_iter()
            .map(|entry| processor(entry.as_ref()))
            .collect()
    }

    #[cfg(not(feature = "parallel"))]
    {
        // Sequential fallback for builds without the `parallel` feature.
        let mut outcomes = Vec::with_capacity(paths.len());
        for entry in paths {
            outcomes.push(processor(entry.as_ref()));
        }
        outcomes
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn test_process_batch_simple() {
        // Doubling must preserve both length and element order.
        let input = [1, 2, 3, 4, 5];
        assert_eq!(process_batch(&input, |n| n * 2), [2, 4, 6, 8, 10]);
    }

    #[test]
    fn test_process_files_batch() {
        let inputs: Vec<PathBuf> = ["file1.txt", "file2.txt", "file3.txt"]
            .iter()
            .map(PathBuf::from)
            .collect();

        let names = process_files_batch(&inputs, |path| {
            match path.file_name().and_then(|os| os.to_str()) {
                Some(name) => name.to_string(),
                None => "unknown".to_string(),
            }
        });

        assert_eq!(names, vec!["file1.txt", "file2.txt", "file3.txt"]);
    }

    #[test]
    fn test_try_process_files_batch_with_errors() {
        let inputs = [
            PathBuf::from("good1.txt"),
            PathBuf::from("bad.txt"),
            PathBuf::from("good2.txt"),
        ];

        let outcomes = try_process_files_batch(&inputs, |path| {
            let name = path
                .file_name()
                .and_then(|os| os.to_str())
                .ok_or("invalid path")?;

            if name.starts_with("bad") {
                Err("processing failed")
            } else {
                Ok(name.to_string())
            }
        });

        // Middle file fails; its neighbors still succeed.
        assert!(outcomes[0].is_ok());
        assert!(outcomes[1].is_err());
        assert!(outcomes[2].is_ok());
    }

    #[cfg(feature = "parallel")]
    #[test]
    fn test_parallel_feature_enabled() {
        // Exercised only in builds with the `parallel` feature enabled.
        let inputs: Vec<i32> = (0..100).collect();
        let squares = process_batch(&inputs, |n| n * n);
        assert_eq!(squares.len(), 100);
        assert_eq!(squares[10], 100);
    }
}