// infinitree_backends/cache.rs

use super::block_on;
use anyhow::Context;
use infinitree::{
    backends::{Backend, BackendError, Directory, Result},
    object::{ObjectId, ReadObject, WriteObject},
};
use lru::LruCache;
use scc::HashSet;
use std::{
    convert::TryFrom,
    fs::{read_dir, DirEntry},
    num::NonZeroUsize,
    path::{Path, PathBuf},
    sync::Arc,
    time::SystemTime,
};

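/// A filesystem cache in front of an upstream `Backend`, sized and
/// evicted in units of `infinitree::BLOCK_SIZE`.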
pub type Cache = FSCache<{ infinitree::BLOCK_SIZE }>;

#[derive(Clone)]
pub struct FSCache<const BLOCK_SIZE: usize> {
    /// LRU index of the objects currently stored in the cache directory.
    file_list: Arc<tokio::sync::RwLock<LruCache<ObjectId, FileAccess>>>,
    /// Pinned objects that are exempt from LRU eviction.
    warm: Arc<HashSet<ObjectId>>,

    /// Maximum cache size in bytes; always at least `BLOCK_SIZE`.
    size_limit: usize,
    upstream: Arc<dyn Backend>,
    directory: Arc<Directory>,
}

impl<const BLOCK_SIZE: usize> FSCache<BLOCK_SIZE> {
    pub fn new(
        local: impl AsRef<Path>,
        size_limit_b: NonZeroUsize,
        upstream: Arc<dyn Backend>,
    ) -> Result<Arc<Self>> {
        let size_limit = size_limit_b.get();
        if size_limit < BLOCK_SIZE {
            return Err(BackendError::from(anyhow::anyhow!(
                "cache size needs to be at least {} bytes",
                BLOCK_SIZE
            )));
        }

        let local = PathBuf::from(local.as_ref());
        std::fs::create_dir_all(&local)?;

        // Index the objects already on disk, skipping directories and
        // hidden files.
        let mut file_list = read_dir(&local)?
            .filter_map(|result| {
                result.ok().and_then(|entry| {
                    if let Ok(ftype) = entry.file_type() {
                        let is_hidden = {
                            let raw_name = entry.file_name();
                            let name = raw_name.to_string_lossy();
                            name.starts_with('.')
                        };

                        if ftype.is_file() && !is_hidden {
                            return Some(entry);
                        }
                    }
                    None
                })
            })
            .map(FileAccess::from)
            .collect::<Vec<_>>();

        // Sort by access time, oldest first, so the least recently used
        // files become the first eviction candidates in the LRU.
        file_list.sort_by(|a, b| a.atime.cmp(&b.atime));

        let mut files = LruCache::unbounded();
        for file in file_list {
            files.put(file.id, file);
        }

        Ok(Self {
            upstream,
            size_limit,
            directory: Directory::new(local)?,
            warm: Arc::default(),
            file_list: Arc::new(tokio::sync::RwLock::new(files)),
        }
        .into())
    }

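    /// Estimated on-disk size of the cache in bytes; every tracked object,
    /// warm or not, is assumed to occupy a full block.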
    async fn size(&self) -> usize {
        BLOCK_SIZE * (self.warm.len() + self.file_list.read().await.len())
    }

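    /// Evict least-recently-used entries until one more block fits under
    /// `size_limit`, returning the ids of the evicted objects.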
    async fn make_space_for_object(&self) -> Result<Vec<ObjectId>> {
        let mut evicted = vec![];

        while self.size().await > self.size_limit - BLOCK_SIZE {
            // `size_limit >= BLOCK_SIZE` is enforced in `new()`, so the
            // subtraction above cannot underflow.
            let file = self
                .file_list
                .write()
                .await
                .pop_lru()
                .context("cache is too small!")?;

            file.1.delete(&self.directory)?;
            evicted.push(file.0);
        }

        Ok(evicted)
    }

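    /// Write an object into the cache directory, evicting older entries
    /// first, and register it in the LRU unless it is pinned warm.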
    async fn add_new_object(&self, obj: WriteObject) -> Result<Vec<ObjectId>> {
        let evicted = self.make_space_for_object().await?;

        let id = *obj.id();
        let cache = self.clone();
        tokio::task::spawn_blocking(move || cache.directory.write_object(&obj))
            .await
            .expect("the task shouldn't be aborted")?;

        // Warm objects are tracked separately and must not appear in the
        // LRU, otherwise they could be evicted.
        if !self.warm.contains(&id) {
            self.file_list.write().await.put(id, FileAccess::new(id));
        }

        Ok(evicted)
    }

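    /// Read an object from the upstream backend, caching it locally on
    /// success.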
    async fn read_upstream(&self, id: &ObjectId) -> Result<Arc<ReadObject>> {
        let id = *id;
        let cache = self.clone();
        let object = tokio::task::spawn_blocking(move || cache.upstream.read_object(&id))
            .await
            .expect("the task shouldn't be aborted");

        if let Ok(ref obj) = object {
            // Populate the local cache; note that a failed cache write is
            // propagated even though the upstream read itself succeeded.
            self.add_new_object(obj.clone().into()).await?;
        }

        object
    }

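    /// Serve a read from the local cache if the object is tracked, falling
    /// back to upstream on a miss or a local read failure.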
    async fn read_cache_or_upstream(&self, id: &ObjectId) -> Result<Arc<ReadObject>> {
        // `LruCache::get` needs `&mut self` to update recency, hence the
        // write lock even on the read path.
        if self.file_list.write().await.get(id).is_some()
            || self.warm.read_async(id, |_| true).await.is_some()
        {
            match self.directory.read_object(id) {
                ok @ Ok(_) => ok,
                Err(_) => self.read_upstream(id).await,
            }
        } else {
            self.read_upstream(id).await
        }
    }
}

impl<const BLOCK_SIZE: usize> Backend for FSCache<BLOCK_SIZE> {
    fn write_object(&self, object: &WriteObject) -> Result<()> {
        // Write-through: the upstream copy is authoritative, so it is
        // written before the local cache.
        self.upstream.write_object(object)?;
        block_on(self.add_new_object(object.clone()))?;
        Ok(())
    }

    fn read_object(&self, id: &ObjectId) -> Result<Arc<ReadObject>> {
        block_on(self.read_cache_or_upstream(id))
    }

    fn read_fresh(&self, id: &ObjectId) -> Result<Arc<ReadObject>> {
        block_on(self.read_upstream(id))
    }

    fn keep_warm(&self, objects: &[ObjectId]) -> Result<()> {
        if objects.len() * BLOCK_SIZE > self.size_limit {
            return Err(BackendError::from(anyhow::anyhow!(
                "keep-warm list is larger than cache size!"
            )));
        }

        block_on(async {
            self.warm.clear_async().await;

            // Move the pinned objects out of the LRU so they can no
            // longer be evicted.
            let mut lru = self.file_list.write().await;
            for id in objects {
                let _ = lru.pop(id);

                self.warm
                    .insert_async(*id)
                    .await
                    .expect("warm list is cleared above");
            }
        });

        Ok(())
    }

    fn preload(&self, objects: &[ObjectId]) -> Result<()> {
        let cache = self.clone();
        let objects = objects.to_vec();

        // Preloading is fire-and-forget: fetch each object in the
        // background so later reads hit the local cache. Errors are
        // ignored rather than unwrapped, so one failed fetch doesn't
        // abort the rest of the batch.
        tokio::task::spawn_blocking(move || {
            for id in objects {
                let _ = cache.read_object(&id);
            }
        });

        Ok(())
    }

    fn sync(&self) -> Result<()> {
        self.upstream.sync()
    }
}

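/// Access-time bookkeeping for a cached file, used to rebuild the LRU
/// order when the cache starts up over an existing directory.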
struct FileAccess {
    atime: SystemTime,
    id: ObjectId,
}

impl FileAccess {
    fn new(id: ObjectId) -> Self {
        Self {
            id,
            atime: SystemTime::now(),
        }
    }

    fn delete(self, directory: &Directory) -> Result<()> {
        directory.delete(&[self.id])
    }
}

impl From<DirEntry> for FileAccess {
    fn from(direntry: DirEntry) -> Self {
        // These unwraps assume the cache directory is managed solely by
        // this module: every non-hidden file is named after a valid
        // `ObjectId` and has readable metadata.
        let atime = direntry.metadata().unwrap().accessed().unwrap();
        let path = direntry.path();
        let id = ObjectId::try_from(path.file_name().unwrap().to_str().unwrap()).unwrap();

        Self { atime, id }
    }
}

#[cfg(test)]
mod test {
    use super::Cache;
    use crate::test::{write_and_wait_for_commit, TEST_DATA_DIR};
    use infinitree::{
        backends::test::InMemoryBackend, backends::Backend, object::WriteObject, ObjectId,
    };
    use std::{env, num::NonZeroUsize, path::Path};

    #[test]
    #[should_panic(expected = "cache size needs to be at least 4194304 bytes")]
    fn cache_at_least_block_size() {
        Cache::new(
            "/whatever",
            NonZeroUsize::new(123).unwrap(),
            InMemoryBackend::shared(),
        )
        .unwrap();
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn write_twice_and_evict() {
        let mut object = WriteObject::default();

        let data_root = Path::new(&env::var("CARGO_MANIFEST_DIR").unwrap())
            .join(TEST_DATA_DIR)
            .join("cache");
        std::fs::create_dir_all(&data_root).unwrap();

        // A cache that holds exactly one block: the second write must
        // evict the first object.
        let backend = Cache::new(
            &data_root,
            NonZeroUsize::new(infinitree::BLOCK_SIZE).unwrap(),
            InMemoryBackend::shared(),
        )
        .unwrap();

        let id_1 = *object.id();
        let id_2 = ObjectId::from_bytes(b"1234567890abcdef1234567890abcdef");

        write_and_wait_for_commit(backend.as_ref(), &object);
        let _obj_1_read_ref = backend.read_object(object.id()).unwrap();

        object.set_id(id_2);
        write_and_wait_for_commit(backend.as_ref(), &object);

        // id_1 was evicted to make room for id_2, so its file is gone...
        let test_filename = data_root.join(id_1.to_string());
        std::fs::remove_file(test_filename).unwrap_err();

        // ...while id_2 is still present on disk.
        let test_filename = data_root.join(id_2.to_string());
        std::fs::remove_file(test_filename).unwrap();
    }
}