rust_integration_services/file/
file_receiver.rs1use std::{collections::{HashMap, HashSet}, path::{Path, PathBuf}, pin::Pin, sync::Arc, time::Duration};
2use regex::Regex;
3use tokio::{signal::unix::{signal, SignalKind}, sync::{mpsc, Mutex}, task::JoinSet};
4use uuid::Uuid;
5
6use crate::utils::error::Error;
7
8type FileCallback = Arc<dyn Fn(String, PathBuf) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
9
10#[derive(Clone)]
11pub enum FileReceiverEventSignal {
12 OnFileReceived(String, PathBuf),
13}
14
15pub struct FileReceiver {
16 source_path: PathBuf,
17 filter: HashMap<String, FileCallback>,
18 event_broadcast: mpsc::Sender<FileReceiverEventSignal>,
19 event_receiver: Option<mpsc::Receiver<FileReceiverEventSignal>>,
20 event_join_set: JoinSet<()>,
21}
22
23impl FileReceiver {
24 pub fn new<T: AsRef<Path>>(source_path: T) -> Self {
25 let (event_broadcast, event_receiver) = mpsc::channel(128);
26 FileReceiver {
27 source_path: source_path.as_ref().to_path_buf(),
28 filter: HashMap::new(),
29 event_broadcast,
30 event_receiver: Some(event_receiver),
31 event_join_set: JoinSet::new(),
32 }
33 }
34
35 pub fn filter<T, Fut, S>(mut self, filter: S, callback: T) -> Self
37 where
38 T: Fn(String, PathBuf) -> Fut + Send + Sync + 'static,
39 Fut: Future<Output = ()> + Send + 'static,
40 S: AsRef<str>,
41 {
42 Regex::new(filter.as_ref()).expect("Invalid Regex!");
43 self.filter.insert(filter.as_ref().to_string(), Arc::new(move |uuid, path| Box::pin(callback(uuid, path))));
44 self
45 }
46
47 pub fn on_event<T, Fut>(mut self, handler: T) -> Self
48 where
49 T: Fn(FileReceiverEventSignal) -> Fut + Send + Sync + 'static,
50 Fut: Future<Output = ()> + Send + 'static,
51 {
52 let mut receiver = self.event_receiver.unwrap();
53 let mut sigterm = signal(SignalKind::terminate()).expect("Failed to start SIGTERM signal receiver.");
54 let mut sigint = signal(SignalKind::interrupt()).expect("Failed to start SIGINT signal receiver.");
55
56 self.event_join_set.spawn(async move {
57 loop {
58 tokio::select! {
59 _ = sigterm.recv() => break,
60 _ = sigint.recv() => break,
61 event = receiver.recv() => {
62 match event {
63 Some(event) => handler(event).await,
64 None => break,
65 }
66 }
67 }
68 }
69 });
70
71 self.event_receiver = None;
72 self
73 }
74
75 pub async fn receive(mut self) -> tokio::io::Result<()> {
76 if !self.source_path.try_exists()? {
77 return Err(Error::tokio_io(format!("The path '{:?}' does not exist!", &self.source_path)));
78 }
79
80 let mut join_set = JoinSet::new();
81 let mut interval = tokio::time::interval(Duration::from_millis(1500));
82 let mut sigterm = signal(SignalKind::terminate())?;
83 let mut sigint = signal(SignalKind::interrupt())?;
84 let lock_list = Arc::new(Mutex::new(HashSet::<String>::new()));
85 let filter_map = Arc::new(self.filter.clone());
86 let mut size_map = HashMap::<String, u64>::new();
87
88 loop {
89 tokio::select! {
90 _ = sigterm.recv() => break,
91 _ = sigint.recv() => break,
92 _ = async {} => {
93 if let Ok(files) = Self::get_files_in_directory(&self.source_path).await {
94 for file_path in &files {
95 for (filter, callback) in filter_map.iter() {
96 let file_name = file_path.file_name().unwrap().to_str().unwrap().to_string();
97
98 let regex = Regex::new(filter).unwrap();
100 if !regex.is_match(&file_name) {
101 continue;
102 }
103
104 let size = tokio::fs::metadata(file_path).await.unwrap().len();
106 match size_map.get_mut(&file_name) {
107 Some(old_size) => {
108 if size > *old_size {
109 *old_size = size;
110 continue;
111 }
112 size_map.remove(&file_name);
113 }
114 None => {
115 size_map.insert(file_name, size);
116 continue;
117 }
118 }
119
120 let mut unlocked_list = lock_list.lock().await;
122 if unlocked_list.contains(&file_name) {
123 continue;
124 }
125 unlocked_list.insert(file_name.clone());
126 drop(unlocked_list);
127
128 let callback = Arc::clone(&callback);
129 let lock_list = Arc::clone(&lock_list);
130 let file_path = Arc::new(file_path.to_path_buf());
131 let uuid = Uuid::new_v4().to_string();
132
133 self.event_broadcast.send(FileReceiverEventSignal::OnFileReceived(uuid.clone(), file_path.to_path_buf())).await.unwrap();
134 join_set.spawn(async move {
135 callback(uuid, file_path.to_path_buf()).await;
136 let mut unlocked_list = lock_list.lock().await;
137 unlocked_list.remove(&file_name);
138 });
139 }
140 }
141 }
142
143 interval.tick().await;
144 }
145 }
146 }
147
148 while let Some(_) = join_set.join_next().await {}
149 while let Some(_) = self.event_join_set.join_next().await {}
150
151 Ok(())
152 }
153
154 async fn get_files_in_directory(path: &Path) -> tokio::io::Result<Vec<PathBuf>> {
155 let mut files: Vec<PathBuf> = Vec::new();
156 let mut entries = tokio::fs::read_dir(&path).await?;
157
158 while let Some(file) = entries.next_entry().await? {
159 let file = file.path();
160 if file.is_file() {
161 files.push(file);
162 }
163 }
164
165 Ok(files)
166 }
167}