rust_integration_services/file/
file_receiver.rs

1use std::{collections::{HashMap, HashSet}, path::{Path, PathBuf}, pin::Pin, sync::Arc, time::Duration};
2use regex::Regex;
3use tokio::{signal::unix::{signal, SignalKind}, sync::Mutex, task::JoinSet};
4use uuid::Uuid;
5
6use crate::{common::event_handler::EventHandler, file::file_receiver_event::FileReceiverEvent};
7
/// Shared, type-erased route callback.
///
/// It is called with the UUID string generated for a file and the file's path, and returns a
/// boxed `Send` future that resolves to `()`.
type FileCallback = Arc<dyn Fn(String, PathBuf) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
9
/// Polls a directory and hands files whose names match a registered filter to a callback.
pub struct FileReceiver {
    /// Directory that is listed on every poll.
    source_path: PathBuf,
    /// Callbacks keyed by the regular-expression filter string they were registered with.
    routes: HashMap<String, FileCallback>,
    /// Provides the broadcast handles used to publish `FileReceiverEvent`s.
    event_handler: EventHandler<FileReceiverEvent>,
    /// Tasks returned by `EventHandler::init`; they are awaited when `receive` shuts down.
    event_join_set: JoinSet<()>,
}
16
17impl FileReceiver {
18    pub fn new<T: AsRef<Path>>(source_path: T) -> Self {
19        FileReceiver {
20            source_path: source_path.as_ref().to_path_buf(),
21            routes: HashMap::new(),
22            event_handler: EventHandler::new(),
23            event_join_set: JoinSet::new(),
24        }
25    }
26
27    /// Callback that returns all file paths from the route using regular expression.
28    pub fn route<T, Fut, S>(mut self, filter: S, callback: T) -> Self 
29    where
30        T: Fn(String, PathBuf) -> Fut + Send + Sync + 'static,
31        Fut: Future<Output = ()> + Send + 'static,
32        S: AsRef<str>,
33    {
34        Regex::new(filter.as_ref()).expect("Invalid Regex");
35        self.routes.insert(filter.as_ref().to_string(), Arc::new(move |uuid, path| Box::pin(callback(uuid, path))));
36        self
37    }
38
39    pub fn on_event<T, Fut>(mut self, handler: T) -> Self
40    where
41        T: Fn(FileReceiverEvent) -> Fut + Send + Sync + 'static,
42        Fut: Future<Output = ()> + Send + 'static,
43    {
44        self.event_join_set = self.event_handler.init(handler);
45        self
46    }
47
48    pub async fn receive(mut self) {
49        let mut receiver_join_set = JoinSet::new();
50        let mut interval = tokio::time::interval(Duration::from_millis(1500));
51        let mut sigterm = signal(SignalKind::terminate()).expect("Failed to start SIGTERM signal receiver");
52        let mut sigint = signal(SignalKind::interrupt()).expect("Failed to start SIGINT signal receiver");
53        let lock_list = Arc::new(Mutex::new(HashSet::<String>::new()));
54        let route_map = Arc::new(self.routes.clone());
55        let mut size_map = HashMap::<String, u64>::new();
56        
57        loop {
58            tokio::select! {
59                _ = sigterm.recv() => break,
60                _ = sigint.recv() => break,
61                _ = async {} => {
62                    if let Ok(files) = Self::get_files_in_directory(&self.source_path).await {
63                        for file_path in &files {
64                            for (route, callback) in route_map.iter() {
65                                let file_name = file_path.file_name().unwrap().to_str().unwrap().to_string();
66                                
67                                // Check if the filter regex matches the file.
68                                let regex = Regex::new(route).unwrap();
69                                if !regex.is_match(&file_name) {
70                                    continue;
71                                }
72                                
73                                // Check if the file is being copied by comparing the current size with the previous polling size.
74                                let size = tokio::fs::metadata(file_path).await.unwrap().len();
75                                match size_map.get_mut(&file_name) {
76                                    Some(old_size) => {
77                                        if size > *old_size {
78                                            *old_size = size;
79                                            continue;
80                                        }
81                                        size_map.remove(&file_name);
82                                    }
83                                    None => {
84                                        size_map.insert(file_name, size);
85                                        continue;
86                                    }
87                                }
88    
89                                // Add file to a locked list to avoid multiple tasks processing the same file.
90                                let mut unlocked_list = lock_list.lock().await;
91                                if unlocked_list.contains(&file_name) {
92                                    continue;
93                                }
94                                unlocked_list.insert(file_name.clone());
95                                drop(unlocked_list);
96                                
97                                let callback = Arc::clone(&callback);
98                                let lock_list = Arc::clone(&lock_list);
99                                let file_path = Arc::new(file_path.to_path_buf());
100                                let uuid = Uuid::new_v4().to_string();
101                                let event_broadcast = Arc::new(self.event_handler.broadcast());
102                            
103                                event_broadcast.send(FileReceiverEvent::OnFileReceived(uuid.clone(), file_path.to_path_buf())).await.unwrap();
104                                receiver_join_set.spawn(async move {
105                                    let callback_handle = tokio::spawn(callback(uuid.clone(), file_path.to_path_buf())).await;
106                                    match callback_handle {
107                                        Ok(_) => {
108                                            event_broadcast.send(FileReceiverEvent::OnFileProcessed(uuid, file_path.to_path_buf())).await.unwrap();
109                                        },
110                                        Err(err) => {
111                                            event_broadcast.send(FileReceiverEvent::OnError(uuid, err.to_string())).await.unwrap();
112                                        },
113                                    }
114
115                                    let mut unlocked_list = lock_list.lock().await;
116                                    unlocked_list.remove(&file_name);
117                                });
118                            }
119                        }
120                    }
121
122                    interval.tick().await;
123                }
124            }
125        }
126
127        while let Some(_) = receiver_join_set.join_next().await {}
128        while let Some(_) = self.event_join_set.join_next().await {}
129    }
130
131    async fn get_files_in_directory(path: &Path) -> tokio::io::Result<Vec<PathBuf>> {
132        let mut files: Vec<PathBuf> = Vec::new();
133        let mut entries = tokio::fs::read_dir(&path).await?;
134
135        while let Some(file) = entries.next_entry().await? {
136            let file = file.path();
137            if file.is_file() {
138                files.push(file);
139            }
140        }
141
142        Ok(files)
143    }
144}