//! guardy 0.2.4
//!
//! Fast, secure git hooks in Rust with secret scanning and protected file
//! synchronization.
//! Parallel execution and global scan counters

use std::path::PathBuf;
use std::sync::Arc;
// AtomicUsize and Ordering moved to counters module

use anyhow::Result;
use crossbeam::channel::{Receiver, Sender, bounded};

/// Run work items in parallel using worker threads with optimized configuration.
///
/// Workers pull paths off a bounded channel and invoke `processor` on each;
/// the processor streams its own output directly - nothing is accumulated here.
///
/// # Arguments
/// * `work_items` - Paths to process; an empty list returns `Ok(())` immediately.
/// * `processor` - Callback invoked once per path from worker threads. Shared
///   by reference across all workers, hence the `Sync` bound.
/// * `num_workers` - Upper bound on worker threads; capped at `work_items.len()`.
///
/// # Errors
/// Returns an error if a worker or producer thread fails to spawn, or if any
/// spawned thread panics.
pub fn run<F>(work_items: Vec<PathBuf>, processor: F, num_workers: usize) -> Result<()>
where
    // NOTE: no `'static` bound - crossbeam's scoped threads may borrow
    // `processor` for the duration of the scope, so callers can pass closures
    // capturing local references. Relaxing the bound is backward-compatible.
    F: Fn(&PathBuf) + Send + Sync,
{
    if work_items.is_empty() {
        return Ok(());
    }

    // Never spawn more workers than there are items to process.
    let actual_workers = num_workers.min(work_items.len());

    // Configurable channel buffer (floor of 64) so the producer rarely blocks.
    let buffer_multiplier = crate::config::CONFIG.scanner.channel_buffer_multiplier as usize;
    let buffer_size = (actual_workers * buffer_multiplier).max(64);
    let (work_tx, work_rx): (Sender<PathBuf>, Receiver<PathBuf>) = bounded(buffer_size);

    // Scoped threads can borrow the processor directly; `&F` is `Copy`, so each
    // worker closure gets its own copy of the shared reference - no `Arc`
    // (and its refcount traffic) needed.
    let processor = &processor;

    // Stack size from config (convert MB to bytes).
    let stack_size_bytes = crate::config::CONFIG.scanner.stack_size_mb as usize * 1024 * 1024;

    tracing::debug!(
        "Starting {} workers with {}MB stack size, channel buffer: {}",
        actual_workers,
        crate::config::CONFIG.scanner.stack_size_mb,
        buffer_size
    );

    crossbeam::thread::scope(|s| -> Result<()> {
        // Spawn worker threads with custom stack size.
        for worker_id in 0..actual_workers {
            let work_rx = work_rx.clone();

            s.builder()
                .name(format!("scanner-{worker_id:02}"))
                .stack_size(stack_size_bytes)
                .spawn(move |_| {
                    tracing::debug!("Worker {} started", worker_id);
                    let mut file_count = 0;
                    // recv() returns Err once every sender is dropped - i.e.
                    // when the producer thread finishes - giving clean shutdown.
                    while let Ok(work_item) = work_rx.recv() {
                        file_count += 1;
                        // Periodic RSS diagnostic every 100 files. Linux-only
                        // (/proc); best-effort - silently skipped elsewhere.
                        if file_count % 100 == 0
                            && let Ok(status) = std::fs::read_to_string("/proc/self/status")
                            && let Some(line) = status.lines().find(|l| l.starts_with("VmRSS:"))
                        {
                            tracing::debug!(
                                "Worker {} after {} files: {}",
                                worker_id,
                                file_count,
                                line
                            );
                        }
                        processor(&work_item);
                    }
                    tracing::debug!("Worker {} finished after {} files", worker_id, file_count);
                })
                .map_err(|e| {
                    anyhow::anyhow!("Failed to spawn worker thread {}: {}", worker_id, e)
                })?;
        }

        // Producer thread: takes `work_tx` by move so the channel closes (and
        // workers drain out) as soon as all items are sent. It also needs the
        // custom stack size to handle large file lists.
        s.builder()
            .name("guardy-producer".to_string())
            .stack_size(stack_size_bytes)
            .spawn(move |_| {
                for item in work_items {
                    if work_tx.send(item).is_err() {
                        break; // Workers dropped; nothing left to receive
                    }
                }
            })
            .map_err(|e| anyhow::anyhow!("Failed to spawn producer thread: {}", e))?;

        Ok(())
    })
    .map_err(|_| anyhow::anyhow!("Worker thread panicked"))?
}