rust_integration_services/file/
file_receiver.rs1use std::{collections::{HashMap, HashSet}, path::{Path, PathBuf}, pin::Pin, sync::Arc, time::Duration};
2use regex::Regex;
3use tokio::{signal::unix::{signal, SignalKind}, sync::Mutex, task::JoinSet};
4use uuid::Uuid;
5
6use crate::{common::event_handler::EventHandler, file::file_receiver_event::FileReceiverEvent};
7
8type FileCallback = Arc<dyn Fn(String, PathBuf) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
9
10pub struct FileReceiver {
11 source_path: PathBuf,
12 routes: HashMap<String, FileCallback>,
13 event_handler: EventHandler<FileReceiverEvent>,
14 event_join_set: JoinSet<()>,
15}
16
17impl FileReceiver {
18 pub fn new<T: AsRef<Path>>(source_path: T) -> Self {
19 FileReceiver {
20 source_path: source_path.as_ref().to_path_buf(),
21 routes: HashMap::new(),
22 event_handler: EventHandler::new(),
23 event_join_set: JoinSet::new(),
24 }
25 }
26
27 pub fn route<T, Fut, S>(mut self, filter: S, callback: T) -> Self
29 where
30 T: Fn(String, PathBuf) -> Fut + Send + Sync + 'static,
31 Fut: Future<Output = ()> + Send + 'static,
32 S: AsRef<str>,
33 {
34 Regex::new(filter.as_ref()).expect("Invalid Regex");
35 self.routes.insert(filter.as_ref().to_string(), Arc::new(move |uuid, path| Box::pin(callback(uuid, path))));
36 self
37 }
38
39 pub fn on_event<T, Fut>(mut self, handler: T) -> Self
40 where
41 T: Fn(FileReceiverEvent) -> Fut + Send + Sync + 'static,
42 Fut: Future<Output = ()> + Send + 'static,
43 {
44 self.event_join_set = self.event_handler.init(handler);
45 self
46 }
47
48 pub async fn receive(mut self) {
49 let mut receiver_join_set = JoinSet::new();
50 let mut interval = tokio::time::interval(Duration::from_millis(1500));
51 let mut sigterm = signal(SignalKind::terminate()).expect("Failed to start SIGTERM signal receiver");
52 let mut sigint = signal(SignalKind::interrupt()).expect("Failed to start SIGINT signal receiver");
53 let lock_list = Arc::new(Mutex::new(HashSet::<String>::new()));
54 let route_map = Arc::new(self.routes.clone());
55 let mut size_map = HashMap::<String, u64>::new();
56
57 loop {
58 tokio::select! {
59 _ = sigterm.recv() => break,
60 _ = sigint.recv() => break,
61 _ = async {} => {
62 if let Ok(files) = Self::get_files_in_directory(&self.source_path).await {
63 for file_path in &files {
64 for (route, callback) in route_map.iter() {
65 let file_name = file_path.file_name().unwrap().to_str().unwrap().to_string();
66
67 let regex = Regex::new(route).unwrap();
69 if !regex.is_match(&file_name) {
70 continue;
71 }
72
73 let size = tokio::fs::metadata(file_path).await.unwrap().len();
75 match size_map.get_mut(&file_name) {
76 Some(old_size) => {
77 if size > *old_size {
78 *old_size = size;
79 continue;
80 }
81 size_map.remove(&file_name);
82 }
83 None => {
84 size_map.insert(file_name, size);
85 continue;
86 }
87 }
88
89 let mut unlocked_list = lock_list.lock().await;
91 if unlocked_list.contains(&file_name) {
92 continue;
93 }
94 unlocked_list.insert(file_name.clone());
95 drop(unlocked_list);
96
97 let callback = Arc::clone(&callback);
98 let lock_list = Arc::clone(&lock_list);
99 let file_path = Arc::new(file_path.to_path_buf());
100 let uuid = Uuid::new_v4().to_string();
101 let event_broadcast = Arc::new(self.event_handler.broadcast());
102
103 event_broadcast.send(FileReceiverEvent::OnFileReceived(uuid.clone(), file_path.to_path_buf())).await.unwrap();
104 receiver_join_set.spawn(async move {
105 let callback_handle = tokio::spawn(callback(uuid.clone(), file_path.to_path_buf())).await;
106 match callback_handle {
107 Ok(_) => {
108 event_broadcast.send(FileReceiverEvent::OnFileProcessed(uuid, file_path.to_path_buf())).await.unwrap();
109 },
110 Err(err) => {
111 event_broadcast.send(FileReceiverEvent::OnError(uuid, err.to_string())).await.unwrap();
112 },
113 }
114
115 let mut unlocked_list = lock_list.lock().await;
116 unlocked_list.remove(&file_name);
117 });
118 }
119 }
120 }
121
122 interval.tick().await;
123 }
124 }
125 }
126
127 while let Some(_) = receiver_join_set.join_next().await {}
128 while let Some(_) = self.event_join_set.join_next().await {}
129 }
130
131 async fn get_files_in_directory(path: &Path) -> tokio::io::Result<Vec<PathBuf>> {
132 let mut files: Vec<PathBuf> = Vec::new();
133 let mut entries = tokio::fs::read_dir(&path).await?;
134
135 while let Some(file) = entries.next_entry().await? {
136 let file = file.path();
137 if file.is_file() {
138 files.push(file);
139 }
140 }
141
142 Ok(files)
143 }
144}