1use std::path::{Path, PathBuf};
2
3use thiserror::Error;
4use walkdir::WalkDir;
5
6use crate::TaskSource;
7
8pub struct DirectorySource {
9 follow_symlinks: bool,
10 filter: Option<Box<dyn FnMut(&Path) -> bool + Send>>,
11 root: PathBuf,
12 walker: Option<walkdir::IntoIter>,
13}
14
/// Errors that [`DirectorySource`] can report while producing tasks.
#[derive(Debug, Error)]
pub enum DirectorySourceError {
    /// A failure reported by the underlying `walkdir` iterator.
    ///
    /// The variant is transparent: its `Display` output and error source are
    /// those of the wrapped [`walkdir::Error`], and `#[from]` lets `?` convert
    /// a `walkdir::Error` into this type.
    #[error(transparent)]
    WalkDir(#[from] walkdir::Error),
}
20
21impl DirectorySource {
22 pub fn new(root: impl AsRef<Path>) -> Self {
23 Self {
24 follow_symlinks: false,
25 filter: None,
26 root: PathBuf::from(root.as_ref()),
27 walker: None,
28 }
29 }
30
31 pub fn new_with(
32 root: impl AsRef<Path>,
33 filter: impl FnMut(&Path) -> bool + Send + 'static,
34 ) -> Self {
35 Self {
36 follow_symlinks: false,
37 filter: Some(Box::new(filter) as _),
38 root: PathBuf::from(root.as_ref()),
39 walker: None,
40 }
41 }
42
43 pub fn follow_symlinks(&mut self, toggle: bool) {
44 self.follow_symlinks = toggle;
45 }
46}
47
48impl TaskSource for DirectorySource {
49 type TaskInput = PathBuf;
50 type Error = DirectorySourceError;
51
52 fn next_task(&mut self, _id: uuid::Uuid) -> Result<Option<PathBuf>, Self::Error> {
53 let walker = self.walker.get_or_insert_with(|| {
54 WalkDir::new(&self.root)
55 .follow_links(self.follow_symlinks)
56 .into_iter()
57 });
58
59 loop {
60 let Some(entry) = walker.next() else {
61 self.walker = None;
63 return Ok(None);
64 };
65
66 let entry = entry?;
67
68 if !entry.file_type().is_file() {
69 continue;
70 }
71
72 let path = entry.into_path();
73
74 if self.filter.as_mut().map(|f| f(&path)).unwrap_or(true) {
75 return Ok(Some(path));
76 }
77 }
78 }
79}
80
#[cfg(test)]
mod test {
    use std::convert::Infallible;

    use super::*;

    use crate::{Error, TaskProcessor, TaskSink};

    /// Smoke test: feeds every file under `target` through a trivial
    /// processor and a printing sink, and only checks that `crate::run`
    /// completes without an error.
    #[test]
    fn test_dir() -> Result<(), Error> {
        /// Processor that turns each path into a greeting string.
        struct Process;

        impl TaskProcessor for Process {
            type TaskInput = PathBuf;
            type TaskOutput = String;
            type TaskError = String;

            fn process_task(&mut self, _id: uuid::Uuid, payload: PathBuf) -> Result<String, String> {
                Ok(format!("hello, task:{}", payload.display()))
            }
        }

        /// Sink that prints the outcome of every task and never fails.
        struct Sink;

        impl TaskSink for Sink {
            type TaskOutput = String;
            type TaskError = String;

            type Error = Infallible;

            fn process_task_result(
                &mut self,
                id: uuid::Uuid,
                result: Result<String, String>,
            ) -> Result<(), Self::Error> {
                match result {
                    Ok(v) => println!("task {id} processed successfully (payload: {v})"),
                    Err(v) => println!("task {id} failed (reason: {v})"),
                }
                Ok(())
            }
        }

        // NOTE(review): "target" is a relative path, so this resolves against
        // the working directory of the test process — presumably the crate
        // root with a Cargo build directory present; confirm for workspace or
        // custom target-dir setups.
        let mut files = DirectorySource::new("target");
        let mut greeter = Process;
        let mut printer = Sink;

        crate::run(&mut files, &mut greeter, &mut printer)?;

        Ok(())
    }
}