// parawalk 0.1.1
//
// Blazing-fast parallel directory walker with zero filtering baggage.
// (Stray package-listing header — commented out so the file parses; the
// real crate documentation is the `//!` block below.)
//! # parawalk
//!
//! Blazing-fast parallel directory walker with zero filtering baggage.
//!
//! Uses a crossbeam-deque work-stealing scheduler — the same pattern as
//! the `ignore` crate's parallel walker, without any gitignore, glob, or
//! hidden-file filtering overhead.
//!
//! # Quick Start
//!
//! ```rust,no_run
//! use parawalk::{walk, WalkConfig, Entry, EntryRef};
//! use std::sync::mpsc;
//!
//! let (tx, rx) = mpsc::channel();
//!
//! walk(
//!     "/usr".into(),
//!     WalkConfig::default(),
//!     None::<fn(&EntryRef<'_>) -> bool>,
//!     move |entry: Entry| { let _ = tx.send(entry); },
//! );
//!
//! let count = rx.into_iter().count();
//! println!("Found {} entries", count);
//! ```

#![forbid(unsafe_code)]

use std::ffi::OsStr;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use crossbeam_deque::{Injector, Stealer, Worker};

// ---------------------------------------------------------------------------
// Public types
// ---------------------------------------------------------------------------

/// Configuration for a parallel walk.
///
/// Construct via [`Default`] and override fields as needed:
/// `WalkConfig { max_depth: Some(3), ..Default::default() }`.
#[derive(Debug, Clone)]
pub struct WalkConfig {
    /// Number of worker threads. Defaults to logical CPU count.
    pub threads: usize,

    /// Maximum traversal depth. `None` = unlimited.
    pub max_depth: Option<usize>,

    /// Follow symbolic links. Defaults to false.
    pub follow_links: bool,
}

impl Default for WalkConfig {
    fn default() -> Self {
        Self {
            threads: std::thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(4),
            max_depth: None,
            follow_links: false,
        }
    }
}

/// A single entry produced during a walk.
///
/// Materialized (with a heap-allocated path) only for entries that pass
/// the pre-filter.
#[derive(Debug, Clone)]
pub struct Entry {
    /// Full path to the entry.
    pub path: PathBuf,

    /// What kind of entry this is.
    pub kind: EntryKind,

    /// Depth from the root. Root's children = 1.
    pub depth: usize,
}

/// The kind of a directory entry.
///
/// Fieldless, so it is `Copy` (cloning is free) and `Hash` (usable as a
/// set/map key in filters).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EntryKind {
    /// A regular file.
    File,
    /// A directory (including a followed symlink to a directory).
    Dir,
    /// A symbolic link (when not resolved to a directory).
    Symlink,
    /// Anything else (e.g. fifo, socket, device).
    Other,
}

/// Cheap borrowed view of an entry — available before any PathBuf is allocated.
///
/// Use this in your pre-filter to decide whether to materialize the full path.
#[derive(Debug, Clone)]
pub struct EntryRef<'a> {
    /// Just the filename component — zero allocation.
    pub name: &'a OsStr,

    /// Depth from root.
    pub depth: usize,

    /// Entry kind.
    pub kind: EntryKind,
}

// ---------------------------------------------------------------------------
// Internal job type
// ---------------------------------------------------------------------------

/// A unit of work on the scheduler: one directory to read.
struct DirJob {
    // Path of the directory to `read_dir`.
    path: PathBuf,
    // Depth of this directory from the root; its children are emitted
    // at `depth + 1`.
    depth: usize,
}

// ---------------------------------------------------------------------------
// walk()
// ---------------------------------------------------------------------------

