1use futures::{stream, Stream, StreamExt}; use std::{io, path::PathBuf};
3use tokio::fs::{self, DirEntry}; use super::element::Element;
6use super::Work;
7use crate::result::*;
8
9impl<'a> Work<'a> {
10 pub fn all_files_recursive(self, path: impl Into<String>) -> Result<Work<'a>> {
11 let path = path.into();
12 let path = std::path::PathBuf::from(shellexpand::full(&path)?.as_ref());
13
14 let dir_entries = visit(path.clone());
15 let new_elements =
16 dir_entries.map(move |entry| Element::create(path.clone(), entry.unwrap()));
17
18 self.add_work(move |elements| elements.chain(new_elements).boxed())
19 }
20}
21
22fn visit(path: impl Into<PathBuf>) -> impl Stream<Item = io::Result<DirEntry>> + Send + 'static {
23 async fn one_level(path: PathBuf, to_visit: &mut Vec<PathBuf>) -> io::Result<Vec<DirEntry>> {
24 let mut dir = fs::read_dir(path).await?;
25 let mut files = Vec::new();
26
27 while let Some(child) = dir.next_entry().await? {
28 if child.metadata().await?.is_dir() {
29 to_visit.push(child.path());
30 } else {
31 files.push(child);
32 }
33 }
34
35 Ok(files)
36 }
37
38 stream::unfold(vec![path.into()], |mut to_visit| async {
39 let path = to_visit.pop()?;
40 let file_stream = match one_level(path, &mut to_visit).await {
41 Ok(files) => stream::iter(files).map(Ok).left_stream(),
42 Err(e) => stream::once(async { Err(e) }).right_stream(),
43 };
44
45 Some((file_stream, to_visit))
46 })
47 .flatten()
48}