// rust_integration_services/file/file_receiver.rs

use std::{
    collections::{HashMap, HashSet},
    panic::AssertUnwindSafe,
    path::{Path, PathBuf},
    pin::Pin,
    sync::Arc,
    time::Duration,
};

use futures::FutureExt;
use regex::Regex;
use tokio::{signal::unix::{signal, SignalKind}, sync::Mutex, task::JoinSet};
use uuid::Uuid;

/// Boxed, type-erased future produced by a route callback.
type CallbackFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Type-erased route callback: called with a UUID string and the file path, and returns
/// the future that processes the file. Held in an `Arc` so spawned tasks can share it.
type FileCallback = Arc<dyn Fn(String, PathBuf) -> CallbackFuture + Send + Sync>;

9pub struct FileReceiver {
10    source_path: PathBuf,
11    routes: Vec<(Regex, FileCallback)>,
12}
13
14impl FileReceiver {
15    pub fn new<T: AsRef<Path>>(source_path: T) -> Self {
16        FileReceiver {
17            source_path: source_path.as_ref().to_path_buf(),
18            routes: Vec::new(),
19        }
20    }
21
22    /// Callback that returns all file paths from the route using regular expression.
23    pub fn route<T, Fut, S>(mut self, filter: S, callback: T) -> Self 
24    where
25        T: Fn(String, PathBuf) -> Fut + Send + Sync + 'static,
26        Fut: Future<Output = ()> + Send + 'static,
27        S: AsRef<str>,
28    {
29        if self.routes.iter().any(|(r, _)| r.as_str() == filter.as_ref()) {
30            panic!("Route already exists with filter {}", filter.as_ref())
31        }
32        let regex = Regex::new(filter.as_ref()).expect("Invalid Regex");
33        self.routes.push((regex, Arc::new(move |uuid, path| Box::pin(callback(uuid, path)))));
34        self
35    }
36
37    pub async fn receive(self) {
38        let mut receiver_join_set = JoinSet::new();
39        let mut interval = tokio::time::interval(Duration::from_millis(1500));
40        let mut sigterm = signal(SignalKind::terminate()).expect("Failed to start SIGTERM signal receiver");
41        let mut sigint = signal(SignalKind::interrupt()).expect("Failed to start SIGINT signal receiver");
42        let lock_list = Arc::new(Mutex::new(HashSet::<String>::new()));
43        let route_map = Arc::new(self.routes.clone());
44        let mut size_map = HashMap::<String, u64>::new();
45        
46        log::trace!("started on {:?}", self.source_path);
47        loop {
48            tokio::select! {
49                _ = sigterm.recv() => break,
50                _ = sigint.recv() => break,
51                _ = async {} => {
52                    if let Ok(files) = Self::get_files_in_directory(&self.source_path).await {
53                        for file_path in &files {
54                            for (route, callback) in route_map.iter() {
55                                let file_name = file_path.file_name().unwrap().to_str().unwrap().to_string();
56                                
57                                // Check if the filter regex matches the file.
58                                if !route.is_match(&file_name) {
59                                    continue;
60                                }
61                                
62                                // Check if the file is being copied by comparing the current size with the previous polling size.
63                                let size = tokio::fs::metadata(file_path).await.unwrap().len();
64                                match size_map.get_mut(&file_name) {
65                                    Some(old_size) => {
66                                        if size > *old_size {
67                                            *old_size = size;
68                                            continue;
69                                        }
70                                        size_map.remove(&file_name);
71                                    }
72                                    None => {
73                                        size_map.insert(file_name, size);
74                                        continue;
75                                    }
76                                }
77    
78                                // Add file to a locked list to avoid multiple tasks processing the same file.
79                                let mut unlocked_list = lock_list.lock().await;
80                                if unlocked_list.contains(&file_name) {
81                                    continue;
82                                }
83                                unlocked_list.insert(file_name.clone());
84                                drop(unlocked_list);
85                                
86                                let callback = Arc::clone(&callback);
87                                let lock_list = Arc::clone(&lock_list);
88                                let file_path = Arc::new(file_path.to_path_buf());
89                                let uuid = Uuid::new_v4().to_string();
90                            
91                                log::trace!("[{}] file received at {:?}", uuid, file_path);
92                                receiver_join_set.spawn(async move {
93                                    let callback_fut = callback(uuid.to_string(), file_path.to_path_buf());
94                                    let result = AssertUnwindSafe(callback_fut).catch_unwind().await;
95                                    match result {
96                                        Ok(_) => log::trace!("[{}] file processed", uuid),
97                                        Err(err) => log::error!("[{}] {:?}", uuid, err),
98                                    };
99
100                                    let mut unlocked_list = lock_list.lock().await;
101                                    unlocked_list.remove(&file_name);
102                                });
103                            }
104                        }
105                    }
106
107                    interval.tick().await;
108                }
109            }
110        }
111
112        log::trace!("shut down pending...");
113        while let Some(_) = receiver_join_set.join_next().await {}
114        log::trace!("shut down complete");
115    }
116
117    async fn get_files_in_directory(path: &Path) -> tokio::io::Result<Vec<PathBuf>> {
118        let mut files: Vec<PathBuf> = Vec::new();
119        let mut entries = tokio::fs::read_dir(&path).await?;
120
121        while let Some(file) = entries.next_entry().await? {
122            let file = file.path();
123            if file.is_file() {
124                files.push(file);
125            }
126        }
127
128        Ok(files)
129    }
130}