1use std::{
5 borrow::Cow,
6 fs, io,
7 io::Write,
8 ops::Deref,
9 path::{Path, PathBuf},
10 sync::{
11 Arc,
12 atomic::{AtomicBool, AtomicUsize, Ordering},
13 },
14};
15
16use lru_mem::{HeapSize, MemSize};
17use rkyv::{
18 Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
19 api::high::{HighSerializer, HighValidator},
20 bytecheck::CheckBytes,
21 de::Pool,
22 rancor::{Error as RkyvError, Strategy},
23 ser::allocator::ArenaHandle,
24 util::AlignedVec,
25};
26use tempfile::NamedTempFile;
27use threadpool::ThreadPool;
28
29use crate::{
30 hash::ObjectHash,
31 internal::{
32 metadata::{EntryMeta, MetaAttached},
33 object::types::ObjectType,
34 pack::{entry::Entry, utils},
35 },
36};
37
/// Types that can be persisted to and restored from a single file on disk.
pub trait FileLoadStore: Sized {
    /// Reads the file at `path` and deserializes a value from it.
    fn f_load(path: &Path) -> Result<Self, io::Error>;
    /// Serializes `self` and writes it to `path`. The implementations in this
    /// file treat an already-existing file as success and leave it untouched.
    fn f_save(&self, path: &Path) -> Result<(), io::Error>;
}
45
46fn write_bytes_atomically(path: &Path, data: &[u8]) -> Result<(), io::Error> {
47 if path.exists() {
48 return Ok(());
49 }
50 let dir = path
51 .parent()
52 .filter(|parent| !parent.as_os_str().is_empty())
53 .unwrap_or_else(|| Path::new("."));
54
55 let mut temp_file = NamedTempFile::new_in(dir)?;
56 temp_file.write_all(data)?;
57
58 match temp_file.persist_noclobber(path) {
59 Ok(_persisted) => Ok(()),
60 Err(err) if err.error.kind() == io::ErrorKind::AlreadyExists && path.exists() => Ok(()),
61 Err(err) => Err(err.error),
62 }
63}
64
/// Blanket implementation: any rkyv-serializable type whose archived form can
/// be validated and deserialized gets file load/store for free.
impl<T> FileLoadStore for T
where
    T: Archive,
    T: for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
    T::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
        + RkyvDeserialize<T, Strategy<Pool, RkyvError>>,
{
    /// Reads the whole file and deserializes it with rkyv, mapping rkyv
    /// errors into `io::Error`.
    fn f_load(path: &Path) -> Result<T, io::Error> {
        let data = fs::read(path)?;
        let obj = rkyv::from_bytes::<T, RkyvError>(&data).map_err(io::Error::other)?;
        Ok(obj)
    }

    /// Serializes with rkyv and writes atomically. When the target already
    /// exists, skips the (potentially expensive) serialization entirely.
    fn f_save(&self, path: &Path) -> Result<(), io::Error> {
        if path.exists() {
            return Ok(());
        }

        let data = rkyv::to_bytes::<RkyvError>(self).map_err(io::Error::other)?;
        write_bytes_atomically(path, &data)
    }
}
88
/// Describes what a cached pack entry is: either a fully resolved base object
/// or one of the delta encodings pointing at its base.
#[derive(
    PartialEq,
    Eq,
    Clone,
    Debug,
    serde::Serialize,
    serde::Deserialize,
    rkyv::Archive,
    rkyv::Serialize,
    rkyv::Deserialize,
)]
pub(crate) enum CacheObjectInfo {
    /// A non-delta object: its type and content hash.
    BaseObject(ObjectType, ObjectHash),
    /// Delta whose base is addressed by pack offset; the second field appears
    /// to be the final (expanded) object size — see `heap_size` below.
    OffsetDelta(usize, usize),
    /// Zstd-encoded offset delta; fields as in `OffsetDelta`.
    OffsetZstdelta(usize, usize),
    /// Delta whose base is addressed by hash, plus the final size.
    HashDelta(ObjectHash, usize),
}
115
116impl CacheObjectInfo {
117 pub(crate) fn object_type(&self) -> ObjectType {
119 match self {
120 CacheObjectInfo::BaseObject(obj_type, _) => *obj_type,
121 CacheObjectInfo::OffsetDelta(_, _) => ObjectType::OffsetDelta,
122 CacheObjectInfo::OffsetZstdelta(_, _) => ObjectType::OffsetZstdelta,
123 CacheObjectInfo::HashDelta(_, _) => ObjectType::HashDelta,
124 }
125 }
126}
127
/// An object held in the pack cache, together with the bookkeeping used for
/// memory-usage accounting.
#[derive(Debug)]
pub struct CacheObject {
    // What kind of object this is (base object or one of the delta forms).
    pub(crate) info: CacheObjectInfo,
    // Byte offset of the entry inside the pack file.
    pub offset: usize,
    // CRC32 of the entry as recorded in the pack.
    pub crc32: u32,
    // Decompressed object payload.
    pub data_decompressed: Vec<u8>,
    // Shared counter tracking total memory held by live cache objects;
    // `None` when this object is untracked (e.g. freshly deserialized).
    pub mem_recorder: Option<Arc<AtomicUsize>>,
    // Whether the object was stored as a delta in the originating pack.
    pub is_delta_in_pack: bool,
}
138
/// Owned on-disk representation of a `CacheObject`; everything except the
/// runtime-only `mem_recorder` is persisted.
#[derive(Debug, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
struct CacheObjectOnDisk {
    info: CacheObjectInfo,
    offset: usize,
    crc32: u32,
    data_decompressed: Vec<u8>,
    is_delta_in_pack: bool,
}
147
/// Borrowing twin of `CacheObjectOnDisk`, used to serialize without cloning
/// the payload. rkyv's `AsOwned` wrapper makes the archived layout match the
/// owned struct, so bytes written from this type can be read back as
/// `CacheObjectOnDisk` (exercised by `test_cache_object_serialize`).
#[derive(Debug, rkyv::Archive, rkyv::Serialize)]
struct CacheObjectOnDiskRef<'a> {
    #[rkyv(with = rkyv::with::AsOwned)]
    info: Cow<'a, CacheObjectInfo>,
    offset: usize,
    crc32: u32,
    #[rkyv(with = rkyv::with::AsOwned)]
    data_decompressed: Cow<'a, [u8]>,
    is_delta_in_pack: bool,
}
158
159impl<'a> From<&'a CacheObject> for CacheObjectOnDiskRef<'a> {
160 fn from(value: &'a CacheObject) -> Self {
161 Self {
162 info: Cow::Borrowed(&value.info),
163 offset: value.offset,
164 crc32: value.crc32,
165 data_decompressed: Cow::Borrowed(value.data_decompressed.as_slice()),
166 is_delta_in_pack: value.is_delta_in_pack,
167 }
168 }
169}
170
171impl From<CacheObjectOnDisk> for CacheObject {
172 fn from(value: CacheObjectOnDisk) -> Self {
173 Self {
174 info: value.info,
175 offset: value.offset,
176 crc32: value.crc32,
177 data_decompressed: value.data_decompressed,
178 mem_recorder: None,
179 is_delta_in_pack: value.is_delta_in_pack,
180 }
181 }
182}
183
/// Manual implementation (instead of the blanket one above) so saving can go
/// through the borrowing `CacheObjectOnDiskRef` — avoiding a payload copy —
/// and loading can reconstruct the object with `mem_recorder` unset.
impl FileLoadStore for CacheObject {
    fn f_load(path: &Path) -> Result<Self, io::Error> {
        // Deserialize the owned on-disk form, then convert; the conversion
        // leaves `mem_recorder` as `None`.
        let obj = CacheObjectOnDisk::f_load(path)?;
        Ok(obj.into())
    }

