infinitree_backends/cache.rs

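//! A size-bounded filesystem cache in front of another [`Backend`].
//! Writes go through to the upstream backend and are mirrored to local
//! disk; reads are served from disk when possible, and fetched from
//! upstream (then cached) otherwise. Least-recently-used objects are
//! evicted to stay within the size budget.
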
use super::block_on;
use anyhow::Context;
use infinitree::{
    backends::{Backend, BackendError, Directory, Result},
    object::{ObjectId, ReadObject, WriteObject},
};
use lru::LruCache;
use scc::HashSet;
use std::{
    convert::TryFrom,
    fs::{read_dir, DirEntry},
    num::NonZeroUsize,
    path::{Path, PathBuf},
    sync::Arc,
    time::SystemTime,
};

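/// Convenience alias for [`FSCache`] using the crate-wide block size.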
pub type Cache = FSCache<{ infinitree::BLOCK_SIZE }>;
19
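/// A local filesystem cache that fronts an upstream [`Backend`].
///
/// Accounting is done in whole blocks: every cached object is assumed to
/// occupy exactly `BLOCK_SIZE` bytes on disk.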
#[derive(Clone)]
pub struct FSCache<const BLOCK_SIZE: usize> {
    file_list: Arc<tokio::sync::RwLock<LruCache<ObjectId, FileAccess>>>,
    warm: Arc<HashSet<ObjectId>>,

    size_limit: usize,
    upstream: Arc<dyn Backend>,
    directory: Arc<Directory>,
}

impl<const BLOCK_SIZE: usize> FSCache<BLOCK_SIZE> {
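    /// Open (or create) the cache directory at `local` with a total size
    /// budget of `size_limit_b` bytes, backed by `upstream`.
    ///
    /// Files already present in the directory are indexed into the LRU in
    /// access-time order.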
    pub fn new(
        local: impl AsRef<Path>,
        size_limit_b: NonZeroUsize,
        upstream: Arc<dyn Backend>,
    ) -> Result<Arc<Self>> {
        let size_limit = size_limit_b.get();
        if size_limit < BLOCK_SIZE {
            return Err(BackendError::from(anyhow::anyhow!(
                "cache size needs to be at least {} bytes",
                BLOCK_SIZE
            )));
        }

        let local = PathBuf::from(local.as_ref());
        std::fs::create_dir_all(&local)?;

        let mut file_list = read_dir(&local)?
            .filter_map(|result| {
                result.ok().and_then(|entry| {
                    if let Ok(ftype) = entry.file_type() {
                        let is_hidden = {
                            let raw_name = entry.file_name();
                            let name = raw_name.to_string_lossy();
                            name.starts_with('.')
                        };

                        if ftype.is_file() && !is_hidden {
                            return Some(entry);
                        }
                    }
                    None
                })
            })
            .map(FileAccess::from)
            .collect::<Vec<_>>();

        // we want to insert files in access time order so that we can
        // always drop the least recently used from the cache.
        //
        // many filesystems flat out ignore atime and will effectively
        // report ctime instead; we're rolling on a best-effort basis here.
        //
        // this also makes sense since when an object gets used, it's
        // bumped in the lru, therefore it's not "old" anymore as far
        // as the running process is concerned.

        file_list.sort_by(|a, b| a.atime.cmp(&b.atime));

        let mut files = LruCache::unbounded();
        for file in file_list {
            files.put(file.id, file);
        }

        Ok(Self {
            upstream,
            size_limit,
            directory: Directory::new(local)?,
            warm: Arc::default(),
            file_list: Arc::new(tokio::sync::RwLock::new(files)),
        }
        .into())
    }

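    /// Estimated on-disk footprint, assuming every object (warm or LRU)
    /// occupies exactly one block.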
    async fn size(&self) -> usize {
        BLOCK_SIZE * (self.warm.len() + self.file_list.read().await.len())
    }

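    /// Evict least-recently-used objects until there is room for one more
    /// block, returning the evicted ids. Warm objects never appear in the
    /// LRU, so they are never evicted here.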
    async fn make_space_for_object(&self) -> Result<Vec<ObjectId>> {
        let mut evicted = vec![];

        // since this is all async, we don't want to sit on a read-lock
        // for the entire scope of this function
        while self.size().await > self.size_limit - BLOCK_SIZE {
            let file = self
                .file_list
                .write()
                .await
                .pop_lru()
                .context("cache is too small!")?;

            file.1.delete(&self.directory)?;
            evicted.push(file.0);
        }

        Ok(evicted)
    }

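    /// Write `obj` into the cache directory and track it in the LRU unless
    /// it is pinned warm. Returns the ids evicted to make room.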
    async fn add_new_object(&self, obj: WriteObject) -> Result<Vec<ObjectId>> {
        let evicted = self.make_space_for_object().await?;

        let id = *obj.id();
        let cache = self.clone();
        tokio::task::spawn_blocking(move || cache.directory.write_object(&obj))
            .await
            .expect("the task shouldn't be aborted")?;

        if !self.warm.contains(&id) {
            self.file_list.write().await.put(id, FileAccess::new(id));
        }

        Ok(evicted)
    }

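    /// Fetch an object from the upstream backend and, on success, add a
    /// copy to the local cache.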
    async fn read_upstream(&self, id: &ObjectId) -> Result<Arc<ReadObject>> {
        let id = *id;
        let cache = self.clone();
        let object = tokio::task::spawn_blocking(move || cache.upstream.read_object(&id))
            .await
            .expect("the task shouldn't be aborted");

        if let Ok(ref obj) = object {
            self.add_new_object(obj.clone().into()).await?;
        }

        object
    }

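    /// Serve a read from the local cache if we believe we hold the object,
    /// falling back to upstream on a miss or a local read failure.
    /// `LruCache::get` needs `&mut self` to bump recency, hence the write
    /// lock even on this read path.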
    async fn read_cache_or_upstream(&self, id: &ObjectId) -> Result<Arc<ReadObject>> {
        if self.file_list.write().await.get(id).is_some()
            || self.warm.read_async(id, |_| true).await.is_some()
        {
            match self.directory.read_object(id) {
                ok @ Ok(_) => ok,
                Err(_) => self.read_upstream(id).await,
            }
        } else {
            self.read_upstream(id).await
        }
    }
}

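// the `Backend` trait is synchronous, so the async internals above are
// driven to completion with `block_on` from the parent module.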
impl<const BLOCK_SIZE: usize> Backend for FSCache<BLOCK_SIZE> {
    fn write_object(&self, object: &WriteObject) -> Result<()> {
        self.upstream.write_object(object)?;
        block_on(self.add_new_object(object.clone()))?;
        Ok(())
    }

    fn read_object(&self, id: &ObjectId) -> Result<Arc<ReadObject>> {
        block_on(self.read_cache_or_upstream(id))
    }

    fn read_fresh(&self, id: &ObjectId) -> Result<Arc<ReadObject>> {
        block_on(self.read_upstream(id))
    }

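    /// Pin `objects` so they are never evicted, replacing any previous
    /// warm list; pinned ids are removed from the LRU bookkeeping. Fails
    /// if the warm list alone would exceed the cache size.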
    fn keep_warm(&self, objects: &[ObjectId]) -> Result<()> {
        if objects.len() * BLOCK_SIZE > self.size_limit {
            return Err(BackendError::from(anyhow::anyhow!(
                "keep-warm list is larger than cache size!"
            )));
        }

        block_on(async {
            self.warm.clear_async().await;

            let mut lru = self.file_list.write().await;
            for id in objects {
                // we don't care if it's in the cache already
                let _ = lru.pop(id);

                self.warm
                    .insert_async(*id)
                    .await
                    .expect("warm list is cleared above");
            }
        });

        Ok(())
    }

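    /// Start pulling `objects` into the cache on a background task and
    /// return immediately, without waiting for the reads to finish.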
    fn preload(&self, objects: &[ObjectId]) -> Result<()> {
        let cache = self.clone();
        let objects = objects.to_vec();

        tokio::task::spawn_blocking(move || {
            for id in objects {
                // preloading is best effort: ignore individual failures
                // instead of panicking the background task
                let _ = cache.read_object(&id);
            }
        });

        Ok(())
    }

    fn sync(&self) -> Result<()> {
        self.upstream.sync()
    }
}

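/// LRU bookkeeping entry: an object id plus the access time used to seed
/// the eviction order at startup.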
struct FileAccess {
    atime: SystemTime,
    id: ObjectId,
}

impl FileAccess {
    fn new(id: ObjectId) -> Self {
        Self {
            id,
            atime: SystemTime::now(),
        }
    }

    fn delete(self, directory: &Directory) -> Result<()> {
        directory.delete(&[self.id])
    }
}

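// note: this conversion `unwrap()`s on the assumption that the cache
// directory only holds object files with valid `ObjectId` names and
// readable metadata; a stray file in the directory will panic here.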
impl From<DirEntry> for FileAccess {
    fn from(direntry: DirEntry) -> Self {
        let atime = direntry.metadata().unwrap().accessed().unwrap();
        let path = direntry.path();
        let id = ObjectId::try_from(path.file_name().unwrap().to_str().unwrap()).unwrap();

        Self { atime, id }
    }
}

#[cfg(test)]
mod test {
    use super::Cache;
    use crate::test::{write_and_wait_for_commit, TEST_DATA_DIR};
    use infinitree::{
        backends::test::InMemoryBackend, backends::Backend, object::WriteObject, ObjectId,
    };
    use std::{env, num::NonZeroUsize, path::Path};

    #[test]
    #[should_panic(expected = "cache size needs to be at least 4194304 bytes")]
    fn cache_at_least_block_size() {
        Cache::new(
            "/whatever",
            NonZeroUsize::new(123).unwrap(),
            InMemoryBackend::shared(),
        )
        .unwrap();
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn write_twice_and_evict() {
        let mut object = WriteObject::default();

        let data_root = Path::new(&env::var("CARGO_MANIFEST_DIR").unwrap())
            .join(TEST_DATA_DIR)
            .join("cache");
        std::fs::create_dir_all(&data_root).unwrap();

        let backend = Cache::new(
            &data_root,
            NonZeroUsize::new(infinitree::BLOCK_SIZE).unwrap(),
            InMemoryBackend::shared(),
        )
        .unwrap();

        let id_1 = *object.id();
        let id_2 = ObjectId::from_bytes(b"1234567890abcdef1234567890abcdef");

        write_and_wait_for_commit(backend.as_ref(), &object);
        let _obj_1_read_ref = backend.read_object(object.id()).unwrap();

        object.set_id(id_2);
        write_and_wait_for_commit(backend.as_ref(), &object);

        let test_filename = data_root.join(id_1.to_string());
        // the 1st object was evicted automatically, hence `unwrap_err()`.
        // on windows with the mmap feature set, the handle is locked, so
        // deleting is an error there as well.
        std::fs::remove_file(test_filename).unwrap_err();

        let test_filename = data_root.join(id_2.to_string());
        // the 2nd object is still lingering, so we clean it up manually
        std::fs::remove_file(test_filename).unwrap();
    }
}