rust_integration_services/file/
file_receiver.rs1use std::{collections::{HashMap, HashSet}, path::{Path, PathBuf}, pin::Pin, sync::Arc, time::Duration};
2use regex::Regex;
3use tokio::{signal::unix::{signal, SignalKind}, sync::Mutex, task::JoinSet};
4use uuid::Uuid;
5
6use crate::common::event_handler::EventHandler;
7
/// Type-erased, shareable route callback.
///
/// Called with a per-file UUID string and the path of the received file; the
/// returned boxed future resolves once the callback has finished its work.
type FileCallback = Arc<
    dyn Fn(String, PathBuf) -> Pin<Box<dyn Future<Output = ()> + Send>>
        + Send
        + Sync,
>;
9
/// Events emitted by [`FileReceiver`] while it processes files.
///
/// Both variants carry the UUID string generated for the file and the file's path.
#[derive(Clone)]
pub enum FileReceiverEventSignal {
    /// Sent just before the route callback for a file is spawned.
    OnFileReceived(String, PathBuf),
    /// Sent after the route callback for a file has completed.
    OnFileProcessed(String, PathBuf),
}
15
16pub struct FileReceiver {
17 source_path: PathBuf,
18 routes: HashMap<String, FileCallback>,
19 event_handler: EventHandler<FileReceiverEventSignal>,
20 event_join_set: JoinSet<()>,
21}
22
23impl FileReceiver {
24 pub fn new<T: AsRef<Path>>(source_path: T) -> Self {
25 FileReceiver {
26 source_path: source_path.as_ref().to_path_buf(),
27 routes: HashMap::new(),
28 event_handler: EventHandler::new(),
29 event_join_set: JoinSet::new(),
30 }
31 }
32
33 pub fn route<T, Fut, S>(mut self, filter: S, callback: T) -> Self
35 where
36 T: Fn(String, PathBuf) -> Fut + Send + Sync + 'static,
37 Fut: Future<Output = ()> + Send + 'static,
38 S: AsRef<str>,
39 {
40 Regex::new(filter.as_ref()).expect("Invalid Regex");
41 self.routes.insert(filter.as_ref().to_string(), Arc::new(move |uuid, path| Box::pin(callback(uuid, path))));
42 self
43 }
44
45 pub fn on_event<T, Fut>(mut self, handler: T) -> Self
46 where
47 T: Fn(FileReceiverEventSignal) -> Fut + Send + Sync + 'static,
48 Fut: Future<Output = ()> + Send + 'static,
49 {
50 self.event_join_set = self.event_handler.init(handler);
51 self
52 }
53
54 pub async fn receive(mut self) {
55 let mut receiver_join_set = JoinSet::new();
56 let mut interval = tokio::time::interval(Duration::from_millis(1500));
57 let mut sigterm = signal(SignalKind::terminate()).expect("Failed to start SIGTERM signal receiver");
58 let mut sigint = signal(SignalKind::interrupt()).expect("Failed to start SIGINT signal receiver");
59 let lock_list = Arc::new(Mutex::new(HashSet::<String>::new()));
60 let route_map = Arc::new(self.routes.clone());
61 let mut size_map = HashMap::<String, u64>::new();
62
63 loop {
64 tokio::select! {
65 _ = sigterm.recv() => break,
66 _ = sigint.recv() => break,
67 _ = async {} => {
68 if let Ok(files) = Self::get_files_in_directory(&self.source_path).await {
69 for file_path in &files {
70 for (route, callback) in route_map.iter() {
71 let file_name = file_path.file_name().unwrap().to_str().unwrap().to_string();
72
73 let regex = Regex::new(route).unwrap();
75 if !regex.is_match(&file_name) {
76 continue;
77 }
78
79 let size = tokio::fs::metadata(file_path).await.unwrap().len();
81 match size_map.get_mut(&file_name) {
82 Some(old_size) => {
83 if size > *old_size {
84 *old_size = size;
85 continue;
86 }
87 size_map.remove(&file_name);
88 }
89 None => {
90 size_map.insert(file_name, size);
91 continue;
92 }
93 }
94
95 let mut unlocked_list = lock_list.lock().await;
97 if unlocked_list.contains(&file_name) {
98 continue;
99 }
100 unlocked_list.insert(file_name.clone());
101 drop(unlocked_list);
102
103 let callback = Arc::clone(&callback);
104 let lock_list = Arc::clone(&lock_list);
105 let file_path = Arc::new(file_path.to_path_buf());
106 let uuid = Uuid::new_v4().to_string();
107 let event_broadcast = Arc::new(self.event_handler.broadcast());
108
109 event_broadcast.send(FileReceiverEventSignal::OnFileReceived(uuid.clone(), file_path.to_path_buf())).await.unwrap();
110 receiver_join_set.spawn(async move {
111 callback(uuid.clone(), file_path.to_path_buf()).await;
112 let mut unlocked_list = lock_list.lock().await;
113 unlocked_list.remove(&file_name);
114 event_broadcast.send(FileReceiverEventSignal::OnFileProcessed(uuid.clone(), file_path.to_path_buf())).await.unwrap();
115 });
116 }
117 }
118 }
119
120 interval.tick().await;
121 }
122 }
123 }
124
125 while let Some(_) = receiver_join_set.join_next().await {}
126 while let Some(_) = self.event_join_set.join_next().await {}
127 }
128
129 async fn get_files_in_directory(path: &Path) -> tokio::io::Result<Vec<PathBuf>> {
130 let mut files: Vec<PathBuf> = Vec::new();
131 let mut entries = tokio::fs::read_dir(&path).await?;
132
133 while let Some(file) = entries.next_entry().await? {
134 let file = file.path();
135 if file.is_file() {
136 files.push(file);
137 }
138 }
139
140 Ok(files)
141 }
142}