    fn f_save(&self, path: &Path) -> Result<(), io::Error> {
        // An existing file is treated as equivalent content; skip the
        // serialization work entirely.
        if path.exists() {
            return Ok(());
        }

        let data = rkyv::to_bytes::<RkyvError>(&CacheObjectOnDiskRef::from(self))
            .map_err(io::Error::other)?;
        write_bytes_atomically(path, &data)
    }
}
200
201impl Clone for CacheObject {
202 fn clone(&self) -> Self {
203 let obj = CacheObject {
204 info: self.info.clone(),
205 offset: self.offset,
206 crc32: self.crc32,
207 data_decompressed: self.data_decompressed.clone(),
208 mem_recorder: self.mem_recorder.clone(),
209 is_delta_in_pack: self.is_delta_in_pack,
210 };
211 obj.record_mem_size();
212 obj
213 }
214}
215
216impl HeapSize for CacheObject {
220 fn heap_size(&self) -> usize {
230 match &self.info {
231 CacheObjectInfo::BaseObject(_, _) => self.data_decompressed.heap_size(),
232 CacheObjectInfo::OffsetDelta(_, delta_final_size)
233 | CacheObjectInfo::OffsetZstdelta(_, delta_final_size)
234 | CacheObjectInfo::HashDelta(_, delta_final_size) => {
235 self.data_decompressed.heap_size() + delta_final_size
246 }
247 }
248 }
249}
250
251impl Drop for CacheObject {
252 fn drop(&mut self) {
255 if let Some(mem_recorder) = &self.mem_recorder {
257 mem_recorder.fetch_sub((*self).mem_size(), Ordering::Release);
258 }
259 }
260}
261
/// Hooks an object's measured memory size (`MemSize`) up to a shared
/// `AtomicUsize` so the aggregate memory held by cache objects can be tracked.
pub trait MemSizeRecorder: MemSize {
    /// Adds this object's current memory size to the attached counter.
    fn record_mem_size(&self);
    /// Attaches the shared counter that size changes are reported to.
    fn set_mem_recorder(&mut self, mem_size: Arc<AtomicUsize>);
}
273
274impl MemSizeRecorder for CacheObject {
275 fn record_mem_size(&self) {
278 if let Some(mem_recorder) = &self.mem_recorder {
279 mem_recorder.fetch_add(self.mem_size(), Ordering::Release);
280 }
281 }
282
283 fn set_mem_recorder(&mut self, mem_recorder: Arc<AtomicUsize>) {
284 self.mem_recorder = Some(mem_recorder);
285 }
286
287 }
291
292impl CacheObject {
293 pub fn new_for_undeltified(
295 obj_type: ObjectType,
296 data: Vec<u8>,
297 offset: usize,
298 crc32: u32,
299 ) -> Self {
300 let hash = utils::calculate_object_hash(obj_type, &data);
301 CacheObject {
302 info: CacheObjectInfo::BaseObject(obj_type, hash),
303 offset,
304 crc32,
305 data_decompressed: data,
306 mem_recorder: None,
307 is_delta_in_pack: false,
308 }
309 }
310
311 pub fn object_type(&self) -> ObjectType {
313 self.info.object_type()
314 }
315
316 pub fn base_object_hash(&self) -> Option<ObjectHash> {
320 match &self.info {
321 CacheObjectInfo::BaseObject(_, hash) => Some(*hash),
322 _ => None,
323 }
324 }
325
326 pub fn offset_delta(&self) -> Option<usize> {
330 match &self.info {
331 CacheObjectInfo::OffsetDelta(offset, _) => Some(*offset),
332 _ => None,
333 }
334 }
335
336 pub fn hash_delta(&self) -> Option<ObjectHash> {
340 match &self.info {
341 CacheObjectInfo::HashDelta(hash, _) => Some(*hash),
342 _ => None,
343 }
344 }
345
346 pub fn to_entry(&self) -> Entry {
348 match self.info {
349 CacheObjectInfo::BaseObject(obj_type, hash) => Entry {
350 obj_type,
351 data: self.data_decompressed.clone(),
352 hash,
353 chain_len: 0,
354 },
355 _ => {
356 unreachable!("delta object should not persist!")
357 }
358 }
359 }
360
361 pub fn to_entry_metadata(&self) -> MetaAttached<Entry, EntryMeta> {
363 match self.info {
364 CacheObjectInfo::BaseObject(obj_type, hash) => {
365 let entry = Entry {
366 obj_type,
367 data: self.data_decompressed.clone(),
368 hash,
369 chain_len: 0,
370 };
371 let meta = EntryMeta {
372 pack_offset: Some(self.offset),
374 crc32: Some(self.crc32),
375 is_delta: Some(self.is_delta_in_pack),
376 ..Default::default()
377 };
378 MetaAttached { inner: entry, meta }
379 }
380
381 _ => {
382 unreachable!("delta object should not persist!")
383 }
384 }
385 }
386}
387
/// Bound alias for values storable in the LRU cache: sized for eviction
/// (`HeapSize`), persistable (`FileLoadStore`), and shareable with the
/// background save pool (`Send + Sync + 'static`).
pub trait ArcWrapperBounds: HeapSize + FileLoadStore + Send + Sync + 'static {}
// Every type meeting the bounds gets the marker trait automatically.
impl<T: HeapSize + FileLoadStore + Send + Sync + 'static> ArcWrapperBounds for T {}
393
/// LRU-cache entry wrapper that persists its payload to `store_path` when
/// dropped (i.e. evicted), unless the shared completion flag is already set.
pub struct ArcWrapper<T: ArcWrapperBounds> {
    pub data: Arc<T>,
    // Shared "work finished" flag; once true, dropped entries are not saved.
    complete_signal: Arc<AtomicBool>,
    // Optional pool for asynchronous saves; `None` means save synchronously.
    pool: Option<Arc<ThreadPool>>,
    // Target file for persistence; `None` disables saving on drop.
    pub store_path: Option<PathBuf>,
}
403impl<T: ArcWrapperBounds> ArcWrapper<T> {
404 pub fn new(data: Arc<T>, share_flag: Arc<AtomicBool>, pool: Option<Arc<ThreadPool>>) -> Self {
406 ArcWrapper {
407 data,
408 complete_signal: share_flag,
409 pool,
410 store_path: None,
411 }
412 }
413 pub fn set_store_path(&mut self, path: PathBuf) {
415 self.store_path = Some(path);
416 }
417}
418
419impl<T: ArcWrapperBounds> HeapSize for ArcWrapper<T> {
420 fn heap_size(&self) -> usize {
421 self.data.heap_size()
422 }
423}
424
425impl<T: ArcWrapperBounds> Clone for ArcWrapper<T> {
426 fn clone(&self) -> Self {
428 ArcWrapper {
429 data: self.data.clone(),
430 complete_signal: self.complete_signal.clone(),
431 pool: self.pool.clone(),
432 store_path: None,
433 }
434 }
435}
436
/// Lets an `ArcWrapper<T>` be used wherever an `Arc<T>` is expected.
impl<T: ArcWrapperBounds> Deref for ArcWrapper<T> {
    type Target = Arc<T>;
    fn deref(&self) -> &Self::Target {
        &self.data
    }
}
impl<T: ArcWrapperBounds> Drop for ArcWrapper<T> {
    /// On drop (typically LRU eviction), persists the payload to `store_path`
    /// unless the shared completion flag is already set.
    fn drop(&mut self) {
        if !self.complete_signal.load(Ordering::Acquire)
            && let Some(path) = &self.store_path
        {
            match &self.pool {
                Some(pool) => {
                    let data_copy = self.data.clone();
                    let path_copy = path.clone();
                    let complete_signal = self.complete_signal.clone();
                    // Crude backpressure: spin-yield while the pool backlog is
                    // large so evictions cannot queue unbounded save jobs.
                    while pool.queued_count() > 2000 {
                        std::thread::yield_now();
                    }
                    pool.execute(move || {
                        // Re-check the flag: completion may have been signaled
                        // while this job sat in the queue.
                        if !complete_signal.load(Ordering::Acquire) {
                            let res = data_copy.f_save(&path_copy);
                            if let Err(e) = res {
                                // NOTE(review): saves are best-effort; errors are
                                // only printed (to stdout), never propagated.
                                println!("[f_save] {path_copy:?} error: {e:?}");
                            }
                        }
                    });
                }
                None => {
                    // No pool configured: save synchronously on this thread.
                    let res = self.data.f_save(path);
                    if let Err(e) = res {
                        println!("[f_save] {path:?} error: {e:?}");
                    }
                }
            }
        }
    }
}
#[cfg(test)]
mod test {
    use std::{fs, sync::Mutex};

