remember_this/
manager.rs

1use std::collections::HashMap;
2use std::hash::Hash;
3use std::sync::{Arc, RwLock};
4
5use chrono::Duration;
6
7use crate::cache::{self, CacheEntry};
8pub use crate::Cache;
9
/// Name of the tree that stores the database-wide format metadata.
const TREE_META: &[u8] = b":meta:";
/// Key under which a format name is stored.
const KEY_FORMAT: &[u8] = b"format";
/// Key under which a format version is stored, both in the `:meta:` tree
/// and in the metadata tree of each individual cache.
const KEY_FORMAT_VERSION: &[u8] = b"version";
/// Format name expected in the `:meta:` tree.
const VALUE_FORMAT: &[u8] = b"disk-cache";
/// Format version expected in the `:meta:` tree.
const VALUE_FORMAT_VERSION: &[u8] = &[0, 1, 0];
15
16/// An object managing several caches.
17pub struct CacheManager {
18    /// The database to which cache data is written
19    db: sled::Db,
20}
21impl CacheManager {
22    /// Create a new cache manager.
23    pub fn new(options: &CacheManagerOptions) -> Result<Self, sled::Error> {
24        // Attempt to open the cache database.
25        let config = sled::Config::default()
26            .path(&options.path)
27            .mode(sled::Mode::HighThroughput)
28            .flush_every_ms(Some(10_000))
29            .use_compression(options.use_compression)
30            .temporary(options.use_temporary);
31        let db = match config.open() {
32            Ok(db) => db,
33            Err(sled::Error::Corruption { .. }) => {
34                warn!(target: "disk-cache", "Cache file corrupted, recreating");
35                // Erase database and reopen.
36                let _ = std::fs::remove_dir_all(&options.path);
37                config.create_new(true).open()?
38            }
39            other => other?,
40        };
41
42        // If metadata is absent or incorrect, drop cache and recreate.
43        let meta_tree = db.open_tree(TREE_META)?;
44        let is_correct_format = meta_tree
45            .get(KEY_FORMAT)?
46            .map(|format| format == VALUE_FORMAT)
47            .unwrap_or(false);
48        let is_correct_version = meta_tree
49            .get(KEY_FORMAT_VERSION)?
50            .map(|version| version == VALUE_FORMAT_VERSION)
51            .unwrap_or(false);
52
53        debug!(target: "disk-cache", "is_correct_format: {}", is_correct_format);
54        debug!(target: "disk-cache", "is_correct_version: {}", is_correct_version);
55
56        if !is_correct_format || !is_correct_version {
57            for tree in db.tree_names() {
58                debug!(target: "disk-cache", "dropping tree: {:?}", tree);
59                db.drop_tree(tree).or_else(|e| match e {
60                    sled::Error::Unsupported(_) =>
61                    /* Attempting to remove a core structure, skip */
62                    {
63                        Ok(false)
64                    }
65                    other => Err(other),
66                })?;
67            }
68        }
69        let meta_tree = db.open_tree(":meta:")?;
70        meta_tree.insert(KEY_FORMAT, VALUE_FORMAT)?;
71        meta_tree.insert(KEY_FORMAT_VERSION, VALUE_FORMAT_VERSION)?;
72
73        Ok(CacheManager { db })
74    }
75
76    /// Return the internal name of the tree representing this cache.
77    fn get_cache_name(name: &str) -> String {
78        format!("cache:{}", name)
79    }
80
81    /// Return the internal name of the tree representing metadata for this cache.
82    fn get_meta_name(name: &str) -> String {
83        format!("meta:{}", name)
84    }
85
86    fn get_expiry_name(name: &str) -> String {
87        format!("expiry:{}", name)
88    }
89
90    /// Remove a cache.
91    pub fn purge(&self, name: &str) -> sled::Result<bool> {
92        let cache = self.db.drop_tree(Self::get_cache_name(name))?;
93        let meta = self.db.drop_tree(Self::get_meta_name(name))?;
94        Ok(cache || meta)
95    }
96
97    /// Instantiate a new cache for a specific type.
98    pub fn cache<K, V>(&self, name: &str, options: &CacheOptions) -> sled::Result<Cache<K, V>>
99    where
100        K: Send
101            + Clone
102            + Hash
103            + Eq
104            + for<'de> serde::Deserialize<'de>
105            + serde::Serialize
106            + Sync
107            + 'static,
108        V: Send + Clone + for<'de> serde::Deserialize<'de> + serde::Serialize + Sync + 'static,
109    {
110        let content_key = Self::get_cache_name(name);
111        let meta_key = Self::get_meta_name(name);
112        let expiry_key = Self::get_expiry_name(name);
113
114        // Check whether we need to purge the cache.
115        let version = [
116            (options.version & 0xFF) as u8,
117            ((options.version >> 8) & 0xFF) as u8,
118            ((options.version >> 16) & 0xFF) as u8,
119            ((options.version >> 24) & 0xFF) as u8,
120        ];
121        let format_changed = self.db.open_tree(&meta_key)?
122            .get(KEY_FORMAT_VERSION)?
123            .map(|k| {
124                debug!(target: "disk-cache", "Cache version: {:?}, expected {:?}", k.as_ref(), version);
125                k.as_ref() != version
126            })
127            .unwrap_or(true);
128
129        if format_changed || options.purge {
130            debug!(target: "disk-cache", "We need to cleanup this cache - format_changed:{} options.purge:{}", format_changed, options.purge);
131            self.db.drop_tree(&content_key)?;
132            self.db.drop_tree(&expiry_key)?;
133        }
134        self.db
135            .open_tree(meta_key)?
136            .insert(KEY_FORMAT_VERSION, &version)?;
137
138        // Now actually open data.
139        let in_memory: Arc<RwLock<HashMap<K, CacheEntry<V>>>> =
140            Arc::new(RwLock::new(HashMap::new()));
141        let content = self.db.open_tree(content_key)?;
142        let expiry = self.db.open_tree(expiry_key)?;
143
144        // Setup interval cleanup.
145        {
146            let start = tokio::time::Instant::now()
147                + tokio::time::Duration::from_secs(
148                    options.initial_disk_cleanup_after.num_seconds() as u64,
149                );
150            let duration =
151                tokio::time::Duration::from_secs(options.memory_duration.num_seconds() as u64);
152            let in_memory = in_memory.clone();
153            let expiry = expiry.clone();
154            let content = content.clone();
155
156            tokio::spawn(async move {
157                let mut interval = tokio::time::interval_at(start, duration);
158                loop {
159                    let _ = interval.tick().await;
160                    cache::cleanup_disk_cache::<K, V>(&expiry, &content);
161
162                    if Arc::strong_count(&in_memory) == 1 {
163                        // We're the last owner, time to stop.
164                        return;
165                    }
166
167                    // Cleanup in-memory
168                    cache::cleanup_memory_cache(&in_memory);
169                }
170            });
171        }
172
173        Ok(cache::Cache {
174            in_memory,
175            content,
176            expiry,
177            memory_duration: options.memory_duration,
178            disk_duration: options.disk_duration,
179        })
180    }
181}
182
183/// Options for the CacheManager.
184#[derive(TypedBuilder)]
185pub struct CacheManagerOptions {
186    /// The path where the cache should be stored.
187    #[builder(setter(into))]
188    path: std::path::PathBuf,
189
190    /// If `true`, use compression.
191    ///
192    /// By default, false.
193    #[builder(default = false)]
194    use_compression: bool,
195
196    /// If `true`, drop database once the `CacheManager` is dropped.
197    ///
198    /// Useful mostly for testing.
199    ///
200    /// By default, false.
201    #[builder(default = false)]
202    use_temporary: bool,
203}
204
205#[derive(TypedBuilder)]
206pub struct CacheOptions {
207    /// How long data should stay in memory.
208    ///
209    /// Note that this duration is approximative. The caches will run cleanup
210    /// tasks once in a while to remove data from the cache.
211    ///
212    /// If unspecified, 1h.
213    #[builder(default=Duration::hours(1))]
214    memory_duration: Duration,
215
216    /// How long data should stay on disk.
217    ///
218    /// Note that this duration is approximative. The caches will run cleanup
219    /// tasks once in a while to remove data from the cache.
220    ///
221    /// If unspecified, 1day.
222    #[builder(default=Duration::days(1))]
223    disk_duration: Duration,
224
225    /// How long to wait before cleaning up data that is already on disk.
226    ///
227    /// If unspecified, 10 seconds.
228    #[builder(default=Duration::seconds(10))]
229    initial_disk_cleanup_after: Duration,
230
231    /// If `true`, erase the cache without attempting to reload it.
232    ///
233    /// Used mostly for testing.
234    #[builder(default = false)]
235    purge: bool,
236
237    /// Increment this if you have changed the format of the cache and wish
238    /// to erase its contents.
239    #[builder(default = 0)]
240    version: u32,
241}
242impl Default for CacheOptions {
243    fn default() -> Self {
244        CacheOptions::builder().build()
245    }
246}