1use async_trait::async_trait;
2use serde::Deserialize;
3use std::fs::{self, DirEntry};
4use std::io;
5use std::path::PathBuf;
6use std::time::Duration;
7use tokio::sync::mpsc::Sender;
8use tokio::time::sleep;
9
10use super::Listen;
11use crate::common::{ConfigInto, FromConfig, FromPath, Period};
12
13#[async_trait]
14pub trait ListFile {
15 async fn list(&self) -> io::Result<Vec<PathBuf>>;
17 async fn filter(&self, _entry: &DirEntry) -> bool {
18 true
19 }
20}
21
22#[derive(Clone, Deserialize)]
23pub enum FilePathVisitMode {
24 Once,
25 Cron(Period),
26}
27
28#[derive(Clone, Deserialize)]
29pub struct LocalFilePathVisitorConfig {
30 pub root: String,
31 pub mode: Option<FilePathVisitMode>,
32}
33
34impl FromPath for LocalFilePathVisitorConfig {}
35
36#[async_trait]
37impl ConfigInto<LocalFilePathVisitor> for LocalFilePathVisitorConfig {}
38
39pub struct LocalFilePathVisitor {
41 root: PathBuf,
43 mode: FilePathVisitMode,
45 tx: Option<Sender<PathBuf>>,
47}
48
49impl LocalFilePathVisitor {
50 pub fn new(config: LocalFilePathVisitorConfig) -> Self {
51 let mode = match config.mode {
52 Some(mode) => mode,
53 None => FilePathVisitMode::Once,
54 };
55 LocalFilePathVisitor {
56 root: PathBuf::from(config.root),
57 mode,
58 tx: None,
59 }
60 }
61}
62
63#[async_trait]
64impl FromConfig<LocalFilePathVisitorConfig> for LocalFilePathVisitor {
65 async fn from_config(config: LocalFilePathVisitorConfig) -> anyhow::Result<Self> {
66 Ok(LocalFilePathVisitor::new(config))
67 }
68}
69
70#[async_trait]
71impl ListFile for LocalFilePathVisitor {
72 async fn list(&self) -> io::Result<Vec<PathBuf>> {
74 let dir = match self.root.is_dir() {
75 true => self.root.to_owned(),
76 false => return Ok(vec![]),
77 };
78 let mut dirs = vec![dir];
79 let mut file_paths: Vec<PathBuf> = Vec::new();
80 loop {
81 let dir = match dirs.pop() {
82 Some(dir) => dir,
83 None => return Ok(file_paths),
84 };
85 for entry in fs::read_dir(dir)? {
86 let entry = entry?;
87 let path = entry.path();
88 let include = match path.is_dir() {
89 true => {
90 dirs.push(path);
91 continue;
92 }
93 false => self.filter(&entry).await,
94 };
95 if include {
96 file_paths.push(path)
97 }
98 }
99 }
100 }
101}
102
103impl LocalFilePathVisitor {
104 async fn run_once(&mut self) -> anyhow::Result<()> {
105 for path in self.list().await? {
106 self.tx.as_ref().unwrap().send(path).await?;
107 }
108 Ok(())
109 }
110
111 async fn run_cron(&mut self, delay: Duration) -> anyhow::Result<()> {
112 loop {
113 self.run_once().await?;
114 sleep(delay).await;
115 }
116 }
117}
118
119#[async_trait]
122impl Listen<PathBuf, LocalFilePathVisitorConfig> for LocalFilePathVisitor {
123 async fn run(&mut self) -> anyhow::Result<()> {
124 let period = match self.mode {
125 FilePathVisitMode::Once => return self.run_once().await,
126 FilePathVisitMode::Cron(ref period) => period.to_owned(),
127 };
128 self.run_cron(period.into()).await
129 }
130
131 fn set_sender(&mut self, sender: Sender<PathBuf>) {
132 self.tx = Some(sender)
133 }
134}
135
#[cfg(test)]
mod tests {
    use crate::prelude::*;
    use std::collections::HashSet;
    use std::path::PathBuf;

    /// Runs the `file_visitor` pipe against the fixture folder and checks that
    /// every expected file is reported exactly once and nothing else is reported.
    #[tokio::test]
    async fn test_list_folder() {
        let (tx, mut rx) = channel!(PathBuf, 1024);
        let channels = pipe_channels!([tx]);
        let config = config!(
            LocalFilePathVisitorConfig,
            "resources/catalogs/local_file_visitor.yml"
        );
        let pipe = listener!("file_visitor");
        join_pipes!([run_pipe!(pipe, config, channels)]);
        let expected_files: HashSet<PathBuf> = [
            "resources/test_file_folder/sub_folder/test_file_0.txt",
            "resources/test_file_folder/test_file_0.txt",
            "resources/test_file_folder/test_file_1.txt",
        ]
        .iter()
        .map(PathBuf::from)
        .collect();
        let mut received_files: Vec<PathBuf> = Vec::new();
        while let Some(file_path) = rx.recv().await {
            received_files.push(file_path);
        }
        let received_set: HashSet<PathBuf> = received_files.iter().cloned().collect();
        // Without this check, a duplicated path could mask a missing file: the
        // total count and the per-item membership tests would both still pass.
        assert_eq!(
            received_files.len(),
            received_set.len(),
            "duplicate paths received: {:?}",
            received_files
        );
        assert_eq!(expected_files, received_set);
    }
}