// arc_disk_cache/lib.rs

1#[macro_use]
2extern crate tracing;
3
4use concread::arcache::{ARCache, ARCacheBuilder, CacheStats};
5use serde::de::DeserializeOwned;
6use serde::{Deserialize, Serialize};
7
8use tempfile::NamedTempFile;
9
10use std::collections::BTreeSet;
11
12use std::borrow::Borrow;
13use std::fmt::Debug;
14use std::fs::File;
15use std::hash::Hash;
16use std::io::{BufRead, BufReader, BufWriter, Seek, Write};
17use std::num::NonZeroUsize;
18use std::path::{Path, PathBuf};
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::Arc;
21
22use rand::prelude::*;
23
/// Files at or above this size (512 MiB) skip the per-`get` inline crc32c
/// verification; they are instead checked once at startup in `new`.
/// A `const` is the idiomatic form for a plain compile-time value.
const CHECK_INLINE: usize = 512 * 1024 * 1024;
25
/// Re-exports that downstream users of this crate commonly need alongside
/// the cache itself.
pub mod prelude {
    pub use concread::arcache::CacheStats;
    pub use tempfile::NamedTempFile;
}
30
/// On-disk JSON representation of a cache entry, stored alongside the data
/// file as `<key_str>.meta` and re-read at startup to rebuild `CacheObj`s.
#[derive(Debug, Serialize, Deserialize)]
pub struct CacheObjMeta<K, D> {
    /// The logical cache key.
    pub key: K,
    /// File name (url-safe base64 of key + salt) the content is stored under.
    pub key_str: String,
    /// crc32c of the content file, for corruption detection.
    pub crc: u32,
    /// Caller-supplied metadata carried with the entry.
    pub userdata: D,
}
38
/// An in-memory cache entry: the key, a shared handle to the on-disk content
/// file, and the caller's userdata. Cloning is cheap — the file handle is
/// behind an `Arc`.
#[derive(Clone, Debug)]
pub struct CacheObj<K, D>
where
    K: Serialize
        + DeserializeOwned
        + AsRef<[u8]>
        + Hash
        + Eq
        + Ord
        + Clone
        + Debug
        + Sync
        + Send
        + 'static,
    D: Serialize + DeserializeOwned + Clone + Debug + Sync + Send + 'static,
{
    /// The cache key.
    pub key: K,
    /// Shared handle to the content + metadata files; the last drop while the
    /// cache is running removes both files from disk.
    pub fhandle: Arc<FileHandle>,
    /// Caller-supplied metadata.
    pub userdata: D,
}
59
/// Handle to a cached file on disk. When the final handle is dropped while
/// the owning cache is still running, both the content file and its `.meta`
/// sidecar are unlinked (see the `Drop` impl below).
#[derive(Clone, Debug)]
pub struct FileHandle {
    /// File name (url-safe base64, possibly truncated) under the content dir.
    pub key_str: String,
    /// Path of the JSON `.meta` sidecar file.
    pub meta_path: PathBuf,
    /// Path of the content file itself.
    pub path: PathBuf,
    /// Content length in bytes, captured at insert/import time.
    pub amt: usize,
    /// crc32c of the content, captured at insert/import time.
    pub crc: u32,
    // Shared with the owning ArcDiskCache: set to false when the cache is
    // dropped, which tells Drop below to leave files on disk for re-import.
    running: Arc<AtomicBool>,
}
69
70impl Drop for FileHandle {
71    fn drop(&mut self) {
72        if self.running.load(Ordering::Acquire) {
73            info!("🗑  remove fhandle -> {:?}", self.path);
74            let _ = std::fs::remove_file(&self.meta_path);
75            let _ = std::fs::remove_file(&self.path);
76        }
77    }
78}
79
impl FileHandle {
    /// Re-open the underlying content file (read-only) for another pass over
    /// its bytes.
    pub fn reopen(&self) -> Result<File, std::io::Error> {
        File::open(&self.path)
    }
}
85
86#[instrument(level = "trace")]
87fn crc32c_len(file: &mut File) -> Result<u32, ()> {
88    file.seek(std::io::SeekFrom::Start(0)).map_err(|e| {
89        error!("Unable to seek tempfile -> {:?}", e);
90    })?;
91
92    /*
93    let amt = file.metadata().map(|m| m.len() as usize).map_err(|e| {
94        error!("Unable to access metadata -> {:?}", e);
95    })?;
96    */
97
98    let mut buf_file = BufReader::with_capacity(8192, file);
99    let mut crc = 0;
100    loop {
101        match buf_file.fill_buf() {
102            Ok(buffer) => {
103                let length = buffer.len();
104                if length == 0 {
105                    // We are done!
106                    break;
107                } else {
108                    // we have content, proceed.
109                    crc = crc32c::crc32c_append(crc, &buffer);
110                    buf_file.consume(length);
111                }
112            }
113            Err(e) => {
114                error!("Bufreader error -> {:?}", e);
115                return Err(());
116            }
117        }
118    }
119    debug!("crc32c is: {:x}", crc);
120
121    Ok(crc)
122}
123
/// A concurrently-readable (ARC) cache whose values live as files on disk,
/// each paired with a JSON `.meta` sidecar so the cache can be rebuilt
/// across restarts. Cloning shares the same underlying cache and state.
#[derive(Clone)]
pub struct ArcDiskCache<K, D>
where
    K: Serialize
        + DeserializeOwned
        + AsRef<[u8]>
        + Hash
        + Eq
        + Ord
        + Clone
        + Debug
        + Sync
        + Send
        + 'static,
    D: Serialize + DeserializeOwned + Clone + Debug + Sync + Send + 'static,
{
    // In-memory index; entries are weighted by their on-disk byte size.
    cache: Arc<ARCache<K, CacheObj<K, D>>>,
    /// Directory holding the content files and their `.meta` sidecars.
    pub content_dir: PathBuf,
    // Liveness flag shared with every FileHandle; see Drop impls.
    running: Arc<AtomicBool>,
    // When true, skip the crc32c verification on each get().
    durable_fs: bool,
}
145
impl<K, D> Drop for ArcDiskCache<K, D>
where
    K: Serialize
        + DeserializeOwned
        + AsRef<[u8]>
        + Hash
        + Eq
        + Ord
        + Clone
        + Debug
        + Sync
        + Send
        + 'static,
    D: Serialize + DeserializeOwned + Clone + Debug + Sync + Send + 'static,
{
    /// Signal shutdown to every outstanding `FileHandle`: once `running` is
    /// false, dropped handles leave their files on disk so `new` can
    /// re-import them on the next start.
    ///
    /// NOTE(review): `ArcDiskCache` derives `Clone`, so dropping *any* clone
    /// flips this shared flag for all of them — confirm clones are not
    /// expected to outlive the first one dropped.
    fn drop(&mut self) {
        trace!("ArcDiskCache - setting running to false");
        self.running.store(false, Ordering::Release);
    }
}
166
167impl<K, D> ArcDiskCache<K, D>
168where
169    K: Serialize
170        + DeserializeOwned
171        + AsRef<[u8]>
172        + Hash
173        + Eq
174        + Ord
175        + Clone
176        + Debug
177        + Sync
178        + Send
179        + 'static,
180    D: Serialize + DeserializeOwned + Clone + Debug + Sync + Send + 'static,
181{
    /// Build a cache of `capacity` (byte-weighted) backed by files in
    /// `content_dir`, re-importing any valid `<name>.meta` / `<name>` pairs
    /// already present.
    ///
    /// Startup sequence:
    /// 1. scan `content_dir`, splitting dirents into `.meta` sidecars and
    ///    data files,
    /// 2. deserialize each sidecar and verify its data file exists (files of
    ///    `CHECK_INLINE` bytes or more also get their one-and-only crc32c
    ///    check here, since `get` skips it for them),
    /// 3. delete any data file left without surviving metadata,
    /// 4. prime the ARCache with the survivors, then reset stats so the
    ///    import doesn't pollute hit/miss numbers.
    ///
    /// `durable_fs` disables the per-`get` crc verification.
    ///
    /// Panics if `content_dir` cannot be read or the ARCache parameters are
    /// rejected — construction-time failure is intentionally loud.
    pub fn new(capacity: usize, content_dir: &Path, durable_fs: bool) -> Self {
        info!("capacity: {}  content_dir: {:?}", capacity, content_dir);

        let cache = Arc::new(
            ARCacheBuilder::new()
                .set_size(capacity, 0)
                .set_watermark(0)
                .build()
                .expect("Invalid ARCache Parameters"),
        );

        // Shared liveness flag, cloned into every FileHandle so its Drop
        // knows whether to unlink files (true) or leave them (false).
        let running = Arc::new(AtomicBool::new(true));

        // Now for everything in content dir, look if we have valid metadata
        // and everything that isn't metadata.
        let mut entries = std::fs::read_dir(content_dir)
            .expect("unable to read content dir")
            .map(|res| res.map(|e| e.path()))
            .collect::<Result<Vec<_>, std::io::Error>>()
            .expect("Failed to access some dirents");

        entries.sort();

        // Split dirents: `.meta` sidecars on the left, data files on the right.
        let (meta, files): (Vec<_>, Vec<_>) = entries
            .into_iter()
            .partition(|p| p.extension() == Some(std::ffi::OsStr::new("meta")));

        let meta_len = meta.len();
        info!("Will process {} metadata", meta_len);

        // Now we read each metadata in. Unreadable or unparsable sidecars are
        // silently skipped (their data files get pruned below).
        let meta: Vec<(PathBuf, CacheObjMeta<K, D>)> = meta
            .into_iter()
            .enumerate()
            .filter_map(|(i, p)| {
                if i % 1000 == 0 {
                    info!("{} of {}", i, meta_len);
                }
                trace!(?p, "meta read");
                File::open(&p)
                    .ok()
                    .map(|f| BufReader::new(f))
                    .and_then(|rdr| serde_json::from_reader(rdr).ok())
                    .map(|m| (p.to_path_buf(), m))
            })
            .collect();

        // Validate each metadata entry against its data file and turn the
        // survivors into live CacheObjs.
        let meta: Vec<CacheObj<K, D>> = meta
            .into_iter()
            .enumerate()
            .filter_map(|(i, (meta_path, m))| {
                if i % 1000 == 0 {
                    info!("{} of {}", i, meta_len);
                }
                let CacheObjMeta {
                    key,
                    key_str,
                    crc,
                    userdata,
                } = m;

                let path = content_dir.join(&key_str);

                // Metadata without a matching content file is discarded.
                if !path.exists() {
                    return None;
                }

                let mut file = File::open(&path).ok()?;

                let amt = match file.metadata().map(|m| m.len() as usize) {
                    Ok(a) => a,
                    Err(e) => {
                        error!("Unable to access metadata -> {:?}", e);
                        return None;
                    }
                };

                if amt >= CHECK_INLINE {
                    // Check large files on startup ONLY — `get` skips the crc
                    // for files this size, so this is their one verification.
                    let crc_ck = crc32c_len(&mut file).ok()?;
                    if crc_ck != crc {
                        warn!("file potentially corrupted - {:?}", meta_path);
                        return None;
                    }
                }

                Some(CacheObj {
                    key,
                    userdata,
                    fhandle: Arc::new(FileHandle {
                        key_str,
                        meta_path,
                        path,
                        amt,
                        crc,
                        running: running.clone(),
                    }),
                })
            })
            .collect();

        info!("Found {:?} existing metadata", meta.len());

        // Now we prune any files that ARENT in our valid cache meta set.
        let mut files: BTreeSet<_> = files.into_iter().collect();
        meta.iter().for_each(|co| {
            files.remove(&co.fhandle.path);
        });

        files.iter().for_each(|p| {
            trace!("🗑  -> {:?}", p);
            let _ = std::fs::remove_file(p);
        });

        // Finally setup the cache.
        let mut wrtxn = cache.write();
        meta.into_iter().for_each(|co| {
            let key = co.key.clone();
            // Weight each entry by its byte size; zero-length files still
            // need a non-zero weight, hence the fallback to 1.
            let amt = NonZeroUsize::new(co.fhandle.amt)
                .unwrap_or(unsafe { NonZeroUsize::new_unchecked(1) });
            wrtxn.insert_sized(key, co, amt);
        });
        wrtxn.commit();

        // Reset the stats so that the import isn't present.
        cache.reset_stats();

        debug!("ArcDiskCache Ready!");

        ArcDiskCache {
            content_dir: content_dir.to_path_buf(),
            cache,
            running,
            durable_fs,
        }
    }
318
319    pub fn get<Q: ?Sized>(&self, q: &Q) -> Option<CacheObj<K, D>>
320    where
321        K: Borrow<Q>,
322        Q: Hash + Eq + Ord,
323    {
324        let mut rtxn = self.cache.read();
325        rtxn.get(q)
326            .and_then(|obj| {
327                let mut file = File::open(&obj.fhandle.path).ok()?;
328
329                let amt = file
330                    .metadata()
331                    .map(|m| m.len() as usize)
332                    .map_err(|e| {
333                        error!("Unable to access metadata -> {:?}", e);
334                    })
335                    .ok()?;
336
337                if !self.durable_fs {
338                    if amt < CHECK_INLINE {
339                        let crc_ck = crc32c_len(&mut file).ok()?;
340                        if crc_ck != obj.fhandle.crc {
341                            warn!("file potentially corrupted - {:?}", obj.fhandle.meta_path);
342                            return None;
343                        }
344                    } else {
345                        info!("Skipping crc check, file too large");
346                    }
347                }
348
349                Some(obj)
350            })
351            .cloned()
352    }
353
    /// The directory this cache stores content and `.meta` files in.
    pub fn path(&self) -> &Path {
        &self.content_dir
    }
357
    /// A point-in-time clone of the underlying ARCache hit/miss statistics.
    pub fn view_stats(&self) -> CacheStats {
        (*self.cache.view_stats()).clone()
    }
361
362    pub fn insert_bytes(&self, k: K, d: D, bytes: &[u8]) -> () {
363        let mut fh = match self.new_tempfile() {
364            Some(fh) => fh,
365            None => return,
366        };
367
368        if let Err(e) = fh.write(bytes) {
369            error!(?e, "failed to write bytes to file");
370            return;
371        };
372
373        if let Err(e) = fh.flush() {
374            error!(?e, "failed to flush bytes to file");
375            return;
376        }
377
378        self.insert(k, d, fh)
379    }
380
    /// Insert the contents of tempfile `fh` into the cache under key `k`
    /// with userdata `d`. Best-effort: any failure is logged and the insert
    /// is abandoned, leaving the cache unchanged.
    ///
    /// On-disk layout: content is persisted as `<key_str>` with a JSON
    /// sidecar `<key_str>.meta`, where `key_str` is the url-safe base64 of
    /// `k` plus a random 16-byte salt, truncated from the *front* (so the
    /// salt survives) to at most 160 chars. The sidecar is written before
    /// the content file is persisted — the same order `new` relies on when
    /// re-importing.
    pub fn insert(&self, k: K, d: D, mut fh: NamedTempFile) -> () {
        let file = fh.as_file_mut();

        let amt = match file.metadata().map(|m| m.len() as usize) {
            Ok(a) => a,
            Err(e) => {
                error!("Unable to access metadata -> {:?}", e);
                return;
            }
        };

        // Checksum the full tempfile so later reads can detect corruption.
        let crc = match crc32c_len(file) {
            Ok(v) => v,
            Err(_) => return,
        };

        // Need to salt the file path so that we don't accidentally collide.
        let mut rng = rand::thread_rng();
        let mut salt: [u8; 16] = [0; 16];
        rng.fill(&mut salt);

        let k_slice: &[u8] = k.as_ref();

        let mut adapted_k = Vec::with_capacity(16 + k_slice.len());
        adapted_k.extend_from_slice(k_slice);
        adapted_k.extend_from_slice(&salt);

        let key_str = base64::encode_config(&adapted_k, base64::URL_SAFE);
        let key_str = if key_str.len() > 160 {
            debug!("Needing to truncate filename due to excessive key length");
            // Keep the tail so the random salt (appended last) is retained.
            let at = key_str.len() - 160;
            key_str.split_at(at).1.to_string()
        } else {
            key_str
        };

        let path = self.content_dir.join(&key_str);
        let mut meta_str = key_str.clone();
        meta_str.push_str(".meta");
        let meta_path = self.content_dir.join(&meta_str);

        info!("{:?}", path);
        info!("{:?}", meta_path);

        let objmeta = CacheObjMeta {
            key: k.clone(),
            key_str: key_str.clone(),
            crc,
            userdata: d.clone(),
        };

        // Despite the salt, bail out rather than clobber an existing entry.
        if meta_path.exists() {
            warn!(
                immediate = true,
                "file collision detected, skipping write of {}", meta_str
            );
            return;
        }

        let m_file = match File::create(&meta_path).map(BufWriter::new) {
            Ok(f) => f,
            Err(e) => {
                error!(
                    immediate = true,
                    "CRITICAL! Failed to open metadata {:?}", e
                );
                return;
            }
        };

        if let Err(e) = serde_json::to_writer(m_file, &objmeta) {
            error!(
                immediate = true,
                "CRITICAL! Failed to write metadata {:?}", e
            );
            return;
        } else {
            info!("Persisted metadata for {:?}", &meta_path);

            // Move the tempfile into its final name under the content dir.
            if let Err(e) = fh.persist(&path) {
                error!(immediate = true, "CRITICAL! Failed to persist file {:?}", e);
                return;
            }
        }

        info!("Persisted data for {:?}", &path);

        // Can not fail from this point!
        let co = CacheObj {
            key: k.clone(),
            userdata: d,
            fhandle: Arc::new(FileHandle {
                key_str,
                meta_path,
                path,
                amt,
                crc,
                running: self.running.clone(),
            }),
        };

        // Weight the entry by byte size; zero-length files still weigh 1.
        let amt = NonZeroUsize::new(amt).unwrap_or(unsafe { NonZeroUsize::new_unchecked(1) });

        let mut wrtxn = self.cache.write();
        wrtxn.insert_sized(k, co, amt);
        debug!("commit");
        wrtxn.commit();
    }
490
491    // Given key, update the ud.
492    pub fn update_userdata<Q: ?Sized, F>(&self, q: &Q, mut func: F)
493    where
494        K: Borrow<Q>,
495        Q: Hash + Eq + Ord,
496        F: FnMut(&mut D),
497    {
498        let mut wrtxn = self.cache.write();
499
500        if let Some(mref) = wrtxn.get_mut(q, false) {
501            func(&mut mref.userdata);
502
503            let objmeta = CacheObjMeta {
504                key: mref.key.clone(),
505                key_str: mref.fhandle.key_str.clone(),
506                crc: mref.fhandle.crc,
507                userdata: mref.userdata.clone(),
508            };
509
510            // This will truncate the metadata if it does exist.
511            let m_file = File::create(&mref.fhandle.meta_path)
512                .map(BufWriter::new)
513                .map_err(|e| {
514                    error!("Failed to open metadata {:?}", e);
515                })
516                .unwrap();
517
518            serde_json::to_writer(m_file, &objmeta)
519                .map_err(|e| {
520                    error!("Failed to write metadata {:?}", e);
521                })
522                .unwrap();
523
524            info!("Persisted metadata for {:?}", &mref.fhandle.meta_path);
525
526            debug!("commit");
527            wrtxn.commit();
528        }
529    }
530
531    pub fn update_all_userdata<F, C>(&self, check: C, mut func: F)
532    where
533        C: Fn(&D) -> bool,
534        F: FnMut(&mut D),
535    {
536        let mut wrtxn = self.cache.write();
537
538        let keys: Vec<_> = wrtxn
539            .iter()
540            .filter_map(|(k, mref)| {
541                if check(&mref.userdata) {
542                    Some(k.clone())
543                } else {
544                    None
545                }
546            })
547            .collect();
548
549        for k in keys {
550            if let Some(mref) = wrtxn.get_mut(&k, false) {
551                func(&mut mref.userdata);
552
553                let objmeta = CacheObjMeta {
554                    key: mref.key.clone(),
555                    key_str: mref.fhandle.key_str.clone(),
556                    crc: mref.fhandle.crc,
557                    userdata: mref.userdata.clone(),
558                };
559
560                // This will truncate the metadata if it does exist.
561                let m_file = File::create(&mref.fhandle.meta_path)
562                    .map(BufWriter::new)
563                    .map_err(|e| {
564                        error!("Failed to open metadata {:?}", e);
565                    })
566                    .unwrap();
567
568                serde_json::to_writer(m_file, &objmeta)
569                    .map_err(|e| {
570                        error!("Failed to write metadata {:?}", e);
571                    })
572                    .unwrap();
573
574                info!("Persisted metadata for {:?}", &mref.fhandle.meta_path);
575            }
576        }
577
578        debug!("commit");
579        wrtxn.commit();
580    }
581
    /// Remove key `k` from the cache. On commit the evicted entry's
    /// `FileHandle` is dropped, which unlinks its content and `.meta` files
    /// (while the cache is running — see `FileHandle`'s `Drop`).
    pub fn remove(&self, k: K) {
        let mut wrtxn = self.cache.write();
        let _ = wrtxn.remove(k);
        // This causes the handles to be dropped and binned.
        debug!("commit");
        wrtxn.commit();
    }
590
591    //
592    pub fn new_tempfile(&self) -> Option<NamedTempFile> {
593        NamedTempFile::new_in(&self.content_dir)
594            .map_err(|e| error!(?e))
595            .ok()
596    }
597}
598
#[cfg(test)]
mod tests {
    use super::ArcDiskCache;
    use std::io::Write;
    use tempfile::tempdir;

    /// Smoke test: build a cache over a fresh temp dir and push one small
    /// entry through the tempfile -> insert path.
    #[test]
    fn disk_cache_test_basic() {
        // try_init: ignore the error if a global subscriber already exists.
        let _ = tracing_subscriber::fmt::try_init();

        let dir = tempdir().expect("Failed to build tempdir");
        // Need a new temp dir
        let dc: ArcDiskCache<Vec<u8>, ()> = ArcDiskCache::new(1024, dir.path(), false);

        let mut fh = dc.new_tempfile().unwrap();
        let k = vec![0, 1, 2, 3, 4, 5];

        let file = fh.as_file_mut();
        file.write_all(b"Hello From Cache").unwrap();

        dc.insert(k, (), fh);
    }
}
621}