use std::fs;
use std::io::Write;
use std::os::unix::ffi::OsStrExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use crossbeam::deque::Worker;
use crate::cli::{FilterConfig, OutputConfig};
use crate::config::Config;
use crate::process::{glob_match, name_matches_extension};
use crate::queue::Pool;
pub struct WorkerState {
pub out: Vec<u8>,
pub local_count: u64,
pub local_size: u64,
pub child_dirs: Vec<PathBuf>,
}
impl WorkerState {
pub fn new(out_cap: usize, dir_batch: usize) -> Self {
Self {
out: Vec::with_capacity(out_cap),
local_count: 0,
local_size: 0,
child_dirs: Vec::with_capacity(dir_batch),
}
}
}
const OUT_FLUSH_BYTES: usize = 256 * 1024;
pub fn walk_one(
dir: PathBuf,
_config: &Config,
filter: &FilterConfig,
output: &OutputConfig,
pool: &Pool<PathBuf>,
local: &Worker<PathBuf>,
state: &mut WorkerState,
) {
let read_dir = match fs::read_dir(&dir) {
Ok(rd) => rd,
Err(_) => {
pool.sub_dirs(1);
return;
}
};
let need_size = output.sum_size
|| output.long_format
|| filter.min_size.is_some()
|| filter.max_size.is_some();
let count_only = output.count_only;
let print_paths = !count_only;
let dirs_match = filter.match_dirs;
let exts = &filter.extensions;
let name_pat = filter.name_pattern.as_deref();
let need_file_name = print_paths || !exts.is_empty() || name_pat.is_some() || need_size;
let dir_bytes = dir.as_os_str().as_bytes();
let dir_needs_sep = !dir_bytes.is_empty() && *dir_bytes.last().unwrap() != b'/';
state.child_dirs.clear();
for entry in read_dir {
let entry = match entry {
Ok(e) => e,
Err(_) => continue,
};
let ft = match entry.file_type() {
Ok(t) => t,
Err(_) => continue,
};
if ft.is_file() && !dirs_match {
if !need_file_name {
state.local_count += 1;
continue;
}
}
let name = entry.file_name();
let name_bytes = name.as_bytes();
if ft.is_dir() {
if should_skip_dir_bytes(name_bytes, filter) {
continue;
}
let mut child = PathBuf::with_capacity(dir_bytes.len() + 1 + name_bytes.len());
child.push(&dir);
child.push(&name);
if dirs_match && name_matches(name_bytes, exts, name_pat) {
write_match(
state,
dir_bytes,
dir_needs_sep,
name_bytes,
&child,
need_size,
print_paths,
output,
);
}
state.child_dirs.push(child);
} else if ft.is_file() && !dirs_match {
if !name_matches(name_bytes, exts, name_pat) {
continue;
}
let size = if need_size {
match entry.metadata() {
Ok(m) => Some(m.len()),
Err(_) => continue,
}
} else {
None
};
if let Some(min) = filter.min_size {
if size.is_none_or(|s| s < min) {
continue;
}
}
if let Some(max) = filter.max_size {
if size.is_none_or(|s| s > max) {
continue;
}
}
state.local_count += 1;
if let Some(s) = size {
state.local_size += s;
}
if print_paths {
if output.long_format {
let _ = write!(&mut state.out, "{:>12} ", size.unwrap_or(0));
state.out.extend_from_slice(dir_bytes);
if dir_needs_sep {
state.out.push(b'/');
}
state.out.extend_from_slice(name_bytes);
state.out.push(b'\n');
} else {
state.out.extend_from_slice(dir_bytes);
if dir_needs_sep {
state.out.push(b'/');
}
state.out.extend_from_slice(name_bytes);
state.out.push(b'\n');
}
if state.out.len() >= OUT_FLUSH_BYTES {
flush_out(&mut state.out);
}
}
}
}
let n = state.child_dirs.len();
if n > 0 {
pool.add_dirs(n);
for child in state.child_dirs.drain(..) {
local.push(child);
}
pool.maybe_unpark();
}
pool.sub_dirs(1);
}
pub fn flush_worker_final(state: &mut WorkerState, matched: &AtomicU64, total_size: &AtomicU64) {
flush_out(&mut state.out);
if state.local_count > 0 {
matched.fetch_add(state.local_count, Ordering::Relaxed);
state.local_count = 0;
}
if state.local_size > 0 {
total_size.fetch_add(state.local_size, Ordering::Relaxed);
state.local_size = 0;
}
}
#[allow(clippy::too_many_arguments)]
fn write_match(
state: &mut WorkerState,
dir_bytes: &[u8],
dir_needs_sep: bool,
name_bytes: &[u8],
full_path: &Path,
need_size: bool,
print_paths: bool,
output: &OutputConfig,
) {
let size = if need_size {
fs::metadata(full_path).map(|m| m.len()).ok()
} else {
None
};
state.local_count += 1;
if let Some(s) = size {
state.local_size += s;
}
if print_paths {
if output.long_format {
let _ = write!(&mut state.out, "{:>12} ", size.unwrap_or(0));
}
state.out.extend_from_slice(dir_bytes);
if dir_needs_sep {
state.out.push(b'/');
}
state.out.extend_from_slice(name_bytes);
state.out.push(b'\n');
if state.out.len() >= OUT_FLUSH_BYTES {
flush_out(&mut state.out);
}
}
}
#[inline]
fn name_matches(name: &[u8], exts: &[String], pat: Option<&str>) -> bool {
if !exts.is_empty() && !any_extension_matches(name, exts) {
return false;
}
if let Some(p) = pat {
if !glob_match(p.as_bytes(), name) {
return false;
}
}
true
}
#[inline]
fn any_extension_matches(name: &[u8], exts: &[String]) -> bool {
for e in exts {
if name_matches_extension(name, e.as_bytes()) {
return true;
}
}
false
}
fn flush_out(buf: &mut Vec<u8>) {
if buf.is_empty() {
return;
}
let stdout = std::io::stdout();
let mut h = stdout.lock();
match h.write_all(buf) {
Ok(()) => buf.clear(),
Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
std::process::exit(0);
}
Err(e) => {
eprintln!("pfind: write error: {e}");
std::process::exit(1);
}
}
}
fn should_skip_dir_bytes(name: &[u8], filter: &FilterConfig) -> bool {
if filter.skip_dirs.is_empty() {
return false;
}
for skip in &filter.skip_dirs {
let s = skip.as_bytes();
if s == name {
return true;
}
if let Some(suffix) = s.strip_prefix(b"*") {
if name.len() >= suffix.len() && &name[name.len() - suffix.len()..] == suffix {
return true;
}
}
}
false
}