fugue_mptp/
sources.rs

1use std::path::{Path, PathBuf};
2
3use thiserror::Error;
4use walkdir::WalkDir;
5
6use crate::TaskSource;
7
8pub struct DirectorySource {
9    follow_symlinks: bool,
10    filter: Option<Box<dyn FnMut(&Path) -> bool + Send>>,
11    root: PathBuf,
12    walker: Option<walkdir::IntoIter>,
13}
14
15#[derive(Debug, Error)]
16pub enum DirectorySourceError {
17    #[error(transparent)]
18    WalkDir(#[from] walkdir::Error),
19}
20
21impl DirectorySource {
22    pub fn new(root: impl AsRef<Path>) -> Self {
23        Self {
24            follow_symlinks: false,
25            filter: None,
26            root: PathBuf::from(root.as_ref()),
27            walker: None,
28        }
29    }
30
31    pub fn new_with(
32        root: impl AsRef<Path>,
33        filter: impl FnMut(&Path) -> bool + Send + 'static,
34    ) -> Self {
35        Self {
36            follow_symlinks: false,
37            filter: Some(Box::new(filter) as _),
38            root: PathBuf::from(root.as_ref()),
39            walker: None,
40        }
41    }
42
43    pub fn follow_symlinks(&mut self, toggle: bool) {
44        self.follow_symlinks = toggle;
45    }
46}
47
48impl TaskSource for DirectorySource {
49    type TaskInput = PathBuf;
50    type Error = DirectorySourceError;
51
52    fn next_task(&mut self, _id: uuid::Uuid) -> Result<Option<PathBuf>, Self::Error> {
53        let walker = self.walker.get_or_insert_with(|| {
54            WalkDir::new(&self.root)
55                .follow_links(self.follow_symlinks)
56                .into_iter()
57        });
58
59        loop {
60            let Some(entry) = walker.next() else {
61                // reset
62                self.walker = None;
63                return Ok(None);
64            };
65
66            let entry = entry?;
67
68            if !entry.file_type().is_file() {
69                continue;
70            }
71
72            let path = entry.into_path();
73
74            if self.filter.as_mut().map(|f| f(&path)).unwrap_or(true) {
75                return Ok(Some(path));
76            }
77        }
78    }
79}
80
#[cfg(test)]
mod test {
    use std::convert::Infallible;

    use super::*;

    use crate::{Error, TaskProcessor, TaskSink};

    /// Smoke test: feeds the files found under the relative path `target`
    /// through `crate::run`, using a processor that formats a greeting for
    /// each path and a sink that prints every result.
    #[test]
    fn test_dir() -> Result<(), Error> {
        /// Processor that turns each path into a greeting string.
        struct Greeter;
        /// Sink that prints the outcome of every task.
        struct Printer;

        impl TaskProcessor for Greeter {
            type TaskInput = PathBuf;
            type TaskOutput = String;
            type TaskError = String;

            fn process_task(&mut self, _id: uuid::Uuid, payload: PathBuf) -> Result<String, String> {
                let greeting = format!("hello, task:{}", payload.display());
                Ok(greeting)
            }
        }

        impl TaskSink for Printer {
            type TaskOutput = String;
            type TaskError = String;

            type Error = Infallible;

            fn process_task_result(
                &mut self,
                id: uuid::Uuid,
                result: Result<String, String>,
            ) -> Result<(), Self::Error> {
                match result {
                    Ok(v) => println!("task {id} processed successfully (payload: {v})"),
                    Err(v) => println!("task {id} failed (reason: {v})"),
                }
                Ok(())
            }
        }

        let mut files = DirectorySource::new("target");
        let mut printer = Printer;
        let mut greeter = Greeter;

        crate::run(&mut files, &mut greeter, &mut printer)?;

        Ok(())
    }
}