samply_quota_manager/
quota_manager.rs

1use std::collections::VecDeque;
2use std::fs;
3use std::path::Path;
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, SystemTime};
6
7use bytesize::ByteSize;
8use tokio::sync::Notify;
9use tokio::task::JoinHandle;
10
11use super::file_inventory::{FileInfo, FileInventory};
12
13/// Evicts least-recently-used files in a managed directory when asked to do so.
14///
15/// `QuotaManager` does not observe file system mutations! You have to tell it
16/// about any files you create or delete in this directory. It stores this
17/// information in a sqlite database file at a path of your choosing.
18pub struct QuotaManager {
19    settings: Arc<Mutex<EvictionSettings>>,
20    inventory: Arc<Mutex<FileInventory>>,
21    /// Sent from QuotaManager::finish
22    stop_signal_sender: tokio::sync::oneshot::Sender<()>,
23    /// Sent whenever a file is added. A `Notify` has the semantics
24    /// we want if the sender side notifies more frequently than the
25    /// receiver side can check; multiple notifications are coalesced
26    /// into one.
27    eviction_signal_sender: Arc<tokio::sync::Notify>,
28    /// The join handle for the tokio task that receives the signals
29    /// and deletes the files. Stored here so that finish() can block on it.
30    join_handle: JoinHandle<()>,
31}
32
33/// Used to initiate a [`QuotaManager`] eviction, and to tell it about
34/// the creation and access of files. `Send` and `Sync`.
35pub struct QuotaManagerNotifier {
36    inventory: Arc<Mutex<FileInventory>>,
37    eviction_signal_sender: Arc<tokio::sync::Notify>,
38}
39
40impl Clone for QuotaManagerNotifier {
41    fn clone(&self) -> Self {
42        Self {
43            inventory: self.inventory.clone(),
44            eviction_signal_sender: self.eviction_signal_sender.clone(),
45        }
46    }
47}
48
/// Limits that are applied during an eviction. A limit that is `None` is not
/// enforced; by default both limits are `None`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct EvictionSettings {
    /// Desired maximum total size of the managed directory, in bytes.
    max_size_bytes: Option<u64>,
    /// Desired maximum time since a file was last accessed, in seconds.
    max_age_seconds: Option<u64>,
}
54
55impl QuotaManager {
56    /// Create an instance for the managed directory given by `root_path`.
57    ///
58    /// Uses the database at `db_path` to store information about the files
59    /// under this directory, and creates it if it doesn't exist yet,
60    /// prepopulating it with information about the current files in the
61    /// managed directory.
62    ///
63    /// Both root_path and the parent directory of db_path must already exist.
64    pub fn new(root_path: &Path, db_path: &Path) -> Result<Self, String> {
65        let root_path = root_path.to_path_buf();
66        let root_path_clone = root_path.clone();
67        let inventory = FileInventory::new(&root_path, db_path, move || {
68            Self::list_existing_files_sync(&root_path_clone)
69        })
70        .map_err(|e| format!("{e}"))?;
71        let inventory = Arc::new(Mutex::new(inventory));
72        let settings = Arc::new(Mutex::new(EvictionSettings::default()));
73
74        let (stop_signal_sender, stop_signal_receiver) = tokio::sync::oneshot::channel();
75        let eviction_signal_sender = Arc::new(Notify::new());
76
77        let eviction_thread_runner = QuotaManagerEvictionThread {
78            stop_signal_receiver,
79            eviction_signal_receiver: eviction_signal_sender.clone(),
80            inventory: Arc::clone(&inventory),
81            settings: Arc::clone(&settings),
82        };
83
84        let join_handle = tokio::spawn(eviction_thread_runner.run());
85
86        Ok(Self {
87            settings,
88            inventory,
89            stop_signal_sender,
90            eviction_signal_sender,
91            join_handle,
92        })
93    }
94
95    /// Returns a new `QuotaManagerNotifier`. This is how you tell this
96    /// `QuotaManager` about file creations / accesses
97    pub fn notifier(&self) -> QuotaManagerNotifier {
98        QuotaManagerNotifier {
99            inventory: Arc::clone(&self.inventory),
100            eviction_signal_sender: Arc::clone(&self.eviction_signal_sender),
101        }
102    }
103
104    /// Stops the background task that does the evictions, and waits for
105    /// any currently-running eviction to finish.
106    pub async fn finish(self) {
107        let _ = self.stop_signal_sender.send(());
108        self.join_handle.await.unwrap()
109    }
110
111    /// Change the desired maximum total size of the managed directory, in bytes.
112    ///
113    /// Respected during the next eviction.
114    pub fn set_max_total_size(&self, max_size_bytes: Option<u64>) {
115        self.settings.lock().unwrap().max_size_bytes = max_size_bytes;
116    }
117
118    /// Change the desired maximum age of tracked files in the managed directory,
119    /// in seconds.
120    ///
121    /// Respected during the next eviction.
122    pub fn set_max_age(&self, max_age_seconds: Option<u64>) {
123        self.settings.lock().unwrap().max_age_seconds = max_age_seconds;
124    }
125
126    fn list_existing_files_sync(dir: &Path) -> Vec<FileInfo> {
127        let mut files = Vec::new();
128        let mut dirs_to_visit = VecDeque::new();
129        dirs_to_visit.push_back(dir.to_path_buf());
130
131        while let Some(current_dir) = dirs_to_visit.pop_front() {
132            let entries = match fs::read_dir(&current_dir) {
133                Ok(entries) => entries,
134                Err(e) => {
135                    log::error!("Failed to read directory {:?}: {}", current_dir, e);
136                    continue;
137                }
138            };
139            for entry in entries {
140                let path = match entry {
141                    Ok(entry) => entry.path(),
142                    Err(e) => {
143                        log::error!("Failed to read directory entry in {:?}: {}", current_dir, e);
144                        continue;
145                    }
146                };
147
148                if path.is_dir() {
149                    dirs_to_visit.push_back(path);
150                    continue;
151                }
152                if !path.is_file() {
153                    continue;
154                }
155
156                let metadata = match fs::metadata(&path) {
157                    Ok(metadata) => metadata,
158                    Err(e) => {
159                        log::error!("Failed to query file size for {:?}: {}", path, e);
160                        continue;
161                    }
162                };
163                files.push(FileInfo {
164                    path,
165                    size_in_bytes: metadata.len(),
166                    creation_time: metadata.created().ok().unwrap_or_else(SystemTime::now),
167                    last_access_time: metadata.accessed().ok().unwrap_or_else(SystemTime::now),
168                });
169            }
170        }
171        log::info!("Found {} existing files in {:?}", files.len(), dir);
172        files
173    }
174}
175
176struct QuotaManagerEvictionThread {
177    stop_signal_receiver: tokio::sync::oneshot::Receiver<()>,
178    eviction_signal_receiver: Arc<tokio::sync::Notify>,
179    settings: Arc<Mutex<EvictionSettings>>,
180    inventory: Arc<Mutex<FileInventory>>,
181}
182
183impl QuotaManagerEvictionThread {
184    pub async fn run(mut self) {
185        loop {
186            tokio::select! {
187                _ = &mut self.stop_signal_receiver => {
188                    return;
189                }
190                _ = self.eviction_signal_receiver.notified() => {
191                    self.perform_eviction_if_needed().await;
192                }
193            }
194        }
195    }
196
197    async fn perform_eviction_if_needed(&self) {
198        let settings = *self.settings.lock().unwrap();
199        let total_size_before = self.inventory.lock().unwrap().total_size_in_bytes();
200        log::info!("Current total size: {}", ByteSize(total_size_before));
201
202        let files_to_delete_for_enforcing_max_size = match settings.max_size_bytes {
203            Some(max_size_bytes) => {
204                let inventory = self.inventory.lock().unwrap();
205                inventory.get_files_to_delete_to_enforce_max_size(max_size_bytes)
206            }
207            None => vec![],
208        };
209
210        if !files_to_delete_for_enforcing_max_size.is_empty() {
211            self.delete_files(files_to_delete_for_enforcing_max_size)
212                .await;
213            let total_size = self.inventory.lock().unwrap().total_size_in_bytes();
214            log::info!("Current total size: {}", ByteSize(total_size));
215        }
216
217        let files_to_delete_for_enforcing_max_age = match settings.max_age_seconds {
218            Some(max_age_seconds) => {
219                let cutoff_time = SystemTime::now() - Duration::from_secs(max_age_seconds);
220                let inventory = self.inventory.lock().unwrap();
221                inventory.get_files_last_accessed_before(cutoff_time)
222            }
223            None => vec![],
224        };
225
226        if !files_to_delete_for_enforcing_max_age.is_empty() {
227            self.delete_files(files_to_delete_for_enforcing_max_age)
228                .await;
229            let total_size = self.inventory.lock().unwrap().total_size_in_bytes();
230            log::info!("Current total size: {}", ByteSize(total_size));
231        }
232    }
233
234    async fn delete_files(&self, files: Vec<FileInfo>) {
235        for file_info in files {
236            log::info!(
237                "Deleting file {:?} ({})",
238                file_info.path,
239                ByteSize(file_info.size_in_bytes)
240            );
241            match tokio::fs::remove_file(&file_info.path).await {
242                Ok(()) => {
243                    let mut inventory = self.inventory.lock().unwrap();
244                    inventory.on_file_deleted(&file_info.path);
245                }
246                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
247                    let mut inventory = self.inventory.lock().unwrap();
248                    inventory.on_file_found_to_be_absent(&file_info.path);
249                }
250                Err(e) => {
251                    log::error!("Error when deleting {:?}: {}", file_info.path, e);
252                }
253            }
254            // TODO: delete containing directory if empty
255        }
256    }
257}
258
259impl QuotaManagerNotifier {
260    /// Trigger an eviction. The eviction runs asynchronously in a single
261    /// shared eviction task and uses the current eviction settings.
262    pub fn trigger_eviction_if_needed(&self) {
263        self.eviction_signal_sender.notify_one();
264    }
265
266    /// You must call this whenever a new file gets added to the managed directory.
267    ///
268    /// Calls for files outside the managed directory are ignored.
269    pub fn on_file_created(&self, path: &Path, size_in_bytes: u64, creation_time: SystemTime) {
270        let mut inventory = self.inventory.lock().unwrap();
271        inventory.on_file_created(path, size_in_bytes, creation_time);
272    }
273
274    /// You must call this whenever a file in the managed directory is accessed.
275    ///
276    /// Calls for files outside the managed directory are ignored.
277    pub fn on_file_accessed(&self, path: &Path, access_time: SystemTime) {
278        let mut inventory = self.inventory.lock().unwrap();
279        inventory.on_file_accessed(path, access_time);
280    }
281
282    /// You usually don't need to call this because we expect you to leave any
283    /// deleting to the [`QuotaManager`]. But if you do delete any files in the
284    /// managed directory yourself, call this method so that the [`QuotaManager`]
285    /// can update its information.
286    pub fn on_file_deleted(&self, path: &Path) {
287        let mut inventory = self.inventory.lock().unwrap();
288        inventory.on_file_deleted(path);
289    }
290}