    use lru_mem::LruCache;
    use tempfile::tempdir;

    use super::*;
    use crate::hash::{HashKind, set_hash_kind_for_test};

    /// Builds an untracked base-object entry whose payload is `size` zero bytes.
    fn make_obj(size: usize) -> CacheObject {
        CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, ObjectHash::default()),
            offset: 0,
            crc32: 0,
            data_decompressed: vec![0; size],
            mem_recorder: None,
            is_delta_in_pack: false,
        }
    }

    /// Recording adds the object's size to the shared counter; dropping the
    /// object must return the counter to zero.
    #[test]
    fn test_heap_size_record() {
        for (kind, size) in [(HashKind::Sha1, 1024usize), (HashKind::Sha256, 2048usize)] {
            let _guard = set_hash_kind_for_test(kind);
            let mut obj = make_obj(size);
            let mem = Arc::new(AtomicUsize::default());
            assert_eq!(mem.load(Ordering::Relaxed), 0);
            obj.set_mem_recorder(mem.clone());
            obj.record_mem_size();
            assert_eq!(mem.load(Ordering::Relaxed), obj.mem_size());
            drop(obj);
            assert_eq!(mem.load(Ordering::Relaxed), 0);
        }
    }

    /// A base object's heap size equals its payload length, and wrapping it in
    /// an `ArcWrapper` reports the same size.
    #[test]
    fn test_cache_object_with_same_size() {
        for (kind, size) in [(HashKind::Sha1, 1024usize), (HashKind::Sha256, 2048usize)] {
            let _guard = set_hash_kind_for_test(kind);
            let a = make_obj(size);
            assert_eq!(a.heap_size(), size);

            let b = ArcWrapper::new(Arc::new(a.clone()), Arc::new(AtomicBool::new(false)), None);
            assert_eq!(b.heap_size(), size);
        }
    }

    /// Inserting past capacity evicts the LRU entry, while `try_insert`
    /// refuses to perform an insert that would eject.
    #[test]
    fn test_cache_object_with_lru() {
        for (kind, cap, size_a, size_b) in [
            (
                HashKind::Sha1,
                2048usize,
                1024usize,
                (1024.0 * 1.5) as usize,
            ),
            (
                HashKind::Sha256,
                4096usize,
                2048usize,
                (2048.0 * 1.5) as usize,
            ),
        ] {
            let _guard = set_hash_kind_for_test(kind);
            let mut cache = LruCache::new(cap);

            let hash_a = ObjectHash::default();
            let hash_b = ObjectHash::new(b"b");
            let a = make_obj(size_a);
            let b = make_obj(size_b);

            {
                let r = cache.insert(
                    hash_a.to_string(),
                    ArcWrapper::new(Arc::new(a.clone()), Arc::new(AtomicBool::new(true)), None),
                );
                assert!(r.is_ok());
            }

            {
                // try_insert must refuse: fitting `b` would eject `a`.
                let r = cache.try_insert(
                    hash_b.to_string(),
                    ArcWrapper::new(Arc::new(b.clone()), Arc::new(AtomicBool::new(true)), None),
                );
                assert!(r.is_err());
                if let Err(lru_mem::TryInsertError::WouldEjectLru { .. }) = r {
                    // expected
                } else {
                    panic!("Expected WouldEjectLru error");
                }

                // Plain insert is allowed to eject `a` to make room for `b`.
                let r = cache.insert(
                    hash_b.to_string(),
                    ArcWrapper::new(Arc::new(b.clone()), Arc::new(AtomicBool::new(true)), None),
                );
                assert!(r.is_ok());
            }

            {
                let r = cache.get(&hash_a.to_string());
                assert!(r.is_none());
            }
        }
    }

    #[derive(
        serde::Serialize, serde::Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize,
    )]
    struct Test {
        a: usize,
    }
    impl Drop for Test {
        fn drop(&mut self) {
            println!("drop Test");
        }
    }
    impl HeapSize for Test {
        fn heap_size(&self) -> usize {
            self.a
        }
    }

    /// Data remains accessible through a cloned `ArcWrapper` (shared `Arc`)
    /// even after the cache has ejected the entry.
    #[test]
    fn test_lru_drop() {
        println!("insert a");
        let cache = LruCache::new(2048);
        let cache = Arc::new(Mutex::new(cache));
        {
            let mut c = cache.as_ref().lock().unwrap();
            let _ = c.insert(
                "a",
                ArcWrapper::new(
                    Arc::new(Test { a: 1024 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        println!("insert b, a should be ejected");
        {
            let mut c = cache.as_ref().lock().unwrap();
            let _ = c.insert(
                "b",
                ArcWrapper::new(
                    Arc::new(Test { a: 1200 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        // Hold a clone of b's wrapper so its data outlives any ejection.
        let b = {
            let mut c = cache.as_ref().lock().unwrap();
            c.get("b").cloned()
        };
        println!("insert c, b should not be ejected");
        {
            let mut c = cache.as_ref().lock().unwrap();
            let _ = c.insert(
                "c",
                ArcWrapper::new(
                    Arc::new(Test { a: 1200 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        println!("use b: {}", b.as_ref().unwrap().a);
        println!("test over, eject all");
    }

    /// Round-trips a `CacheObject` through the borrowing serializer and the
    /// owned deserializer.
    #[test]
    fn test_cache_object_serialize() {
        for (kind, size) in [(HashKind::Sha1, 1024usize), (HashKind::Sha256, 2048usize)] {
            let _guard = set_hash_kind_for_test(kind);
            let a = make_obj(size);
            let s = rkyv::to_bytes::<RkyvError>(&CacheObjectOnDiskRef::from(&a)).unwrap();
            let b: CacheObject = rkyv::from_bytes::<CacheObjectOnDisk, RkyvError>(&s)
                .unwrap()
                .into();
            assert_eq!(a.info, b.info);
            assert_eq!(a.data_decompressed, b.data_decompressed);
            assert_eq!(a.offset, b.offset);
            assert!(b.mem_recorder.is_none());
        }
    }

    #[test]
    fn test_write_bytes_atomically_creates_file_when_missing() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("object");

        write_bytes_atomically(&path, b"fresh").unwrap();

        assert_eq!(fs::read(&path).unwrap(), b"fresh");
    }

    /// An existing target file must be left untouched (writes skip-if-present).
    #[test]
    fn test_write_bytes_atomically_returns_when_target_exists() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("object");

        fs::write(&path, b"existing").unwrap();
        write_bytes_atomically(&path, b"new-data").unwrap();

        assert_eq!(fs::read(&path).unwrap(), b"existing");
    }

    #[test]
    fn test_cache_object_file_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("object");
        let a = make_obj(1024);

        a.f_save(&path).unwrap();
        let b = CacheObject::f_load(&path).unwrap();

        assert_eq!(a.info, b.info);
        assert_eq!(a.data_decompressed, b.data_decompressed);
        assert_eq!(a.offset, b.offset);
        assert!(b.mem_recorder.is_none());
    }

    /// Dropping a wrapper with a store path while the completion flag is unset
    /// must persist the data synchronously (no pool configured).
    #[test]
    fn test_arc_wrapper_drop_store() {
        let mut path = PathBuf::from(".cache_temp/test_arc_wrapper_drop_store");
        fs::create_dir_all(&path).unwrap();
        path.push("test_obj");
        let mut a = ArcWrapper::new(Arc::new(1024), Arc::new(AtomicBool::new(false)), None);
        a.set_store_path(path.clone());
        drop(a);

        assert!(path.exists());
        path.pop();
        fs::remove_dir_all(&path).unwrap();
        let _ = fs::remove_dir(".cache_temp");
    }

    /// Ejected entries get saved to disk; entries still resident do not.
    #[test]
    fn test_arc_wrapper_with_lru() {
        let mut cache = LruCache::new(1500);
        let path = PathBuf::from(".cache_temp/test_arc_wrapper_with_lru");
        let _ = fs::remove_dir_all(&path);
        fs::create_dir_all(&path).unwrap();
        let shared_flag = Arc::new(AtomicBool::new(false));

        let a_path = path.join("a");
        {
            let mut a = ArcWrapper::new(Arc::new(Test { a: 1024 }), shared_flag.clone(), None);
            a.set_store_path(a_path.clone());
            let b = ArcWrapper::new(Arc::new(1024), shared_flag.clone(), None);
            assert!(b.store_path.is_none());

            println!("insert a with heap size: {:?}", a.heap_size());
            let rt = cache.insert("a", a);
            if let Err(e) = rt {
                panic!("insert a failed: {e}");
            }
            println!("after insert a, cache used = {}", cache.current_size());
        }
        // `a` is still resident in the cache, so nothing is on disk yet.
        assert!(!a_path.exists());

        let b_path = path.join("b");
        {
            let mut b = ArcWrapper::new(Arc::new(Test { a: 996 }), shared_flag.clone(), None);
            b.set_store_path(b_path.clone());
            let rt = cache.insert("b", b);
            if let Err(e) = rt {
                // Fixed copy-paste: this message previously said "insert a failed".
                panic!("insert b failed: {e}");
            }
            println!("after insert b, cache used = {}", cache.current_size());
        }
        // Inserting `b` ejected (and thereby persisted) `a`; `b` stays cached.
        assert!(a_path.exists());
        assert!(!b_path.exists());
        // Raise the flag so dropping the cache does not also save `b`.
        shared_flag.store(true, Ordering::Release);
        fs::remove_dir_all(path).unwrap();
        let _ = fs::remove_dir(".cache_temp");
    }
}
767}