samply_quota_manager/
quota_manager.rs1use std::collections::VecDeque;
2use std::fs;
3use std::path::Path;
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, SystemTime};
6
7use bytesize::ByteSize;
8use tokio::sync::Notify;
9use tokio::task::JoinHandle;
10
11use super::file_inventory::{FileInfo, FileInventory};
12
13pub struct QuotaManager {
19 settings: Arc<Mutex<EvictionSettings>>,
20 inventory: Arc<Mutex<FileInventory>>,
21 stop_signal_sender: tokio::sync::oneshot::Sender<()>,
23 eviction_signal_sender: Arc<tokio::sync::Notify>,
28 join_handle: JoinHandle<()>,
31}
32
33pub struct QuotaManagerNotifier {
36 inventory: Arc<Mutex<FileInventory>>,
37 eviction_signal_sender: Arc<tokio::sync::Notify>,
38}
39
40impl Clone for QuotaManagerNotifier {
41 fn clone(&self) -> Self {
42 Self {
43 inventory: self.inventory.clone(),
44 eviction_signal_sender: self.eviction_signal_sender.clone(),
45 }
46 }
47}
48
49#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
50struct EvictionSettings {
51 max_size_bytes: Option<u64>,
52 max_age_seconds: Option<u64>,
53}
54
55impl QuotaManager {
56 pub fn new(root_path: &Path, db_path: &Path) -> Result<Self, String> {
65 let root_path = root_path.to_path_buf();
66 let root_path_clone = root_path.clone();
67 let inventory = FileInventory::new(&root_path, db_path, move || {
68 Self::list_existing_files_sync(&root_path_clone)
69 })
70 .map_err(|e| format!("{e}"))?;
71 let inventory = Arc::new(Mutex::new(inventory));
72 let settings = Arc::new(Mutex::new(EvictionSettings::default()));
73
74 let (stop_signal_sender, stop_signal_receiver) = tokio::sync::oneshot::channel();
75 let eviction_signal_sender = Arc::new(Notify::new());
76
77 let eviction_thread_runner = QuotaManagerEvictionThread {
78 stop_signal_receiver,
79 eviction_signal_receiver: eviction_signal_sender.clone(),
80 inventory: Arc::clone(&inventory),
81 settings: Arc::clone(&settings),
82 };
83
84 let join_handle = tokio::spawn(eviction_thread_runner.run());
85
86 Ok(Self {
87 settings,
88 inventory,
89 stop_signal_sender,
90 eviction_signal_sender,
91 join_handle,
92 })
93 }
94
95 pub fn notifier(&self) -> QuotaManagerNotifier {
98 QuotaManagerNotifier {
99 inventory: Arc::clone(&self.inventory),
100 eviction_signal_sender: Arc::clone(&self.eviction_signal_sender),
101 }
102 }
103
104 pub async fn finish(self) {
107 let _ = self.stop_signal_sender.send(());
108 self.join_handle.await.unwrap()
109 }
110
111 pub fn set_max_total_size(&self, max_size_bytes: Option<u64>) {
115 self.settings.lock().unwrap().max_size_bytes = max_size_bytes;
116 }
117
118 pub fn set_max_age(&self, max_age_seconds: Option<u64>) {
123 self.settings.lock().unwrap().max_age_seconds = max_age_seconds;
124 }
125
126 fn list_existing_files_sync(dir: &Path) -> Vec<FileInfo> {
127 let mut files = Vec::new();
128 let mut dirs_to_visit = VecDeque::new();
129 dirs_to_visit.push_back(dir.to_path_buf());
130
131 while let Some(current_dir) = dirs_to_visit.pop_front() {
132 let entries = match fs::read_dir(¤t_dir) {
133 Ok(entries) => entries,
134 Err(e) => {
135 log::error!("Failed to read directory {:?}: {}", current_dir, e);
136 continue;
137 }
138 };
139 for entry in entries {
140 let path = match entry {
141 Ok(entry) => entry.path(),
142 Err(e) => {
143 log::error!("Failed to read directory entry in {:?}: {}", current_dir, e);
144 continue;
145 }
146 };
147
148 if path.is_dir() {
149 dirs_to_visit.push_back(path);
150 continue;
151 }
152 if !path.is_file() {
153 continue;
154 }
155
156 let metadata = match fs::metadata(&path) {
157 Ok(metadata) => metadata,
158 Err(e) => {
159 log::error!("Failed to query file size for {:?}: {}", path, e);
160 continue;
161 }
162 };
163 files.push(FileInfo {
164 path,
165 size_in_bytes: metadata.len(),
166 creation_time: metadata.created().ok().unwrap_or_else(SystemTime::now),
167 last_access_time: metadata.accessed().ok().unwrap_or_else(SystemTime::now),
168 });
169 }
170 }
171 log::info!("Found {} existing files in {:?}", files.len(), dir);
172 files
173 }
174}
175
176struct QuotaManagerEvictionThread {
177 stop_signal_receiver: tokio::sync::oneshot::Receiver<()>,
178 eviction_signal_receiver: Arc<tokio::sync::Notify>,
179 settings: Arc<Mutex<EvictionSettings>>,
180 inventory: Arc<Mutex<FileInventory>>,
181}
182
183impl QuotaManagerEvictionThread {
184 pub async fn run(mut self) {
185 loop {
186 tokio::select! {
187 _ = &mut self.stop_signal_receiver => {
188 return;
189 }
190 _ = self.eviction_signal_receiver.notified() => {
191 self.perform_eviction_if_needed().await;
192 }
193 }
194 }
195 }
196
197 async fn perform_eviction_if_needed(&self) {
198 let settings = *self.settings.lock().unwrap();
199 let total_size_before = self.inventory.lock().unwrap().total_size_in_bytes();
200 log::info!("Current total size: {}", ByteSize(total_size_before));
201
202 let files_to_delete_for_enforcing_max_size = match settings.max_size_bytes {
203 Some(max_size_bytes) => {
204 let inventory = self.inventory.lock().unwrap();
205 inventory.get_files_to_delete_to_enforce_max_size(max_size_bytes)
206 }
207 None => vec![],
208 };
209
210 if !files_to_delete_for_enforcing_max_size.is_empty() {
211 self.delete_files(files_to_delete_for_enforcing_max_size)
212 .await;
213 let total_size = self.inventory.lock().unwrap().total_size_in_bytes();
214 log::info!("Current total size: {}", ByteSize(total_size));
215 }
216
217 let files_to_delete_for_enforcing_max_age = match settings.max_age_seconds {
218 Some(max_age_seconds) => {
219 let cutoff_time = SystemTime::now() - Duration::from_secs(max_age_seconds);
220 let inventory = self.inventory.lock().unwrap();
221 inventory.get_files_last_accessed_before(cutoff_time)
222 }
223 None => vec![],
224 };
225
226 if !files_to_delete_for_enforcing_max_age.is_empty() {
227 self.delete_files(files_to_delete_for_enforcing_max_age)
228 .await;
229 let total_size = self.inventory.lock().unwrap().total_size_in_bytes();
230 log::info!("Current total size: {}", ByteSize(total_size));
231 }
232 }
233
234 async fn delete_files(&self, files: Vec<FileInfo>) {
235 for file_info in files {
236 log::info!(
237 "Deleting file {:?} ({})",
238 file_info.path,
239 ByteSize(file_info.size_in_bytes)
240 );
241 match tokio::fs::remove_file(&file_info.path).await {
242 Ok(()) => {
243 let mut inventory = self.inventory.lock().unwrap();
244 inventory.on_file_deleted(&file_info.path);
245 }
246 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
247 let mut inventory = self.inventory.lock().unwrap();
248 inventory.on_file_found_to_be_absent(&file_info.path);
249 }
250 Err(e) => {
251 log::error!("Error when deleting {:?}: {}", file_info.path, e);
252 }
253 }
254 }
256 }
257}
258
259impl QuotaManagerNotifier {
260 pub fn trigger_eviction_if_needed(&self) {
263 self.eviction_signal_sender.notify_one();
264 }
265
266 pub fn on_file_created(&self, path: &Path, size_in_bytes: u64, creation_time: SystemTime) {
270 let mut inventory = self.inventory.lock().unwrap();
271 inventory.on_file_created(path, size_in_bytes, creation_time);
272 }
273
274 pub fn on_file_accessed(&self, path: &Path, access_time: SystemTime) {
278 let mut inventory = self.inventory.lock().unwrap();
279 inventory.on_file_accessed(path, access_time);
280 }
281
282 pub fn on_file_deleted(&self, path: &Path) {
287 let mut inventory = self.inventory.lock().unwrap();
288 inventory.on_file_deleted(path);
289 }
290}