use std::path::PathBuf;
use super::Iter;
use crate::{
bstr::BString, dirwalk, util::OwnedOrStaticAtomicBool, worktree::IndexPersistedOrInMemory, PathspecDetached,
Repository,
};
/// A single entry produced by the directory walk, as yielded by [`Iter`].
pub struct Item {
/// The owned form of the entry that the walk emitted.
pub entry: gix_dir::Entry,
/// The status that the walk passed alongside `entry` when emitting it, if any.
///
/// NOTE(review): presumably the status of a collapsed parent directory that `entry`
/// belonged to - confirm against the `gix_dir::walk::Delegate::emit()` documentation.
pub collapsed_directory_status: Option<gix_dir::entry::Status>,
}
impl Item {
    /// Create an owned item from the borrowed `entry` handed out by the walk,
    /// keeping `collapsed_directory_status` exactly as it was received.
    fn new(entry: gix_dir::EntryRef<'_>, collapsed_directory_status: Option<gix_dir::entry::Status>) -> Self {
        let entry = entry.to_owned();
        Self {
            entry,
            collapsed_directory_status,
        }
    }
}
/// Everything that remains once the directory walk behind an [`Iter`] has finished.
pub struct Outcome {
/// The index that was passed in when the iterator was created, handed back to the caller.
pub index: IndexPersistedOrInMemory,
/// The excludes stack produced by the walk, detached from the repository it was created from.
pub excludes: gix_worktree::Stack,
/// The pathspec produced by the walk, detached from the repository it was created from.
pub pathspec: PathspecDetached,
/// The traversal root as reported by the walk.
pub traversal_root: PathBuf,
/// The outcome reported by the underlying `gix_dir` walk itself.
pub dirwalk: gix_dir::walk::Outcome,
}
/// The error returned when creating an [`Iter`].
///
/// The set of variants depends on the `parallel` feature: with it, only spawning the
/// producer thread can fail up-front and walk errors are yielded by the iterator instead;
/// without it, the whole walk runs during construction and its errors are reported here.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
/// The producer thread that performs the walk could not be spawned.
#[error("Failed to spawn producer thread")]
#[cfg(feature = "parallel")]
SpawnThread(#[from] std::io::Error),
/// The directory walk itself failed.
#[error(transparent)]
#[cfg(not(feature = "parallel"))]
Dirwalk(#[from] dirwalk::Error),
/// Detaching the pathspec from the walk outcome failed.
#[error(transparent)]
#[cfg(not(feature = "parallel"))]
DetachPathSpec(#[from] std::io::Error),
}
/// Lifecycle
impl Iter {
    /// Create a new iterator over the entries of a directory walk in `repo`.
    ///
    /// `index`, `patterns`, `should_interrupt` and `options` are forwarded to
    /// `Repository::dirwalk()`; `index` is handed back as part of the [`Outcome`].
    ///
    /// With the `parallel` feature, the walk runs on a dedicated producer thread which sends
    /// each item through a channel, and the outcome only becomes available once the iterator
    /// is exhausted. Without it, the whole walk is performed right here and all items are
    /// buffered before this function returns.
    ///
    /// # Errors
    ///
    /// With `parallel`, only a failure to spawn the producer thread is reported here.
    /// Otherwise, errors of the walk and of detaching the pathspec are returned directly.
    pub(crate) fn new(
        repo: &Repository,
        index: IndexPersistedOrInMemory,
        patterns: Vec<BString>,
        should_interrupt: OwnedOrStaticAtomicBool,
        options: dirwalk::Options,
    ) -> Result<Iter, Error> {
        #[cfg(feature = "parallel")]
        {
            let sync_repo = repo.clone().into_sync();
            let (tx, rx) = std::sync::mpsc::channel();
            // The producer gets its own handle to the flag; the iterator keeps the original.
            let producer_interrupt = should_interrupt.clone();
            let producer = move || -> Result<Outcome, dirwalk::Error> {
                let repo: Repository = sync_repo.into();
                let mut delegate = Collect { tx };
                let walk = repo.dirwalk(&index, patterns, &producer_interrupt, options, &mut delegate)?;
                let excludes = walk.excludes.detach();
                let pathspec = match walk.pathspec.detach() {
                    Ok(detached) => detached,
                    // The I/O error is wrapped into a `ReadDir` walk error that names the git directory.
                    Err(source) => {
                        return Err(dirwalk::Error::Walk(gix_dir::walk::Error::ReadDir {
                            path: repo.git_dir().to_owned(),
                            source,
                        }));
                    }
                };
                Ok(Outcome {
                    index,
                    excludes,
                    pathspec,
                    traversal_root: walk.traversal_root,
                    dirwalk: walk.dirwalk,
                })
            };
            let handle = std::thread::Builder::new()
                .name("gix::dirwalk::iter::producer".into())
                .spawn(producer)?;
            Ok(Iter {
                rx_and_join: Some((rx, handle)),
                should_interrupt,
                out: None,
            })
        }
        #[cfg(not(feature = "parallel"))]
        {
            let mut delegate = Collect { items: Vec::new() };
            let walk = repo.dirwalk(&index, patterns, &should_interrupt, options, &mut delegate)?;
            let excludes = walk.excludes.detach();
            let pathspec = walk.pathspec.detach()?;
            let outcome = Outcome {
                index,
                excludes,
                pathspec,
                traversal_root: walk.traversal_root,
                dirwalk: walk.dirwalk,
            };
            Ok(Iter {
                items: delegate.items.into_iter(),
                out: Some(outcome),
            })
        }
    }
}
/// Access
impl Iter {
    /// Return a mutable reference to the outcome of the walk, or `None` if no outcome
    /// has been stored yet.
    pub fn outcome_mut(&mut self) -> Option<&mut Outcome> {
        Option::as_mut(&mut self.out)
    }

    /// Consume the iterator and return the outcome of the walk, or `None` if no outcome
    /// has been stored.
    pub fn into_outcome(mut self) -> Option<Outcome> {
        std::mem::take(&mut self.out)
    }
}
impl Iterator for Iter {
    type Item = Result<Item, dirwalk::Error>;

    /// Yield the next item of the walk.
    ///
    /// With the `parallel` feature this blocks until the producer thread sends an item.
    /// Once the channel is closed, the thread is joined: on success its outcome is stored
    /// and `None` is returned, on failure the walk error is yielded once. Later calls
    /// return `None`.
    ///
    /// # Panics
    ///
    /// With the `parallel` feature, if the producer thread panicked.
    fn next(&mut self) -> Option<Self::Item> {
        #[cfg(feature = "parallel")]
        {
            let (rx, _join) = self.rx_and_join.as_ref()?;
            if let Ok(item) = rx.recv() {
                return Some(Ok(item));
            }
            // The sending side is gone, so the producer is done: collect its result exactly once.
            let (_rx, handle) = self.rx_and_join.take()?;
            let walk_result = handle.join().expect("no panic");
            match walk_result {
                Err(err) => Some(Err(err)),
                Ok(out) => {
                    self.out = Some(out);
                    None
                }
            }
        }
        #[cfg(not(feature = "parallel"))]
        self.items.next().map(Ok)
    }
}
#[cfg(feature = "parallel")]
impl Drop for Iter {
    /// Hand the receiver and producer thread handle, if still present, to
    /// `crate::util::parallel_iter_drop()` together with the interrupt flag.
    fn drop(&mut self) {
        // There is no second thread here, hence the explicitly typed `None` in the third slot.
        let rx_and_handles = match self.rx_and_join.take() {
            Some((rx, handle)) => Some((rx, handle, None::<std::thread::JoinHandle<()>>)),
            None => None,
        };
        crate::util::parallel_iter_drop(rx_and_handles, &self.should_interrupt);
    }
}
/// The walk delegate that receives each emitted entry and passes it on as an [`Item`].
struct Collect {
/// With the `parallel` feature: the sending half of the channel that the iterator reads from.
#[cfg(feature = "parallel")]
tx: std::sync::mpsc::Sender<Item>,
/// Without the `parallel` feature: the buffer that gathers all items of the walk.
#[cfg(not(feature = "parallel"))]
items: Vec<Item>,
}
impl gix_dir::walk::Delegate for Collect {
    /// Turn the emitted `entry` into an owned [`Item`] and pass it on, always asking the
    /// walk to continue.
    fn emit(
        &mut self,
        entry: gix_dir::EntryRef<'_>,
        collapsed_directory_status: Option<gix_dir::entry::Status>,
    ) -> gix_dir::walk::Action {
        let owned = Item::new(entry, collapsed_directory_status);
        // A send error means the receiver is gone; it is deliberately ignored here.
        // NOTE(review): presumably the walk is then stopped through the interrupt flag - confirm.
        #[cfg(feature = "parallel")]
        let _ = self.tx.send(owned);
        #[cfg(not(feature = "parallel"))]
        self.items.push(owned);
        std::ops::ControlFlow::Continue(())
    }
}