1#[macro_use]
2extern crate tracing;
3
4use concread::arcache::{ARCache, ARCacheBuilder, CacheStats};
5use serde::de::DeserializeOwned;
6use serde::{Deserialize, Serialize};
7
8use tempfile::NamedTempFile;
9
10use std::collections::BTreeSet;
11
12use std::borrow::Borrow;
13use std::fmt::Debug;
14use std::fs::File;
15use std::hash::Hash;
16use std::io::{BufRead, BufReader, BufWriter, Seek, Write};
17use std::num::NonZeroUsize;
18use std::path::{Path, PathBuf};
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::Arc;
21
22use rand::prelude::*;
23
/// Size threshold, in bytes (2^29 = 512 MiB), used to decide when crc32c
/// verification of a content file is performed. A private immutable value
/// like this should be a `const`, not a `static`.
const CHECK_INLINE: usize = 536_870_912;
25
/// Convenience re-exports for downstream users of this crate, so they do not
/// need direct dependencies on `concread` and `tempfile` for these types.
pub mod prelude {
    pub use concread::arcache::CacheStats;
    pub use tempfile::NamedTempFile;
}
30
/// On-disk (JSON) representation of a cache entry's metadata. One `.meta`
/// sidecar file is written per cached object and re-read at startup by
/// `ArcDiskCache::new` to rebuild the in-memory cache.
#[derive(Debug, Serialize, Deserialize)]
pub struct CacheObjMeta<K, D> {
    /// The logical cache key.
    pub key: K,
    /// File name (within the content dir) of the content file for this entry.
    pub key_str: String,
    /// crc32c checksum of the content file at the time it was persisted.
    pub crc: u32,
    /// Caller-supplied userdata stored alongside the entry.
    pub userdata: D,
}
38
/// An in-memory cache entry: the key, its userdata, and a shared handle to
/// the backing files. Cloning is cheap — the `FileHandle` is behind an `Arc`,
/// so all clones refer to the same on-disk files.
#[derive(Clone, Debug)]
pub struct CacheObj<K, D>
where
    K: Serialize
        + DeserializeOwned
        + AsRef<[u8]>
        + Hash
        + Eq
        + Ord
        + Clone
        + Debug
        + Sync
        + Send
        + 'static,
    D: Serialize + DeserializeOwned + Clone + Debug + Sync + Send + 'static,
{
    /// The logical cache key.
    pub key: K,
    /// Shared handle to the content file and its `.meta` sidecar.
    pub fhandle: Arc<FileHandle>,
    /// Caller-supplied userdata.
    pub userdata: D,
}
59
/// Handle to a cached content file and its `.meta` sidecar.
///
/// NOTE(review): this type both derives `Clone` and implements `Drop` which
/// deletes the files — dropping any direct clone while the cache is live
/// removes the files out from under the other clones. In practice handles are
/// only shared as `Arc<FileHandle>` (see `CacheObj`), which avoids the issue;
/// confirm before cloning a bare `FileHandle`.
#[derive(Clone, Debug)]
pub struct FileHandle {
    /// File name (within the content dir) of the content file.
    pub key_str: String,
    /// Path to the JSON metadata sidecar file.
    pub meta_path: PathBuf,
    /// Path to the content file itself.
    pub path: PathBuf,
    /// Size of the content file in bytes.
    pub amt: usize,
    /// crc32c checksum of the content file.
    pub crc: u32,
    // Shared liveness flag from the owning ArcDiskCache. While true, the
    // cache is running and a dropped handle means eviction (files deleted);
    // once false (shutdown) files are kept for the next start.
    running: Arc<AtomicBool>,
}
69
impl Drop for FileHandle {
    /// Delete the content and metadata files when the handle is dropped while
    /// the cache is still live (eviction). During shutdown (`running` is
    /// false) the files are left on disk so `ArcDiskCache::new` can rebuild
    /// the cache from them on the next start.
    fn drop(&mut self) {
        if self.running.load(Ordering::Acquire) {
            info!("🗑 remove fhandle -> {:?}", self.path);
            // Removal failures are deliberately ignored; the files may
            // already have been cleaned up externally.
            let _ = std::fs::remove_file(&self.meta_path);
            let _ = std::fs::remove_file(&self.path);
        }
    }
}
79
impl FileHandle {
    /// Open a fresh read-only `File` on the cached content, giving the caller
    /// an independent cursor into the same on-disk data.
    pub fn reopen(&self) -> Result<File, std::io::Error> {
        File::open(&self.path)
    }
}
85
86#[instrument(level = "trace")]
87fn crc32c_len(file: &mut File) -> Result<u32, ()> {
88 file.seek(std::io::SeekFrom::Start(0)).map_err(|e| {
89 error!("Unable to seek tempfile -> {:?}", e);
90 })?;
91
92 let mut buf_file = BufReader::with_capacity(8192, file);
99 let mut crc = 0;
100 loop {
101 match buf_file.fill_buf() {
102 Ok(buffer) => {
103 let length = buffer.len();
104 if length == 0 {
105 break;
107 } else {
108 crc = crc32c::crc32c_append(crc, &buffer);
110 buf_file.consume(length);
111 }
112 }
113 Err(e) => {
114 error!("Bufreader error -> {:?}", e);
115 return Err(());
116 }
117 }
118 }
119 debug!("crc32c is: {:x}", crc);
120
121 Ok(crc)
122}
123
/// A disk-backed cache: entry residency is managed in memory by a concread
/// `ARCache` (sized by content byte length via `insert_sized`), while the
/// actual content lives as files in `content_dir` alongside JSON `.meta`
/// sidecars that allow the cache to be rebuilt across restarts.
#[derive(Clone)]
pub struct ArcDiskCache<K, D>
where
    K: Serialize
        + DeserializeOwned
        + AsRef<[u8]>
        + Hash
        + Eq
        + Ord
        + Clone
        + Debug
        + Sync
        + Send
        + 'static,
    D: Serialize + DeserializeOwned + Clone + Debug + Sync + Send + 'static,
{
    // In-memory ARC index over the on-disk content.
    cache: Arc<ARCache<K, CacheObj<K, D>>>,
    /// Directory holding content files and `.meta` sidecars.
    pub content_dir: PathBuf,
    // Liveness flag shared with every FileHandle; cleared on Drop so that
    // handles dropped during shutdown do not delete their files.
    running: Arc<AtomicBool>,
    // When true, crc verification on read is skipped (see `get`) — the
    // filesystem is trusted not to corrupt data.
    durable_fs: bool,
}
145
impl<K, D> Drop for ArcDiskCache<K, D>
where
    K: Serialize
        + DeserializeOwned
        + AsRef<[u8]>
        + Hash
        + Eq
        + Ord
        + Clone
        + Debug
        + Sync
        + Send
        + 'static,
    D: Serialize + DeserializeOwned + Clone + Debug + Sync + Send + 'static,
{
    /// Clear the shared liveness flag so that `FileHandle`s dropped after
    /// this point keep their files on disk, letting a future `new` rebuild
    /// the cache from `content_dir`.
    fn drop(&mut self) {
        trace!("ArcDiskCache - setting running to false");
        self.running.store(false, Ordering::Release);
    }
}
166
167impl<K, D> ArcDiskCache<K, D>
168where
169 K: Serialize
170 + DeserializeOwned
171 + AsRef<[u8]>
172 + Hash
173 + Eq
174 + Ord
175 + Clone
176 + Debug
177 + Sync
178 + Send
179 + 'static,
180 D: Serialize + DeserializeOwned + Clone + Debug + Sync + Send + 'static,
181{
    /// Build a new `ArcDiskCache`, rehydrating any entries already present in
    /// `content_dir` from their `.meta` sidecar files.
    ///
    /// * `capacity` — ARCache size; entries are weighted by content file size
    ///   in bytes (see `insert_sized` below), so this is effectively bytes.
    /// * `content_dir` — directory for content files and `.meta` sidecars.
    ///   Must exist and be readable; this function panics otherwise.
    /// * `durable_fs` — when true, `get` skips crc verification.
    ///
    /// Content files that have no surviving metadata are deleted. Metadata
    /// whose content file is missing or fails validation is ignored.
    ///
    /// # Panics
    /// Panics if `content_dir` cannot be read or the ARCache parameters are
    /// rejected.
    pub fn new(capacity: usize, content_dir: &Path, durable_fs: bool) -> Self {
        info!("capacity: {} content_dir: {:?}", capacity, content_dir);

        let cache = Arc::new(
            ARCacheBuilder::new()
                .set_size(capacity, 0)
                .set_watermark(0)
                .build()
                .expect("Invalid ARCache Parameters"),
        );

        // Shared with every FileHandle; cleared on Drop of the cache.
        let running = Arc::new(AtomicBool::new(true));

        // Enumerate everything currently in the content dir.
        let mut entries = std::fs::read_dir(content_dir)
            .expect("unable to read content dir")
            .map(|res| res.map(|e| e.path()))
            .collect::<Result<Vec<_>, std::io::Error>>()
            .expect("Failed to access some dirents");

        entries.sort();

        // Split dirents into `.meta` sidecars and content files.
        let (meta, files): (Vec<_>, Vec<_>) = entries
            .into_iter()
            .partition(|p| p.extension() == Some(std::ffi::OsStr::new("meta")));

        let meta_len = meta.len();
        info!("Will process {} metadata", meta_len);

        // Pass 1: parse each sidecar as JSON; unreadable/unparsable files are
        // silently skipped (filter_map over .ok()).
        let meta: Vec<(PathBuf, CacheObjMeta<K, D>)> = meta
            .into_iter()
            .enumerate()
            .filter_map(|(i, p)| {
                if i % 1000 == 0 {
                    // Progress logging for large caches.
                    info!("{} of {}", i, meta_len);
                }
                trace!(?p, "meta read");
                File::open(&p)
                    .ok()
                    .map(|f| BufReader::new(f))
                    .and_then(|rdr| serde_json::from_reader(rdr).ok())
                    .map(|m| (p.to_path_buf(), m))
            })
            .collect();

        // Pass 2: validate each parsed sidecar against its content file and
        // build the in-memory CacheObj for it.
        let meta: Vec<CacheObj<K, D>> = meta
            .into_iter()
            .enumerate()
            .filter_map(|(i, (meta_path, m))| {
                if i % 1000 == 0 {
                    info!("{} of {}", i, meta_len);
                }
                let CacheObjMeta {
                    key,
                    key_str,
                    crc,
                    userdata,
                } = m;

                let path = content_dir.join(&key_str);

                // Sidecar without its content file: drop the entry.
                if !path.exists() {
                    return None;
                }

                let mut file = File::open(&path).ok()?;

                let amt = match file.metadata().map(|m| m.len() as usize) {
                    Ok(a) => a,
                    Err(e) => {
                        error!("Unable to access metadata -> {:?}", e);
                        return None;
                    }
                };

                // NOTE(review): this verifies crc only for files *at or
                // above* CHECK_INLINE, the opposite direction of the check in
                // `get` (which verifies only smaller files and skips large
                // ones). Presumably large files are verified once here
                // because they are never re-verified on read — confirm this
                // is intended. Also note `durable_fs` is not consulted here,
                // unlike in `get`.
                if amt >= CHECK_INLINE {
                    let crc_ck = crc32c_len(&mut file).ok()?;
                    if crc_ck != crc {
                        warn!("file potentially corrupted - {:?}", meta_path);
                        return None;
                    }
                }

                Some(CacheObj {
                    key,
                    userdata,
                    fhandle: Arc::new(FileHandle {
                        key_str,
                        meta_path,
                        path,
                        amt,
                        crc,
                        running: running.clone(),
                    }),
                })
            })
            .collect();

        info!("Found {:?} existing metadata", meta.len());

        // Any content file not claimed by a surviving entry is an orphan.
        let mut files: BTreeSet<_> = files.into_iter().collect();
        meta.iter().for_each(|co| {
            files.remove(&co.fhandle.path);
        });

        // Delete the orphans; failures are ignored.
        files.iter().for_each(|p| {
            trace!("🗑 -> {:?}", p);
            let _ = std::fs::remove_file(p);
        });

        // Load the surviving entries into the ARCache, weighted by file size
        // (minimum weight 1 for empty files, since insert_sized requires a
        // non-zero amount).
        let mut wrtxn = cache.write();
        meta.into_iter().for_each(|co| {
            let key = co.key.clone();
            let amt = NonZeroUsize::new(co.fhandle.amt)
                .unwrap_or(unsafe { NonZeroUsize::new_unchecked(1) });
            wrtxn.insert_sized(key, co, amt);
        });
        wrtxn.commit();

        // Startup churn should not pollute the stats callers will read.
        cache.reset_stats();

        debug!("ArcDiskCache Ready!");

        ArcDiskCache {
            content_dir: content_dir.to_path_buf(),
            cache,
            running,
            durable_fs,
        }
    }
318
    /// Look up `q` in the cache, returning a clone of the cached object on a
    /// hit, or `None` on a miss.
    ///
    /// Unless the cache was built with `durable_fs`, content files smaller
    /// than `CHECK_INLINE` are re-verified against their stored crc32c on
    /// every hit; a mismatch (or a content file that can no longer be opened)
    /// is treated as a miss. Files at or above the threshold skip the check
    /// as too expensive to hash per-read.
    pub fn get<Q: ?Sized>(&self, q: &Q) -> Option<CacheObj<K, D>>
    where
        K: Borrow<Q>,
        Q: Hash + Eq + Ord,
    {
        let mut rtxn = self.cache.read();
        rtxn.get(q)
            .and_then(|obj| {
                // Re-open the content file to confirm it still exists and to
                // read its current size.
                let mut file = File::open(&obj.fhandle.path).ok()?;

                let amt = file
                    .metadata()
                    .map(|m| m.len() as usize)
                    .map_err(|e| {
                        error!("Unable to access metadata -> {:?}", e);
                    })
                    .ok()?;

                if !self.durable_fs {
                    if amt < CHECK_INLINE {
                        let crc_ck = crc32c_len(&mut file).ok()?;
                        if crc_ck != obj.fhandle.crc {
                            warn!("file potentially corrupted - {:?}", obj.fhandle.meta_path);
                            return None;
                        }
                    } else {
                        info!("Skipping crc check, file too large");
                    }
                }

                Some(obj)
            })
            .cloned()
    }
353
354 pub fn path(&self) -> &Path {
355 &self.content_dir
356 }
357
    /// Return a snapshot (owned clone) of the underlying ARCache statistics.
    pub fn view_stats(&self) -> CacheStats {
        (*self.cache.view_stats()).clone()
    }
361
362 pub fn insert_bytes(&self, k: K, d: D, bytes: &[u8]) -> () {
363 let mut fh = match self.new_tempfile() {
364 Some(fh) => fh,
365 None => return,
366 };
367
368 if let Err(e) = fh.write(bytes) {
369 error!(?e, "failed to write bytes to file");
370 return;
371 };
372
373 if let Err(e) = fh.flush() {
374 error!(?e, "failed to flush bytes to file");
375 return;
376 }
377
378 self.insert(k, d, fh)
379 }
380
    /// Persist the tempfile `fh` as a cache entry for key `k` with userdata
    /// `d`: compute its size and crc32c, derive a salted filename, write the
    /// `.meta` sidecar, move the tempfile into the content dir, and insert
    /// the entry into the in-memory cache (weighted by file size).
    ///
    /// Failures at any step are logged and the insert is abandoned; a
    /// filename collision (same salted name already present) is also skipped.
    pub fn insert(&self, k: K, d: D, mut fh: NamedTempFile) -> () {
        let file = fh.as_file_mut();

        let amt = match file.metadata().map(|m| m.len() as usize) {
            Ok(a) => a,
            Err(e) => {
                error!("Unable to access metadata -> {:?}", e);
                return;
            }
        };

        let crc = match crc32c_len(file) {
            Ok(v) => v,
            Err(_) => return,
        };

        // Random salt appended to the key so that repeated inserts of the
        // same key produce distinct filenames.
        let mut rng = rand::thread_rng();
        let mut salt: [u8; 16] = [0; 16];
        rng.fill(&mut salt);

        let k_slice: &[u8] = k.as_ref();

        let mut adapted_k = Vec::with_capacity(16 + k_slice.len());
        adapted_k.extend_from_slice(k_slice);
        adapted_k.extend_from_slice(&salt);

        // URL-safe base64 keeps the name filesystem-friendly. For very long
        // keys, keep only the *last* 160 chars — this preserves the trailing
        // salt, so truncated names remain collision-resistant.
        let key_str = base64::encode_config(&adapted_k, base64::URL_SAFE);
        let key_str = if key_str.len() > 160 {
            debug!("Needing to truncate filename due to excessive key length");
            let at = key_str.len() - 160;
            key_str.split_at(at).1.to_string()
        } else {
            key_str
        };

        let path = self.content_dir.join(&key_str);
        let mut meta_str = key_str.clone();
        meta_str.push_str(".meta");
        let meta_path = self.content_dir.join(&meta_str);

        info!("{:?}", path);
        info!("{:?}", meta_path);

        let objmeta = CacheObjMeta {
            key: k.clone(),
            key_str: key_str.clone(),
            crc,
            userdata: d.clone(),
        };

        // With a random 16-byte salt this should be vanishingly rare; bail
        // out rather than clobber an existing entry's files.
        if meta_path.exists() {
            warn!(
                immediate = true,
                "file collision detected, skipping write of {}", meta_str
            );
            return;
        }

        let m_file = match File::create(&meta_path).map(BufWriter::new) {
            Ok(f) => f,
            Err(e) => {
                error!(
                    immediate = true,
                    "CRITICAL! Failed to open metadata {:?}", e
                );
                return;
            }
        };

        // Metadata is written first; only then is the content file moved into
        // place, so a crash between the two leaves an orphan sidecar that
        // `new` will discard (content missing), never a content file with a
        // bogus checksum.
        if let Err(e) = serde_json::to_writer(m_file, &objmeta) {
            error!(
                immediate = true,
                "CRITICAL! Failed to write metadata {:?}", e
            );
            return;
        } else {
            info!("Persisted metadata for {:?}", &meta_path);

            // Atomically move the tempfile to its final path.
            if let Err(e) = fh.persist(&path) {
                error!(immediate = true, "CRITICAL! Failed to persist file {:?}", e);
                return;
            }
        }

        info!("Persisted data for {:?}", &path);

        let co = CacheObj {
            key: k.clone(),
            userdata: d,
            fhandle: Arc::new(FileHandle {
                key_str,
                meta_path,
                path,
                amt,
                crc,
                running: self.running.clone(),
            }),
        };

        // Weight by file size; insert_sized requires a non-zero amount, so
        // empty files count as 1.
        let amt = NonZeroUsize::new(amt).unwrap_or(unsafe { NonZeroUsize::new_unchecked(1) });

        let mut wrtxn = self.cache.write();
        wrtxn.insert_sized(k, co, amt);
        debug!("commit");
        wrtxn.commit();
    }
490
491 pub fn update_userdata<Q: ?Sized, F>(&self, q: &Q, mut func: F)
493 where
494 K: Borrow<Q>,
495 Q: Hash + Eq + Ord,
496 F: FnMut(&mut D),
497 {
498 let mut wrtxn = self.cache.write();
499
500 if let Some(mref) = wrtxn.get_mut(q, false) {
501 func(&mut mref.userdata);
502
503 let objmeta = CacheObjMeta {
504 key: mref.key.clone(),
505 key_str: mref.fhandle.key_str.clone(),
506 crc: mref.fhandle.crc,
507 userdata: mref.userdata.clone(),
508 };
509
510 let m_file = File::create(&mref.fhandle.meta_path)
512 .map(BufWriter::new)
513 .map_err(|e| {
514 error!("Failed to open metadata {:?}", e);
515 })
516 .unwrap();
517
518 serde_json::to_writer(m_file, &objmeta)
519 .map_err(|e| {
520 error!("Failed to write metadata {:?}", e);
521 })
522 .unwrap();
523
524 info!("Persisted metadata for {:?}", &mref.fhandle.meta_path);
525
526 debug!("commit");
527 wrtxn.commit();
528 }
529 }
530
531 pub fn update_all_userdata<F, C>(&self, check: C, mut func: F)
532 where
533 C: Fn(&D) -> bool,
534 F: FnMut(&mut D),
535 {
536 let mut wrtxn = self.cache.write();
537
538 let keys: Vec<_> = wrtxn
539 .iter()
540 .filter_map(|(k, mref)| {
541 if check(&mref.userdata) {
542 Some(k.clone())
543 } else {
544 None
545 }
546 })
547 .collect();
548
549 for k in keys {
550 if let Some(mref) = wrtxn.get_mut(&k, false) {
551 func(&mut mref.userdata);
552
553 let objmeta = CacheObjMeta {
554 key: mref.key.clone(),
555 key_str: mref.fhandle.key_str.clone(),
556 crc: mref.fhandle.crc,
557 userdata: mref.userdata.clone(),
558 };
559
560 let m_file = File::create(&mref.fhandle.meta_path)
562 .map(BufWriter::new)
563 .map_err(|e| {
564 error!("Failed to open metadata {:?}", e);
565 })
566 .unwrap();
567
568 serde_json::to_writer(m_file, &objmeta)
569 .map_err(|e| {
570 error!("Failed to write metadata {:?}", e);
571 })
572 .unwrap();
573
574 info!("Persisted metadata for {:?}", &mref.fhandle.meta_path);
575 }
576 }
577
578 debug!("commit");
579 wrtxn.commit();
580 }
581
582 pub fn remove(&self, k: K) {
584 let mut wrtxn = self.cache.write();
585 let _ = wrtxn.remove(k);
586 debug!("commit");
588 wrtxn.commit();
589 }
590
591 pub fn new_tempfile(&self) -> Option<NamedTempFile> {
593 NamedTempFile::new_in(&self.content_dir)
594 .map_err(|e| error!(?e))
595 .ok()
596 }
597}
598
#[cfg(test)]
mod tests {
    use super::ArcDiskCache;
    use std::io::Write;
    use tempfile::tempdir;

    /// Smoke test: build a cache over a fresh tempdir, write a small payload
    /// into a tempfile, and insert it. Exercises the full persist path
    /// (crc, salted filename, .meta sidecar, tempfile persist, ARC insert).
    #[test]
    fn disk_cache_test_basic() {
        // try_init: ignore the error if a global subscriber is already set
        // by another test.
        let _ = tracing_subscriber::fmt::try_init();

        let dir = tempdir().expect("Failed to build tempdir");
        let dc: ArcDiskCache<Vec<u8>, ()> = ArcDiskCache::new(1024, dir.path(), false);

        let mut fh = dc.new_tempfile().unwrap();
        let k = vec![0, 1, 2, 3, 4, 5];

        let file = fh.as_file_mut();
        file.write_all(b"Hello From Cache").unwrap();

        dc.insert(k, (), fh);
    }
}