use std::{
fs::DirEntry,
io,
path::{Path, PathBuf},
sync::Arc,
};
use anyhow::Result;
use async_recursion::async_recursion;
use globset::{Glob, GlobMatcher};
use parking_lot::Mutex;
use tokio::{sync::oneshot, task::JoinSet};
use crate::db::{self, info::CacheTypeRef};
pub async fn search(
base: PathBuf,
worker_count: usize,
queue_depth: usize,
follow_symlink: bool,
) -> Result<Vec<(PathBuf, CacheTypeRef)>> {
assert_ne!(worker_count, 0);
let ctx = Arc::new(Context {
follow_symlink,
busy_workers: Mutex::new(worker_count),
balancer: Mutex::default(),
result: tokio::sync::Mutex::default(),
});
let ignore_dir_names = db::IGNORE_DIR_NAMES
.read()
.iter()
.map(|g| Glob::new(g).unwrap().compile_matcher())
.collect::<Vec<_>>();
let mut handles = JoinSet::new();
for i in 0..worker_count {
let ctx = ctx.clone();
let base = base.clone();
let ignore_dir_names = ignore_dir_names.clone();
handles.spawn(async move {
let mut worker = WorkerContext::new(queue_depth, ignore_dir_names).unwrap();
if i == 0 {
worker.queue[0].push(base);
}
loop {
worker.run_search(&ctx).await.unwrap();
if !worker.pull_job(&ctx).await.unwrap() {
break;
}
}
});
}
while handles.join_next().await.is_some() {}
let result = ctx.result.lock().await;
Ok(result.clone())
}
/// Senders registered by idle workers that are waiting for a job.
///
/// A worker with spare work sends `Some((depth, path))`, which the receiver
/// pushes onto its own queue at `depth`. `None` tells the receiver that no
/// more work will arrive and it should stop.
type BalancerQueue = Vec<oneshot::Sender<Option<(usize, PathBuf)>>>;
/// State shared by all search workers of a single `search` call.
#[derive(Debug)]
struct Context {
    /// When `false`, directory entries that are symlinks are skipped.
    follow_symlink: bool,
    /// Number of workers currently counted as having work. It is initialised
    /// to the worker count, decremented when a worker runs out of jobs and
    /// incremented again when it receives one. Reaching zero ends the search.
    busy_workers: Mutex<usize>,
    /// Idle workers waiting to be handed a job (see `BalancerQueue`).
    balancer: Mutex<BalancerQueue>,
    /// Matches collected so far, as `(path, cache type)` pairs.
    result: tokio::sync::Mutex<Vec<(PathBuf, CacheTypeRef)>>,
}
// SAFETY: NOTE(review): no justification is recorded for this impl. Whether it
// is required (e.g. because `CacheTypeRef` is not `Send`) or merely redundant
// cannot be determined from this file — confirm, then either document the
// invariant that makes it sound or remove it.
unsafe impl Send for Context {}
impl Context {
    /// Passes the entry's path to `db::check_path` and, when that yields a
    /// `(cache, path)` pair, appends it to the shared result list.
    ///
    /// # Errors
    /// Propagates any error returned by `db::check_path`.
    async fn check_entry(&self, entry: &DirEntry) -> Result<()> {
        let candidate = entry.path();
        match db::check_path(&candidate).await? {
            Some((cache, matched)) => {
                let mut found = self.result.lock().await;
                found.push((matched, cache));
            }
            None => {}
        }
        Ok(())
    }
}
/// Per-worker state: the worker's own job queues and its ignore patterns.
#[derive(Debug, Clone)]
struct WorkerContext {
    /// One list of pending directories per depth level; index 0 is where the
    /// search root is placed.
    queue: Box<[Vec<PathBuf>]>,
    /// Compiled glob patterns matched against directory names; matching
    /// directories are not queued for traversal.
    ignore_dir_names: Vec<GlobMatcher>,
}
impl WorkerContext {
fn new(queue_depth: usize, ignore_dir_names: Vec<GlobMatcher>) -> Result<WorkerContext> {
let queue = Box::try_new_zeroed_slice(queue_depth)?;
let mut queue = unsafe { queue.assume_init() };
queue.fill_with(Vec::new);
Ok(WorkerContext {
queue,
ignore_dir_names,
})
}
async fn pull_job(&mut self, ctx: &Arc<Context>) -> Result<bool> {
{
let mut workers = ctx.busy_workers.lock();
*workers -= 1;
if *workers == 0 {
while let Some(tx) = ctx.balancer.lock().pop() {
tx.send(None).unwrap();
}
return Ok(false);
}
}
let (tx, rx) = oneshot::channel::<Option<(usize, PathBuf)>>();
ctx.balancer.lock().push(tx);
if let Some((depth, path)) = rx.await? {
self.queue[depth].push(path);
} else {
return Ok(false);
}
*ctx.busy_workers.lock() += 1;
Ok(true)
}
async fn share_job(&mut self, ctx: &Arc<Context>) -> Result<()> {
if let Some(mut balancer) = ctx.balancer.try_lock() {
while let Some(sender) = balancer.pop() {
let mut data = None;
for depth in 0..self.queue.len() - 1 {
if self.queue[depth].len() > 2 {
let path = self.queue[depth].pop().unwrap();
data = Some((depth, path));
break;
}
}
match data {
Some(data) => sender.send(Some(data)).unwrap(),
None => {
balancer.push(sender);
break;
}
}
}
}
Ok(())
}
async fn run_search(&mut self, ctx: &Arc<Context>) -> Result<()> {
let mut depth = self.queue.len() - 1;
let mut recursive_timer = 0u32;
loop {
if let Some(path) = self.queue[depth].pop() {
let children = self.do_search(ctx, path).await?;
if depth + 1 < self.queue.len() {
if !children.is_empty() {
depth += 1;
self.queue[depth] = children;
self.share_job(ctx).await?;
}
} else {
for child in children {
self.do_recursive_search(ctx, &child, &mut recursive_timer)
.await?;
}
}
} else {
if depth == 0 {
break;
}
depth -= 1;
}
}
Ok(())
}
async fn do_search(&self, ctx: &Arc<Context>, path: PathBuf) -> Result<Vec<PathBuf>> {
let mut queue = vec![];
if let Some(readdir) = ignore_errors(path.read_dir())? {
for entry in readdir {
let entry = entry?;
if !ctx.follow_symlink && entry.file_type()?.is_symlink() {
continue;
}
ctx.check_entry(&entry).await?;
if let Some(metadata) = ignore_errors(entry.metadata())? {
if metadata.is_dir() {
let name = entry.file_name().to_string_lossy().to_string();
if !self.ignore_dir_names.iter().any(|g| g.is_match(&name)) {
queue.push(entry.path());
}
}
}
}
}
Ok(queue)
}
#[async_recursion]
async fn do_recursive_search(
&mut self,
ctx: &Arc<Context>,
path: &Path,
timer: &mut u32,
) -> Result<()> {
if let Some(readdir) = ignore_errors(path.read_dir())? {
for entry in readdir {
let entry = entry?;
if !ctx.follow_symlink && entry.file_type()?.is_symlink() {
continue;
}
ctx.check_entry(&entry).await?;
if let Some(metadata) = ignore_errors(entry.metadata())? {
if metadata.is_dir() {
*timer = timer.overflowing_add(1).0;
if *timer as u8 == 0 {
self.share_job(ctx).await?;
}
self.do_recursive_search(ctx, &entry.path(), timer).await?;
}
}
}
}
Ok(())
}
}
/// Turns selected I/O failures into "nothing here".
///
/// `Ok(v)` becomes `Ok(Some(v))`. Errors of kind `OutOfMemory`,
/// `ResourceBusy`, `ReadOnlyFilesystem` or `PermissionDenied` become
/// `Ok(None)`; every other error is returned unchanged.
fn ignore_errors<T>(result: io::Result<T>) -> io::Result<Option<T>> {
    use std::io::ErrorKind;

    result.map(Some).or_else(|err| match err.kind() {
        ErrorKind::OutOfMemory
        | ErrorKind::ResourceBusy
        | ErrorKind::ReadOnlyFilesystem
        | ErrorKind::PermissionDenied => Ok(None),
        _ => Err(err),
    })
}