rust_integration_services/file/
file_receiver.rs1use std::{collections::{HashMap, HashSet}, path::{Path, PathBuf}, pin::Pin, sync::Arc, time::Duration};
2use regex::Regex;
3use tokio::{signal::unix::{signal, SignalKind}, sync::Mutex, task::JoinSet};
4use uuid::Uuid;
5
6use crate::{common::event_handler::EventHandler, file::file_receiver_event::FileReceiverEvent};
7
8type FileCallback = Arc<dyn Fn(String, PathBuf) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
9
10pub struct FileReceiver {
11 source_path: PathBuf,
12 routes: HashMap<String, FileCallback>,
13 event_handler: EventHandler<FileReceiverEvent>,
14}
15
16impl FileReceiver {
17 pub fn new<T: AsRef<Path>>(source_path: T) -> Self {
18 FileReceiver {
19 source_path: source_path.as_ref().to_path_buf(),
20 routes: HashMap::new(),
21 event_handler: EventHandler::new(),
22 }
23 }
24
25 pub fn route<T, Fut, S>(mut self, filter: S, callback: T) -> Self
27 where
28 T: Fn(String, PathBuf) -> Fut + Send + Sync + 'static,
29 Fut: Future<Output = ()> + Send + 'static,
30 S: AsRef<str>,
31 {
32 Regex::new(filter.as_ref()).expect("Invalid Regex");
33 self.routes.insert(filter.as_ref().to_string(), Arc::new(move |uuid, path| Box::pin(callback(uuid, path))));
34 self
35 }
36
37 pub fn on_event<T, Fut>(mut self, handler: T) -> Self
38 where
39 T: Fn(FileReceiverEvent) -> Fut + Send + Sync + 'static,
40 Fut: Future<Output = ()> + Send + 'static,
41 {
42 self.event_handler.init(handler);
43 self
44 }
45
46 pub async fn receive(mut self) {
47 let mut receiver_join_set = JoinSet::new();
48 let mut interval = tokio::time::interval(Duration::from_millis(1500));
49 let mut sigterm = signal(SignalKind::terminate()).expect("Failed to start SIGTERM signal receiver");
50 let mut sigint = signal(SignalKind::interrupt()).expect("Failed to start SIGINT signal receiver");
51 let lock_list = Arc::new(Mutex::new(HashSet::<String>::new()));
52 let route_map = Arc::new(self.routes.clone());
53 let mut size_map = HashMap::<String, u64>::new();
54
55 loop {
56 tokio::select! {
57 _ = sigterm.recv() => break,
58 _ = sigint.recv() => break,
59 _ = async {} => {
60 if let Ok(files) = Self::get_files_in_directory(&self.source_path).await {
61 for file_path in &files {
62 for (route, callback) in route_map.iter() {
63 let file_name = file_path.file_name().unwrap().to_str().unwrap().to_string();
64
65 let regex = Regex::new(route).unwrap();
67 if !regex.is_match(&file_name) {
68 continue;
69 }
70
71 let size = tokio::fs::metadata(file_path).await.unwrap().len();
73 match size_map.get_mut(&file_name) {
74 Some(old_size) => {
75 if size > *old_size {
76 *old_size = size;
77 continue;
78 }
79 size_map.remove(&file_name);
80 }
81 None => {
82 size_map.insert(file_name, size);
83 continue;
84 }
85 }
86
87 let mut unlocked_list = lock_list.lock().await;
89 if unlocked_list.contains(&file_name) {
90 continue;
91 }
92 unlocked_list.insert(file_name.clone());
93 drop(unlocked_list);
94
95 let callback = Arc::clone(&callback);
96 let lock_list = Arc::clone(&lock_list);
97 let file_path = Arc::new(file_path.to_path_buf());
98 let uuid = Uuid::new_v4().to_string();
99 let event_broadcast = Arc::new(self.event_handler.broadcast());
100
101 event_broadcast.send(FileReceiverEvent::FileReceived {
102 uuid: uuid.clone(),
103 path: file_path.to_path_buf()
104 }).ok();
105 receiver_join_set.spawn(async move {
106 let callback_handle = tokio::spawn(callback(uuid.clone(), file_path.to_path_buf())).await;
107 match callback_handle {
108 Ok(_) => {
109 event_broadcast.send(FileReceiverEvent::FileProcessed {
110 uuid: uuid.clone(),
111 path: file_path.to_path_buf()
112 }).ok();
113 },
114 Err(err) => {
115 event_broadcast.send(FileReceiverEvent::Error {
116 uuid: uuid,
117 error: err.to_string()
118 }).ok();
119 },
120 }
121
122 let mut unlocked_list = lock_list.lock().await;
123 unlocked_list.remove(&file_name);
124 });
125 }
126 }
127 }
128
129 interval.tick().await;
130 }
131 }
132 }
133
134 self.event_handler.shutdown().await;
135 while let Some(_) = receiver_join_set.join_next().await {}
136 }
137
138 async fn get_files_in_directory(path: &Path) -> tokio::io::Result<Vec<PathBuf>> {
139 let mut files: Vec<PathBuf> = Vec::new();
140 let mut entries = tokio::fs::read_dir(&path).await?;
141
142 while let Some(file) = entries.next_entry().await? {
143 let file = file.path();
144 if file.is_file() {
145 files.push(file);
146 }
147 }
148
149 Ok(files)
150 }
151}