1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
use futures::{stream, Stream, StreamExt};
use std::{io, path::PathBuf};
use tokio::fs::{self, DirEntry};
use super::element::Element;
use super::Work;
use crate::result::*;
impl<'a> Work<'a> {
pub fn all_files_recursive(self, path: impl Into<String>) -> Result<Work<'a>> {
let path = path.into();
let path = std::path::PathBuf::from(shellexpand::full(&path)?.as_ref());
let dir_entries = visit(path.clone());
let new_elements =
dir_entries.map(move |entry| Element::create(path.clone(), entry.unwrap()));
self.add_work(move |elements| elements.chain(new_elements).boxed())
}
}
fn visit(path: impl Into<PathBuf>) -> impl Stream<Item = io::Result<DirEntry>> + Send + 'static {
async fn one_level(path: PathBuf, to_visit: &mut Vec<PathBuf>) -> io::Result<Vec<DirEntry>> {
let mut dir = fs::read_dir(path).await?;
let mut files = Vec::new();
while let Some(child) = dir.next_entry().await? {
if child.metadata().await?.is_dir() {
to_visit.push(child.path());
} else {
files.push(child);
}
}
Ok(files)
}
stream::unfold(vec![path.into()], |mut to_visit| async {
let path = to_visit.pop()?;
let file_stream = match one_level(path, &mut to_visit).await {
Ok(files) => stream::iter(files).map(Ok).left_stream(),
Err(e) => stream::once(async { Err(e) }).right_stream(),
};
Some((file_stream, to_visit))
})
.flatten()
}