/// Walk a directory tree in parallel, calling `visitor` for each entry.
///
/// Only entries that pass the optional `pre_filter` are materialized into
/// full [`Entry`] values and passed to `visitor`. Entries that fail the
/// pre-filter are dropped with zero allocation.
///
/// Blocks until the whole tree has been traversed. Entry order is
/// unspecified (the walk is parallel).
///
/// # Arguments
///
/// * `root` - The directory to walk.
/// * `config` - Walk configuration (threads, depth, symlinks).
/// * `pre_filter` - Optional cheap filter on [`EntryRef`] (filename + kind).
///   Return `true` to materialize and visit the entry, `false` to skip.
/// * `visitor` - Called for each entry that passes the pre-filter.
pub fn walk<F, P>(root: PathBuf, config: WalkConfig, pre_filter: Option<P>, visitor: F)
where
    F: Fn(Entry) + Send + Sync + 'static,
    P: Fn(&EntryRef<'_>) -> bool + Send + Sync + 'static,
{
    let injector = Arc::new(Injector::<DirJob>::new());
    let visitor = Arc::new(visitor);
    let pre_filter: Option<Arc<P>> = pre_filter.map(Arc::new);

    // Seed the root job.
    injector.push(DirJob { path: root, depth: 0 });

    let n = config.threads.max(1);
    let max_depth = config.max_depth;
    let follow_links = config.follow_links;

    // Build workers and stealers.
    let workers: Vec<Worker<DirJob>> = (0..n).map(|_| Worker::new_lifo()).collect();
    let stealers: Arc<Vec<Stealer<DirJob>>> =
        Arc::new(workers.iter().map(|w| w.stealer()).collect());

    // Number of threads between "about to look for work" and "finished
    // processing a job". A thread increments BEFORE it tries to acquire
    // work, so a job that has been popped from a queue but whose children
    // have not yet been pushed is always covered by `in_flight > 0`. This
    // closes the shutdown race where a just-stolen, not-yet-counted job
    // makes every queue look drained and lets peers exit prematurely.
    let in_flight = Arc::new(AtomicUsize::new(0));

    std::thread::scope(|s| {
        for worker in workers {
            let injector = Arc::clone(&injector);
            let stealers = Arc::clone(&stealers);
            let visitor = Arc::clone(&visitor);
            let pre_filter = pre_filter.clone();
            let in_flight = Arc::clone(&in_flight);

            s.spawn(move || {
                loop {
                    // Announce intent before acquiring (see `in_flight`).
                    // SeqCst so the counter and the queue-emptiness checks
                    // below are observed in one total order by all threads.
                    in_flight.fetch_add(1, Ordering::SeqCst);

                    // Try local deque first, then steal from peers, then
                    // the global injector.
                    let job = worker.pop().or_else(|| {
                        stealers
                            .iter()
                            .find_map(|st| st.steal().success())
                            .or_else(|| injector.steal().success())
                    });

                    match job {
                        Some(job) => {
                            process_dir(
                                job,
                                &worker,
                                &injector,
                                &visitor,
                                pre_filter.as_ref(),
                                max_depth,
                                follow_links,
                            );
                            in_flight.fetch_sub(1, Ordering::SeqCst);
                        }
                        None => {
                            in_flight.fetch_sub(1, Ordering::SeqCst);
                            // Exit only when no thread may be holding a job
                            // AND every queue — the global injector and all
                            // per-worker deques — is drained. Checking the
                            // stealers keeps threads alive while a peer's
                            // local deque still has stealable work.
                            if in_flight.load(Ordering::SeqCst) == 0
                                && injector.is_empty()
                                && stealers.iter().all(|st| st.is_empty())
                            {
                                break;
                            }
                            std::thread::yield_now();
                        }
                    }
                }
            });
        }
    });
}

// ---------------------------------------------------------------------------
// process_dir
// ---------------------------------------------------------------------------

/// Read one directory and emit/enqueue its children.
///
/// For every readable child of `job.path`:
/// 1. classify it (`File`/`Dir`/`Symlink`/`Other`),
/// 2. run the cheap `pre_filter` on the borrowed filename — no allocation,
/// 3. if the filter passes, materialize a full [`Entry`] and call `visitor`,
/// 4. enqueue subdirectories on the local worker deque (depth permitting),
///    regardless of the filter outcome.
///
/// Unreadable directories and entries are silently skipped — the walk is
/// best-effort by design.
fn process_dir<F, P>(
    job: DirJob,
    worker: &Worker<DirJob>,
    _injector: &Injector<DirJob>,
    visitor: &Arc<F>,
    pre_filter: Option<&Arc<P>>,
    max_depth: Option<usize>,
    follow_links: bool,
) where
    F: Fn(Entry) + Send + Sync,
    P: Fn(&EntryRef<'_>) -> bool + Send + Sync,
{
    let read = match fs::read_dir(&job.path) {
        Ok(r) => r,
        // Permission denied, directory vanished, etc. — skip quietly.
        Err(_) => return,
    };

    for raw in read {
        let raw = match raw {
            Ok(e) => e,
            Err(_) => continue,
        };

        let file_type = match raw.file_type() {
            Ok(ft) => ft,
            Err(_) => continue,
        };

        let is_symlink = file_type.is_symlink();
        // When following links, resolve a symlink's target (one extra stat)
        // so a symlink-to-directory is treated as a directory; otherwise
        // trust the lstat-style type, which never follows.
        let is_dir = if is_symlink && follow_links {
            raw.path().is_dir()
        } else {
            file_type.is_dir()
        };

        let kind = if is_dir {
            EntryKind::Dir
        } else if is_symlink {
            EntryKind::Symlink
        } else if file_type.is_file() {
            EntryKind::File
        } else {
            EntryKind::Other // fifo, socket, device node, ...
        };

        let depth = job.depth + 1;
        let name = raw.file_name();

        // Cheap pre-filter — runs on borrowed &OsStr, zero allocation.
        let pass = pre_filter
            .map_or(true, |f| f(&EntryRef { name: &name, depth, kind: kind.clone() }));

        // We always recurse into subdirectories (up to max_depth), even when
        // the directory itself was filtered out of the visitor stream.
        // NOTE(review): with `follow_links` there is no cycle detection, so a
        // symlink loop can recurse indefinitely — confirm callers accept that.
        let descend = is_dir && max_depth.map_or(true, |limit| depth < limit);

        if pass {
            // Only materialize the full PathBuf for filter survivors; build
            // the sub-job first so `path` can be cloned once, then moved —
            // and `kind` is moved (not cloned) at its last use.
            let path = job.path.join(&name);
            let sub = if descend {
                Some(DirJob { path: path.clone(), depth })
            } else {
                None
            };
            visitor(Entry { path, kind, depth });
            if let Some(sub) = sub {
                worker.push(sub);
            }
        } else if descend {
            worker.push(DirJob { path: job.path.join(&name), depth });
        }
    }
}