rust_integration_services/file/
file_receiver.rs

1use std::{collections::{HashMap, HashSet}, path::{Path, PathBuf}, pin::Pin, sync::Arc, time::Duration};
2use regex::Regex;
3use tokio::{signal::unix::{signal, SignalKind}, sync::Mutex, task::JoinSet};
4use uuid::Uuid;
5
6use crate::common::event_handler::EventHandler;
7
/// Shared, type-erased callback registered for a filter.
///
/// It is called with a `String` identifier and the `PathBuf` of a file and returns a boxed
/// future that resolves to `()`. The `Send + Sync` bounds allow the callback to be cloned
/// into spawned tasks.
type FileCallback = Arc<
    dyn Fn(String, PathBuf) -> Pin<Box<dyn Future<Output = ()> + Send>>
        + Send
        + Sync,
>;
9
/// Events emitted by `FileReceiver` to the handler registered with `on_event`.
#[derive(Debug, Clone)]
pub enum FileReceiverEventSignal {
    /// A file was accepted for processing.
    ///
    /// Carries the UUID string generated for this delivery and the path of the file.
    OnFileReceived(String, PathBuf),
}
14
15pub struct FileReceiver {
16    source_path: PathBuf,
17    filter: HashMap<String, FileCallback>,
18    event_handler: EventHandler<FileReceiverEventSignal>,
19    event_join_set: JoinSet<()>,
20}
21
22impl FileReceiver {
23    pub fn new<T: AsRef<Path>>(source_path: T) -> Self {
24        FileReceiver {
25            source_path: source_path.as_ref().to_path_buf(),
26            filter: HashMap::new(),
27            event_handler: EventHandler::new(),
28            event_join_set: JoinSet::new(),
29        }
30    }
31
32    /// Callback that returns all file paths from the filter using regular expression.
33    pub fn filter<T, Fut, S>(mut self, filter: S, callback: T) -> Self 
34    where
35        T: Fn(String, PathBuf) -> Fut + Send + Sync + 'static,
36        Fut: Future<Output = ()> + Send + 'static,
37        S: AsRef<str>,
38    {
39        Regex::new(filter.as_ref()).expect("Invalid Regex!");
40        self.filter.insert(filter.as_ref().to_string(), Arc::new(move |uuid, path| Box::pin(callback(uuid, path))));
41        self
42    }
43
44    pub fn on_event<T, Fut>(mut self, handler: T) -> Self
45    where
46        T: Fn(FileReceiverEventSignal) -> Fut + Send + Sync + 'static,
47        Fut: Future<Output = ()> + Send + 'static,
48    {
49        self.event_join_set = self.event_handler.init(handler);
50        self
51    }
52
53    pub async fn receive(mut self) {
54        let mut receiver_join_set = JoinSet::new();
55        let mut interval = tokio::time::interval(Duration::from_millis(1500));
56        let mut sigterm = signal(SignalKind::terminate()).expect("Failed to start SIGTERM signal receiver");
57        let mut sigint = signal(SignalKind::interrupt()).expect("Failed to start SIGINT signal receiver");
58        let lock_list = Arc::new(Mutex::new(HashSet::<String>::new()));
59        let filter_map = Arc::new(self.filter.clone());
60        let mut size_map = HashMap::<String, u64>::new();
61        
62        loop {
63            tokio::select! {
64                _ = sigterm.recv() => break,
65                _ = sigint.recv() => break,
66                _ = async {} => {
67                    if let Ok(files) = Self::get_files_in_directory(&self.source_path).await {
68                        for file_path in &files {
69                            for (filter, callback) in filter_map.iter() {
70                                let file_name = file_path.file_name().unwrap().to_str().unwrap().to_string();
71                                
72                                // Check if the filter regex matches the file.
73                                let regex = Regex::new(filter).unwrap();
74                                if !regex.is_match(&file_name) {
75                                    continue;
76                                }
77                                
78                                // Check if the file is being copied by comparing the current size with the previous polling size.
79                                let size = tokio::fs::metadata(file_path).await.unwrap().len();
80                                match size_map.get_mut(&file_name) {
81                                    Some(old_size) => {
82                                        if size > *old_size {
83                                            *old_size = size;
84                                            continue;
85                                        }
86                                        size_map.remove(&file_name);
87                                    }
88                                    None => {
89                                        size_map.insert(file_name, size);
90                                        continue;
91                                    }
92                                }
93    
94                                // Add file to a locked list to avoid multiple tasks processing the same file.
95                                let mut unlocked_list = lock_list.lock().await;
96                                if unlocked_list.contains(&file_name) {
97                                    continue;
98                                }
99                                unlocked_list.insert(file_name.clone());
100                                drop(unlocked_list);
101                                
102                                let callback = Arc::clone(&callback);
103                                let lock_list = Arc::clone(&lock_list);
104                                let file_path = Arc::new(file_path.to_path_buf());
105                                let uuid = Uuid::new_v4().to_string();
106                                let event_broadcast = Arc::new(self.event_handler.broadcast());
107                            
108                                event_broadcast.send(FileReceiverEventSignal::OnFileReceived(uuid.clone(), file_path.to_path_buf())).await.unwrap();
109                                receiver_join_set.spawn(async move {
110                                    callback(uuid, file_path.to_path_buf()).await;
111                                    let mut unlocked_list = lock_list.lock().await;
112                                    unlocked_list.remove(&file_name);
113                                });
114                            }
115                        }
116                    }
117
118                    interval.tick().await;
119                }
120            }
121        }
122
123        while let Some(_) = receiver_join_set.join_next().await {}
124        while let Some(_) = self.event_join_set.join_next().await {}
125    }
126
127    async fn get_files_in_directory(path: &Path) -> tokio::io::Result<Vec<PathBuf>> {
128        let mut files: Vec<PathBuf> = Vec::new();
129        let mut entries = tokio::fs::read_dir(&path).await?;
130
131        while let Some(file) = entries.next_entry().await? {
132            let file = file.path();
133            if file.is_file() {
134                files.push(file);
135            }
136        }
137
138        Ok(files)
139    }
140}