openlibspot_core/
cache.rs

1use std::{
2    cmp::Reverse,
3    collections::HashMap,
4    fs::{self, File},
5    io::{self, Read, Write},
6    path::{Path, PathBuf},
7    sync::Arc,
8    time::SystemTime,
9};
10
11use parking_lot::Mutex;
12use priority_queue::PriorityQueue;
13use thiserror::Error;
14
15use crate::{authentication::Credentials, error::ErrorKind, Error, FileId};
16
17#[derive(Debug, Error)]
18pub enum CacheError {
19    #[error("audio cache location is not configured")]
20    Path,
21}
22
23impl From<CacheError> for Error {
24    fn from(err: CacheError) -> Self {
25        Error::failed_precondition(err)
26    }
27}
28
29/// Some kind of data structure that holds some paths, the size of these files and a timestamp.
30/// It keeps track of the file sizes and is able to pop the path with the oldest timestamp if
31/// a given limit is exceeded.
32struct SizeLimiter {
33    queue: PriorityQueue<PathBuf, Reverse<SystemTime>>,
34    sizes: HashMap<PathBuf, u64>,
35    size_limit: u64,
36    in_use: u64,
37}
38
39impl SizeLimiter {
40    /// Creates a new instance with the given size limit.
41    fn new(limit: u64) -> Self {
42        Self {
43            queue: PriorityQueue::new(),
44            sizes: HashMap::new(),
45            size_limit: limit,
46            in_use: 0,
47        }
48    }
49
50    /// Adds an entry to this data structure.
51    ///
52    /// If this file is already contained, it will be updated accordingly.
53    fn add(&mut self, file: &Path, size: u64, accessed: SystemTime) {
54        self.in_use += size;
55        self.queue.push(file.to_owned(), Reverse(accessed));
56        if let Some(old_size) = self.sizes.insert(file.to_owned(), size) {
57            // It's important that decreasing happens after
58            // increasing the size, to prevent an overflow.
59            self.in_use -= old_size;
60        }
61    }
62
63    /// Returns true if the limit is exceeded.
64    fn exceeds_limit(&self) -> bool {
65        self.in_use > self.size_limit
66    }
67
68    /// Returns the least recently accessed file if the size of the cache exceeds
69    /// the limit.
70    ///
71    /// The entry is removed from the data structure, but the caller is responsible
72    /// to delete the file in the file system.
73    fn pop(&mut self) -> Option<PathBuf> {
74        if self.exceeds_limit() {
75            if let Some((next, _)) = self.queue.pop() {
76                if let Some(size) = self.sizes.remove(&next) {
77                    self.in_use -= size;
78                } else {
79                    error!("`queue` and `sizes` should have the same keys.");
80                }
81                Some(next)
82            } else {
83                error!("in_use was > 0, so the queue should have contained an item.");
84                None
85            }
86        } else {
87            None
88        }
89    }
90
91    /// Updates the timestamp of an existing element. Returns `true` if the item did exist.
92    fn update(&mut self, file: &Path, access_time: SystemTime) -> bool {
93        self.queue
94            .change_priority(file, Reverse(access_time))
95            .is_some()
96    }
97
98    /// Removes an element with the specified path. Returns `true` if the item did exist.
99    fn remove(&mut self, file: &Path) -> bool {
100        if self.queue.remove(file).is_none() {
101            return false;
102        }
103
104        if let Some(size) = self.sizes.remove(file) {
105            self.in_use -= size;
106        } else {
107            error!("`queue` and `sizes` should have the same keys.");
108        }
109
110        true
111    }
112}
113
114struct FsSizeLimiter {
115    limiter: Mutex<SizeLimiter>,
116}
117
118impl FsSizeLimiter {
119    /// Returns access time and file size of a given path.
120    fn get_metadata(file: &Path) -> io::Result<(SystemTime, u64)> {
121        let metadata = file.metadata()?;
122
123        // The first of the following timestamps which is available will be chosen as access time:
124        // 1. Access time
125        // 2. Modification time
126        // 3. Creation time
127        // 4. Current time
128        let access_time = metadata
129            .accessed()
130            .or_else(|_| metadata.modified())
131            .or_else(|_| metadata.created())
132            .unwrap_or_else(|_| SystemTime::now());
133
134        let size = metadata.len();
135
136        Ok((access_time, size))
137    }
138
139    /// Recursively search a directory for files and add them to the `limiter` struct.
140    fn init_dir(limiter: &mut SizeLimiter, path: &Path) {
141        let list_dir = match fs::read_dir(path) {
142            Ok(list_dir) => list_dir,
143            Err(e) => {
144                warn!("Could not read directory {:?} in cache dir: {}", path, e);
145                return;
146            }
147        };
148
149        for entry in list_dir {
150            let entry = match entry {
151                Ok(entry) => entry,
152                Err(e) => {
153                    warn!("Could not directory {:?} in cache dir: {}", path, e);
154                    return;
155                }
156            };
157
158            match entry.file_type() {
159                Ok(file_type) if file_type.is_dir() || file_type.is_symlink() => {
160                    Self::init_dir(limiter, &entry.path())
161                }
162                Ok(file_type) if file_type.is_file() => {
163                    let path = entry.path();
164                    match Self::get_metadata(&path) {
165                        Ok((access_time, size)) => {
166                            limiter.add(&path, size, access_time);
167                        }
168                        Err(e) => {
169                            warn!("Could not read file {:?} in cache dir: {}", path, e)
170                        }
171                    }
172                }
173                Ok(ft) => {
174                    warn!(
175                        "File {:?} in cache dir has unsupported type {:?}",
176                        entry.path(),
177                        ft
178                    )
179                }
180                Err(e) => {
181                    warn!(
182                        "Could not get type of file {:?} in cache dir: {}",
183                        entry.path(),
184                        e
185                    )
186                }
187            };
188        }
189    }
190
191    fn add(&self, file: &Path, size: u64) {
192        self.limiter.lock().add(file, size, SystemTime::now())
193    }
194
195    fn touch(&self, file: &Path) -> bool {
196        self.limiter.lock().update(file, SystemTime::now())
197    }
198
199    fn remove(&self, file: &Path) -> bool {
200        self.limiter.lock().remove(file)
201    }
202
203    fn prune_internal<F: FnMut() -> Option<PathBuf>>(mut pop: F) -> Result<(), Error> {
204        let mut first = true;
205        let mut count = 0;
206        let mut last_error = None;
207
208        while let Some(file) = pop() {
209            if first {
210                debug!("Cache dir exceeds limit, removing least recently used files.");
211                first = false;
212            }
213
214            let res = fs::remove_file(&file);
215            if let Err(e) = res {
216                warn!("Could not remove file {:?} from cache dir: {}", file, e);
217                last_error = Some(e);
218            } else {
219                count += 1;
220            }
221        }
222
223        if count > 0 {
224            info!("Removed {} cache files.", count);
225        }
226
227        if let Some(err) = last_error {
228            Err(err.into())
229        } else {
230            Ok(())
231        }
232    }
233
234    fn prune(&self) -> Result<(), Error> {
235        Self::prune_internal(|| self.limiter.lock().pop())
236    }
237
238    fn new(path: &Path, limit: u64) -> Result<Self, Error> {
239        let mut limiter = SizeLimiter::new(limit);
240
241        Self::init_dir(&mut limiter, path);
242        Self::prune_internal(|| limiter.pop())?;
243
244        Ok(Self {
245            limiter: Mutex::new(limiter),
246        })
247    }
248}
249
250/// A cache for volume, credentials and audio files.
251#[derive(Clone)]
252pub struct Cache {
253    credentials_location: Option<PathBuf>,
254    volume_location: Option<PathBuf>,
255    audio_location: Option<PathBuf>,
256    size_limiter: Option<Arc<FsSizeLimiter>>,
257}
258
259impl Cache {
260    pub fn new<P: AsRef<Path>>(
261        credentials_path: Option<P>,
262        volume_path: Option<P>,
263        audio_path: Option<P>,
264        size_limit: Option<u64>,
265    ) -> Result<Self, Error> {
266        let mut size_limiter = None;
267
268        if let Some(location) = &credentials_path {
269            fs::create_dir_all(location)?;
270        }
271
272        let credentials_location = credentials_path
273            .as_ref()
274            .map(|p| p.as_ref().join("credentials.json"));
275
276        if let Some(location) = &volume_path {
277            fs::create_dir_all(location)?;
278        }
279
280        let volume_location = volume_path.as_ref().map(|p| p.as_ref().join("volume"));
281
282        if let Some(location) = &audio_path {
283            fs::create_dir_all(location)?;
284
285            if let Some(limit) = size_limit {
286                let limiter = FsSizeLimiter::new(location.as_ref(), limit)?;
287                size_limiter = Some(Arc::new(limiter));
288            }
289        }
290
291        let audio_location = audio_path.map(|p| p.as_ref().to_owned());
292
293        let cache = Cache {
294            credentials_location,
295            volume_location,
296            audio_location,
297            size_limiter,
298        };
299
300        Ok(cache)
301    }
302
303    pub fn credentials(&self) -> Option<Credentials> {
304        let location = self.credentials_location.as_ref()?;
305
306        // This closure is just convencience to enable the question mark operator
307        let read = || -> Result<Credentials, Error> {
308            let mut file = File::open(location)?;
309            let mut contents = String::new();
310            file.read_to_string(&mut contents)?;
311            Ok(serde_json::from_str(&contents)?)
312        };
313
314        match read() {
315            Ok(c) => Some(c),
316            Err(e) => {
317                // If the file did not exist, the file was probably not written
318                // before. Otherwise, log the error.
319                if e.kind != ErrorKind::NotFound {
320                    warn!("Error reading credentials from cache: {}", e);
321                }
322                None
323            }
324        }
325    }
326
327    pub fn save_credentials(&self, cred: &Credentials) {
328        if let Some(location) = &self.credentials_location {
329            let result = File::create(location).and_then(|mut file| {
330                let data = serde_json::to_string(cred)?;
331                write!(file, "{data}")
332            });
333
334            if let Err(e) = result {
335                warn!("Cannot save credentials to cache: {}", e)
336            }
337        }
338    }
339
340    pub fn volume(&self) -> Option<u16> {
341        let location = self.volume_location.as_ref()?;
342
343        let read = || -> Result<u16, Error> {
344            let mut file = File::open(location)?;
345            let mut contents = String::new();
346            file.read_to_string(&mut contents)?;
347            Ok(contents.parse()?)
348        };
349
350        match read() {
351            Ok(v) => Some(v),
352            Err(e) => {
353                if e.kind != ErrorKind::NotFound {
354                    warn!("Error reading volume from cache: {}", e);
355                }
356                None
357            }
358        }
359    }
360
361    pub fn save_volume(&self, volume: u16) {
362        if let Some(ref location) = self.volume_location {
363            let result = File::create(location).and_then(|mut file| write!(file, "{volume}"));
364            if let Err(e) = result {
365                warn!("Cannot save volume to cache: {}", e);
366            }
367        }
368    }
369
370    pub fn file_path(&self, file: FileId) -> Option<PathBuf> {
371        match file.to_base16() {
372            Ok(name) => self.audio_location.as_ref().map(|location| {
373                let mut path = location.join(&name[0..2]);
374                path.push(&name[2..]);
375                path
376            }),
377            Err(e) => {
378                warn!("Invalid FileId: {}", e);
379                None
380            }
381        }
382    }
383
384    pub fn file(&self, file: FileId) -> Option<File> {
385        let path = self.file_path(file)?;
386        match File::open(&path) {
387            Ok(file) => {
388                if let Some(limiter) = self.size_limiter.as_deref() {
389                    if !limiter.touch(&path) {
390                        error!("limiter could not touch {:?}", path);
391                    }
392                }
393                Some(file)
394            }
395            Err(e) => {
396                if e.kind() != io::ErrorKind::NotFound {
397                    warn!("Error reading file from cache: {}", e)
398                }
399                None
400            }
401        }
402    }
403
404    pub fn save_file<F: Read>(&self, file: FileId, contents: &mut F) -> Result<PathBuf, Error> {
405        if let Some(path) = self.file_path(file) {
406            if let Some(parent) = path.parent() {
407                if let Ok(size) = fs::create_dir_all(parent)
408                    .and_then(|_| File::create(&path))
409                    .and_then(|mut file| io::copy(contents, &mut file))
410                {
411                    if let Some(limiter) = self.size_limiter.as_deref() {
412                        limiter.add(&path, size);
413                        limiter.prune()?;
414                    }
415                    return Ok(path);
416                }
417            }
418        }
419        Err(CacheError::Path.into())
420    }
421
422    pub fn remove_file(&self, file: FileId) -> Result<(), Error> {
423        let path = self.file_path(file).ok_or(CacheError::Path)?;
424
425        fs::remove_file(&path)?;
426        if let Some(limiter) = self.size_limiter.as_deref() {
427            limiter.remove(&path);
428        }
429
430        Ok(())
431    }
432}
433
#[cfg(test)]
mod test {
    use super::*;
    use std::time::Duration;

