rust_integration_services/file/
file_receiver.rs

1use std::{collections::{HashMap, HashSet}, path::{Path, PathBuf}, pin::Pin, sync::Arc, time::Duration};
2use regex::Regex;
3use tokio::{signal::unix::{signal, SignalKind}, sync::Mutex, task::JoinSet};
4use uuid::Uuid;
5
6use crate::common::event_handler::EventHandler;
7
/// Shared, type-erased async handler registered for a route.
///
/// It is called with the UUID string generated for one dispatch and the path of the file,
/// and returns a pinned, boxed future that completes with `()`.
type FileCallback =
    Arc<dyn Fn(String, PathBuf) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
9
/// Lifecycle event emitted by [`FileReceiver::receive`] for each dispatched file.
///
/// Both variants carry the UUID string generated for that dispatch and the file's path, so an
/// `OnFileReceived` can be paired with the `OnFileProcessed` that follows it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum FileReceiverEventSignal {
    /// Sent just before the route callback is started for the file.
    OnFileReceived(String, PathBuf),
    /// Sent after the route callback's future has completed.
    OnFileProcessed(String, PathBuf),
}
15
16pub struct FileReceiver {
17    source_path: PathBuf,
18    routes: HashMap<String, FileCallback>,
19    event_handler: EventHandler<FileReceiverEventSignal>,
20    event_join_set: JoinSet<()>,
21}
22
23impl FileReceiver {
24    pub fn new<T: AsRef<Path>>(source_path: T) -> Self {
25        FileReceiver {
26            source_path: source_path.as_ref().to_path_buf(),
27            routes: HashMap::new(),
28            event_handler: EventHandler::new(),
29            event_join_set: JoinSet::new(),
30        }
31    }
32
33    /// Callback that returns all file paths from the route using regular expression.
34    pub fn route<T, Fut, S>(mut self, filter: S, callback: T) -> Self 
35    where
36        T: Fn(String, PathBuf) -> Fut + Send + Sync + 'static,
37        Fut: Future<Output = ()> + Send + 'static,
38        S: AsRef<str>,
39    {
40        Regex::new(filter.as_ref()).expect("Invalid Regex");
41        self.routes.insert(filter.as_ref().to_string(), Arc::new(move |uuid, path| Box::pin(callback(uuid, path))));
42        self
43    }
44
45    pub fn on_event<T, Fut>(mut self, handler: T) -> Self
46    where
47        T: Fn(FileReceiverEventSignal) -> Fut + Send + Sync + 'static,
48        Fut: Future<Output = ()> + Send + 'static,
49    {
50        self.event_join_set = self.event_handler.init(handler);
51        self
52    }
53
54    pub async fn receive(mut self) {
55        let mut receiver_join_set = JoinSet::new();
56        let mut interval = tokio::time::interval(Duration::from_millis(1500));
57        let mut sigterm = signal(SignalKind::terminate()).expect("Failed to start SIGTERM signal receiver");
58        let mut sigint = signal(SignalKind::interrupt()).expect("Failed to start SIGINT signal receiver");
59        let lock_list = Arc::new(Mutex::new(HashSet::<String>::new()));
60        let route_map = Arc::new(self.routes.clone());
61        let mut size_map = HashMap::<String, u64>::new();
62        
63        loop {
64            tokio::select! {
65                _ = sigterm.recv() => break,
66                _ = sigint.recv() => break,
67                _ = async {} => {
68                    if let Ok(files) = Self::get_files_in_directory(&self.source_path).await {
69                        for file_path in &files {
70                            for (route, callback) in route_map.iter() {
71                                let file_name = file_path.file_name().unwrap().to_str().unwrap().to_string();
72                                
73                                // Check if the filter regex matches the file.
74                                let regex = Regex::new(route).unwrap();
75                                if !regex.is_match(&file_name) {
76                                    continue;
77                                }
78                                
79                                // Check if the file is being copied by comparing the current size with the previous polling size.
80                                let size = tokio::fs::metadata(file_path).await.unwrap().len();
81                                match size_map.get_mut(&file_name) {
82                                    Some(old_size) => {
83                                        if size > *old_size {
84                                            *old_size = size;
85                                            continue;
86                                        }
87                                        size_map.remove(&file_name);
88                                    }
89                                    None => {
90                                        size_map.insert(file_name, size);
91                                        continue;
92                                    }
93                                }
94    
95                                // Add file to a locked list to avoid multiple tasks processing the same file.
96                                let mut unlocked_list = lock_list.lock().await;
97                                if unlocked_list.contains(&file_name) {
98                                    continue;
99                                }
100                                unlocked_list.insert(file_name.clone());
101                                drop(unlocked_list);
102                                
103                                let callback = Arc::clone(&callback);
104                                let lock_list = Arc::clone(&lock_list);
105                                let file_path = Arc::new(file_path.to_path_buf());
106                                let uuid = Uuid::new_v4().to_string();
107                                let event_broadcast = Arc::new(self.event_handler.broadcast());
108                            
109                                event_broadcast.send(FileReceiverEventSignal::OnFileReceived(uuid.clone(), file_path.to_path_buf())).await.unwrap();
110                                receiver_join_set.spawn(async move {
111                                    callback(uuid.clone(), file_path.to_path_buf()).await;
112                                    let mut unlocked_list = lock_list.lock().await;
113                                    unlocked_list.remove(&file_name);
114                                    event_broadcast.send(FileReceiverEventSignal::OnFileProcessed(uuid.clone(), file_path.to_path_buf())).await.unwrap();
115                                });
116                            }
117                        }
118                    }
119
120                    interval.tick().await;
121                }
122            }
123        }
124
125        while let Some(_) = receiver_join_set.join_next().await {}
126        while let Some(_) = self.event_join_set.join_next().await {}
127    }
128
129    async fn get_files_in_directory(path: &Path) -> tokio::io::Result<Vec<PathBuf>> {
130        let mut files: Vec<PathBuf> = Vec::new();
131        let mut entries = tokio::fs::read_dir(&path).await?;
132
133        while let Some(file) = entries.next_entry().await? {
134            let file = file.path();
135            if file.is_file() {
136                files.push(file);
137            }
138        }
139
140        Ok(files)
141    }
142}