pipebase/listen/file.rs

1use async_trait::async_trait;
2use serde::Deserialize;
3use std::fs::{self, DirEntry};
4use std::io;
5use std::path::PathBuf;
6use std::time::Duration;
7use tokio::sync::mpsc::Sender;
8use tokio::time::sleep;
9
10use super::Listen;
11use crate::common::{ConfigInto, FromConfig, FromPath, Period};
12
13#[async_trait]
14pub trait ListFile {
15    /// list file in directory
16    async fn list(&self) -> io::Result<Vec<PathBuf>>;
17    async fn filter(&self, _entry: &DirEntry) -> bool {
18        true
19    }
20}
21
22#[derive(Clone, Deserialize)]
23pub enum FilePathVisitMode {
24    Once,
25    Cron(Period),
26}
27
28#[derive(Clone, Deserialize)]
29pub struct LocalFilePathVisitorConfig {
30    pub root: String,
31    pub mode: Option<FilePathVisitMode>,
32}
33
34impl FromPath for LocalFilePathVisitorConfig {}
35
36#[async_trait]
37impl ConfigInto<LocalFilePathVisitor> for LocalFilePathVisitorConfig {}
38
39/// Visit file under directory and send file path
40pub struct LocalFilePathVisitor {
41    /// Root directory path
42    root: PathBuf,
43    /// Either Once ot Cron
44    mode: FilePathVisitMode,
45    /// Sender to notify downstreams
46    tx: Option<Sender<PathBuf>>,
47}
48
49impl LocalFilePathVisitor {
50    pub fn new(config: LocalFilePathVisitorConfig) -> Self {
51        let mode = match config.mode {
52            Some(mode) => mode,
53            None => FilePathVisitMode::Once,
54        };
55        LocalFilePathVisitor {
56            root: PathBuf::from(config.root),
57            mode,
58            tx: None,
59        }
60    }
61}
62
63#[async_trait]
64impl FromConfig<LocalFilePathVisitorConfig> for LocalFilePathVisitor {
65    async fn from_config(config: LocalFilePathVisitorConfig) -> anyhow::Result<Self> {
66        Ok(LocalFilePathVisitor::new(config))
67    }
68}
69
70#[async_trait]
71impl ListFile for LocalFilePathVisitor {
72    /// Recursive list file under directory
73    async fn list(&self) -> io::Result<Vec<PathBuf>> {
74        let dir = match self.root.is_dir() {
75            true => self.root.to_owned(),
76            false => return Ok(vec![]),
77        };
78        let mut dirs = vec![dir];
79        let mut file_paths: Vec<PathBuf> = Vec::new();
80        loop {
81            let dir = match dirs.pop() {
82                Some(dir) => dir,
83                None => return Ok(file_paths),
84            };
85            for entry in fs::read_dir(dir)? {
86                let entry = entry?;
87                let path = entry.path();
88                let include = match path.is_dir() {
89                    true => {
90                        dirs.push(path);
91                        continue;
92                    }
93                    false => self.filter(&entry).await,
94                };
95                if include {
96                    file_paths.push(path)
97                }
98            }
99        }
100    }
101}
102
103impl LocalFilePathVisitor {
104    async fn run_once(&mut self) -> anyhow::Result<()> {
105        for path in self.list().await? {
106            self.tx.as_ref().unwrap().send(path).await?;
107        }
108        Ok(())
109    }
110
111    async fn run_cron(&mut self, delay: Duration) -> anyhow::Result<()> {
112        loop {
113            self.run_once().await?;
114            sleep(delay).await;
115        }
116    }
117}
118
119/// # Parameters
120/// * PathBuf: output
121#[async_trait]
122impl Listen<PathBuf, LocalFilePathVisitorConfig> for LocalFilePathVisitor {
123    async fn run(&mut self) -> anyhow::Result<()> {
124        let period = match self.mode {
125            FilePathVisitMode::Once => return self.run_once().await,
126            FilePathVisitMode::Cron(ref period) => period.to_owned(),
127        };
128        self.run_cron(period.into()).await
129    }
130
131    fn set_sender(&mut self, sender: Sender<PathBuf>) {
132        self.tx = Some(sender)
133    }
134}
135
#[cfg(test)]
mod tests {
    use crate::prelude::*;
    use std::collections::HashSet;
    use std::path::PathBuf;

    /// Walk the fixture folder once and check that exactly the expected files
    /// are reported, each exactly once.
    #[tokio::test]
    async fn test_list_folder() {
        let (tx, mut rx) = channel!(PathBuf, 1024);
        let channels = pipe_channels!([tx]);
        let config = config!(
            LocalFilePathVisitorConfig,
            "resources/catalogs/local_file_visitor.yml"
        );
        let pipe = listener!("file_visitor");
        join_pipes!([run_pipe!(pipe, config, channels)]);
        let all_expected_files: HashSet<PathBuf> = [
            "resources/test_file_folder/sub_folder/test_file_0.txt",
            "resources/test_file_folder/test_file_0.txt",
            "resources/test_file_folder/test_file_1.txt",
        ]
        .iter()
        .map(|p| PathBuf::from(*p))
        .collect();
        // `recv` yields `None` once every sender has been dropped, ending the loop.
        let mut actual_files: Vec<PathBuf> = Vec::new();
        while let Some(file_path) = rx.recv().await {
            actual_files.push(file_path);
        }
        let actual_files_total = actual_files.len();
        let actual_unique: HashSet<PathBuf> = actual_files.into_iter().collect();
        // Check both the count and the set, so a duplicated path cannot mask a missing one.
        assert_eq!(all_expected_files.len(), actual_files_total);
        assert_eq!(all_expected_files, actual_unique);
    }
}