    /// Returns the point in time `v` seconds after the Unix epoch, which gives
    /// the test deterministic and easily comparable timestamps.
    fn ordered_time(v: u64) -> SystemTime {
        let offset = Duration::from_secs(v);
        SystemTime::UNIX_EPOCH + offset
    }

    /// Walks a `SizeLimiter` through adding, evicting, re-prioritising and
    /// removing entries. The comments list the entries from oldest to newest.
    #[test]
    fn test_size_limiter() {
        // Expected result of `pop` when the entry called `name` is evicted.
        let evicted = |name: &str| Some(PathBuf::from(name));

        let mut limiter = SizeLimiter::new(1000);

        limiter.add(Path::new("a"), 500, ordered_time(2));
        limiter.add(Path::new("b"), 500, ordered_time(1));

        // b (500), a (500): total 1000, within the limit.
        assert!(!limiter.exceeds_limit());
        assert_eq!(limiter.pop(), None);

        limiter.add(Path::new("c"), 1000, ordered_time(3));

        // b (500), a (500), c (1000): total 2000, above the limit.
        assert!(limiter.exceeds_limit());
        assert_eq!(limiter.pop(), evicted("b"));
        // a (500), c (1000): total 1500, above the limit.
        assert_eq!(limiter.pop(), evicted("a"));
        // c (1000): total 1000, within the limit.
        assert_eq!(limiter.pop(), None);

        limiter.add(Path::new("d"), 5, ordered_time(2));
        // d (5), c (1000): total 1005, above the limit.
        assert_eq!(limiter.pop(), evicted("d"));
        // c (1000): total 1000, within the limit.
        assert_eq!(limiter.pop(), None);

        // Updating the timestamp changes the eviction order.
        limiter.add(Path::new("e"), 500, ordered_time(3));
        // c (1000), e (500): total 1500, above the limit.
        assert!(limiter.update(Path::new("c"), ordered_time(4)));
        // e (500), c (1000): total 1500, above the limit.
        assert_eq!(limiter.pop(), evicted("e"));
        // c (1000): total 1000, within the limit.

        // Removing an entry releases its size.
        limiter.add(Path::new("f"), 500, ordered_time(2));
        assert!(limiter.remove(Path::new("c")));
        assert!(!limiter.exceeds_limit());
    }
}