scanseq/core/
scan.rs

//! Directory scanning and parallel sequence detection.
//!
//! This module implements a two-phase scanning algorithm:
//! 1. **Phase 1**: Discover subdirectories using jwalk (a fast parallel directory walker).
//! 2. **Phase 2**: Process the discovered folders in parallel on a rayon thread pool.
//!
//! Each Phase 2 worker:
//! - Lists the files in one folder
//! - Converts paths to [`File`] objects (extracting digit groups and building masks)
//! - Groups files into [`Seq`] sequences via mask-based hashing
//!
//! The mask-based approach handles unpadded sequences correctly:
//! `img_1.exr` through `img_100.exr` all have mask `img_@` and group together.
14
15use super::file::File;
16use super::seq::Seq;
17use indicatif::{ProgressBar, ProgressStyle};
18use jwalk::WalkDir;
19use log::{debug, info, warn};
20use rayon::prelude::*;
21use rayon::ThreadPoolBuilder;
22use std::path::{Path, PathBuf};
23use std::sync::atomic::AtomicUsize;
24use std::sync::Arc;
25
26/// Scan directory for all subdirectories
27///
28/// Returns unique sorted list of folder paths
29pub fn scan_dirs<P: AsRef<Path>>(root: P, recursive: bool) -> Result<Vec<PathBuf>, String> {
30    let root = root.as_ref();
31    info!("Scanning folders in: {}", root.display());
32    let walker = if recursive { WalkDir::new(root) } else { WalkDir::new(root).max_depth(1) };
33    let mut folders = Vec::new();
34    for entry in walker {
35        match entry {
36            Ok(e) if e.file_type().is_dir() => {
37                let path = e.path();
38                if path != root {
39                    folders.push(path.to_path_buf());
40                }
41            }
42            Ok(_) => {} // Not a directory
43            Err(e) => warn!("Skipping inaccessible path: {}", e),
44        }
45    }
46    // Always include root itself
47    folders.push(root.to_path_buf());
48    // Sort and deduplicate
49    folders.sort();
50    folders.dedup();
51    info!("Found {} folders", folders.len());
52    Ok(folders)
53}
54
55/// Scan single folder for files matching mask
56///
57/// Returns file list (non-recursive, just this folder)
58pub fn scan_files<P: AsRef<Path>>(folder: P, mask: Option<&str>) -> Result<Vec<PathBuf>, String> {
59    let folder = folder.as_ref();
60    let entries = std::fs::read_dir(folder).map_err(|e| format!("Failed to read dir {}: {}", folder.display(), e))?;
61
62    // Pre-compile glob pattern once (if mask contains wildcards)
63    let glob_pattern = match mask {
64        Some(m) if m.contains('*') => Some(glob::Pattern::new(m).map_err(|e| format!("Invalid mask: {}", e))?),
65        _ => None,
66    };
67
68    let mut files = Vec::new();
69    for entry in entries.filter_map(|e| e.ok()) {
70        let path = entry.path();
71        if !path.is_file() {
72            continue;
73        }
74        // Apply mask if provided
75        if let Some(mask) = mask {
76            if let Some(name) = path.file_name() {
77                let name_str = name.to_string_lossy();
78                if let Some(ref pattern) = glob_pattern {
79                    if !pattern.matches(&name_str) {
80                        continue;
81                    }
82                } else if name_str != mask {
83                    continue;
84                }
85            }
86        }
87        files.push(path);
88    }
89    Ok(files)
90}
91
92/// Main scan and group function
93///
94/// Returns all sequences found (flattened, not per-folder)
95pub fn get_seqs<P: AsRef<Path>>(root: P, recursive: bool, mask: Option<&str>, min_len: usize) -> Result<Vec<Seq>, String> {
96    let start = std::time::Instant::now();
97    // Phase 1: Discover folders
98    info!("Phase 1: Discovering folders...");
99    let folders = scan_dirs(root, recursive)?;
100    info!("Phase 1 complete: {} folders in {:.2}s", folders.len(), start.elapsed().as_secs_f64());
101
102    // Phase 2: Process folders in parallel
103    info!("Phase 2: Processing folders in parallel...");
104    let phase2_start = std::time::Instant::now();
105
106    // Use dynamic thread count based on available cores
107    let num_threads = std::thread::available_parallelism()
108        .map(|n| n.get())
109        .unwrap_or(8);
110    info!("Using {} threads for parallel processing", num_threads);
111
112    let pool = ThreadPoolBuilder::new()
113        .num_threads(num_threads)
114        .build()
115        .map_err(|e| format!("Failed to create thread pool: {}", e))?;
116
117    let found_seqs = AtomicUsize::new(0);
118
119    // Progress bar
120    let pb = Arc::new(ProgressBar::new(folders.len() as u64));
121    pb.set_style(
122        ProgressStyle::default_bar()
123            .template("[{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} folders ({msg})")
124            .expect("Invalid progress bar template")
125            .progress_chars("=>-"),
126    );
127
128    let all_seqs: Vec<Seq> = pool.install(|| {
129        folders
130            .par_iter()
131            .flat_map(|folder| {
132                // Scan files in this folder
133                let files = match scan_files(folder, mask) {
134                    Ok(f) => f,
135                    Err(e) => {
136                        warn!("Error scanning {}: {}", folder.display(), e);
137                        pb.inc(1);
138                        return Vec::new();
139                    }
140                };
141
142                if files.is_empty() {
143                    pb.inc(1);
144                    return Vec::new();
145                }
146
147                debug!("Processing {} ({} files)", folder.display(), files.len());
148
149                // Convert to File objects (move PathBuf instead of clone)
150                let mut file_objs: Vec<File> = files.into_iter().map(File::new).collect();
151
152                // Group into sequences
153                let seqs = Seq::group_seqs(&mut file_objs);
154
155                // Filter by min_len
156                let filtered: Vec<Seq> = seqs.into_iter().filter(|s| s.len() >= min_len).collect();
157
158                if !filtered.is_empty() {
159                    let seq_count = filtered.len();
160                    // Use fetch_add return value to avoid race condition in message
161                    let prev = found_seqs.fetch_add(seq_count, std::sync::atomic::Ordering::Relaxed);
162                    debug!("Found {} seqs in {}", seq_count, folder.display());
163                    pb.set_message(format!("{} seqs found", prev + seq_count));
164                }
165
166                // Update progress bar
167                pb.inc(1);
168
169                filtered
170            })
171            .collect()
172    });
173
174    pb.finish_with_message("Complete");
175
176    let total_seqs = all_seqs.len();
177    info!("Phase 2 complete: {} sequences in {:.2}s", total_seqs, phase2_start.elapsed().as_secs_f64());
178    info!("Total time: {:.2}s", start.elapsed().as_secs_f64());
179
180    Ok(all_seqs)
181}