1use std::fs::OpenOptions;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::{fs, io};
6use std::{ops::Deref, sync::Arc};
7
8use lru_mem::{HeapSize, MemSize};
9use serde::{Deserialize, Serialize};
10use threadpool::ThreadPool;
11
12use crate::internal::metadata::{EntryMeta, MetaAttached};
13use crate::internal::pack::entry::Entry;
14use crate::internal::pack::utils;
15use crate::{hash::ObjectHash, internal::object::types::ObjectType};
16
/// Persist a value to / restore it from a single file on disk.
///
/// Blanket-implemented below for every `Serialize + Deserialize` type.
pub trait FileLoadStore: Serialize + for<'a> Deserialize<'a> {
    /// Loads a value of `Self` from the file at `path`.
    fn f_load(path: &Path) -> Result<Self, io::Error>;
    /// Saves `self` to `path`; implementations may skip the write if the
    /// file already exists.
    fn f_save(&self, path: &Path) -> Result<(), io::Error>;
}
25
26impl<T: Serialize + for<'a> Deserialize<'a>> FileLoadStore for T {
28 fn f_load(path: &Path) -> Result<T, io::Error> {
29 let data = fs::read(path)?;
30 let obj: T = bincode::serde::decode_from_slice(&data, bincode::config::standard())
31 .map_err(io::Error::other)?
32 .0;
33 Ok(obj)
34 }
35 fn f_save(&self, path: &Path) -> Result<(), io::Error> {
36 if path.exists() {
37 return Ok(());
38 }
39 let data = bincode::serde::encode_to_vec(self, bincode::config::standard()).unwrap();
40 let path = path.with_extension("temp");
41 {
42 let mut file = OpenOptions::new()
43 .write(true)
44 .create_new(true)
45 .open(path.clone())?;
46 file.write_all(&data)?;
47 }
48 let final_path = path.with_extension("");
49 fs::rename(&path, final_path.clone())?;
50 Ok(())
51 }
52}
53
/// What a `CacheObject` holds: either a fully-resolved base object or a
/// delta that still references its base.
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub(crate) enum CacheObjectInfo {
    /// Non-delta object: its type and content hash.
    BaseObject(ObjectType, ObjectHash),
    /// Delta whose base lives at the given pack offset; the second field is
    /// treated as the final (reconstructed) size in `heap_size` accounting.
    OffsetDelta(usize, usize),
    /// Zstd-compressed offset delta; fields mirror `OffsetDelta` —
    /// NOTE(review): assumed same (base offset, final size) layout, confirm.
    OffsetZstdelta(usize, usize),
    /// Delta whose base is identified by hash; second field used as final
    /// size in `heap_size` accounting.
    HashDelta(ObjectHash, usize),
}
70
71impl CacheObjectInfo {
72 pub(crate) fn object_type(&self) -> ObjectType {
74 match self {
75 CacheObjectInfo::BaseObject(obj_type, _) => *obj_type,
76 CacheObjectInfo::OffsetDelta(_, _) => ObjectType::OffsetDelta,
77 CacheObjectInfo::OffsetZstdelta(_, _) => ObjectType::OffsetZstdelta,
78 CacheObjectInfo::HashDelta(_, _) => ObjectType::HashDelta,
79 }
80 }
81}
82
/// A cached pack object plus bookkeeping for shared memory accounting.
#[derive(Debug, Serialize, Deserialize)]
pub struct CacheObject {
    pub(crate) info: CacheObjectInfo,
    /// Offset of this object within the pack file.
    pub offset: usize,
    /// Decompressed bytes (object content, or delta instructions for the
    /// delta variants).
    pub data_decompressed: Vec<u8>,
    /// Shared counter of cache memory usage; incremented in
    /// `record_mem_size`/`clone`, decremented in `Drop`.
    pub mem_recorder: Option<Arc<AtomicUsize>>,
    /// Whether this object was stored as a delta in the originating pack.
    pub is_delta_in_pack: bool,
}
91
92impl Clone for CacheObject {
93 fn clone(&self) -> Self {
94 let obj = CacheObject {
95 info: self.info.clone(),
96 offset: self.offset,
97 data_decompressed: self.data_decompressed.clone(),
98 mem_recorder: self.mem_recorder.clone(),
99 is_delta_in_pack: self.is_delta_in_pack,
100 };
101 obj.record_mem_size();
102 obj
103 }
104}
105
106impl HeapSize for CacheObject {
110 fn heap_size(&self) -> usize {
120 match &self.info {
121 CacheObjectInfo::BaseObject(_, _) => self.data_decompressed.heap_size(),
122 CacheObjectInfo::OffsetDelta(_, delta_final_size)
123 | CacheObjectInfo::OffsetZstdelta(_, delta_final_size)
124 | CacheObjectInfo::HashDelta(_, delta_final_size) => {
125 self.data_decompressed.heap_size() + delta_final_size
136 }
137 }
138 }
139}
140
141impl Drop for CacheObject {
142 fn drop(&mut self) {
145 if let Some(mem_recorder) = &self.mem_recorder {
147 mem_recorder.fetch_sub((*self).mem_size(), Ordering::Release);
148 }
149 }
150}
151
152pub trait MemSizeRecorder: MemSize {
159 fn record_mem_size(&self);
160 fn set_mem_recorder(&mut self, mem_size: Arc<AtomicUsize>);
161 }
163
164impl MemSizeRecorder for CacheObject {
165 fn record_mem_size(&self) {
168 if let Some(mem_recorder) = &self.mem_recorder {
169 mem_recorder.fetch_add(self.mem_size(), Ordering::Release);
170 }
171 }
172
173 fn set_mem_recorder(&mut self, mem_recorder: Arc<AtomicUsize>) {
174 self.mem_recorder = Some(mem_recorder);
175 }
176
177 }
181
182impl CacheObject {
183 pub fn new_for_undeltified(obj_type: ObjectType, data: Vec<u8>, offset: usize) -> Self {
185 let hash = utils::calculate_object_hash(obj_type, &data);
186 CacheObject {
187 info: CacheObjectInfo::BaseObject(obj_type, hash),
188 offset,
189 data_decompressed: data,
190 mem_recorder: None,
191 is_delta_in_pack: false,
192 }
193 }
194
195 pub fn object_type(&self) -> ObjectType {
197 self.info.object_type()
198 }
199
200 pub fn base_object_hash(&self) -> Option<ObjectHash> {
204 match &self.info {
205 CacheObjectInfo::BaseObject(_, hash) => Some(*hash),
206 _ => None,
207 }
208 }
209
210 pub fn offset_delta(&self) -> Option<usize> {
214 match &self.info {
215 CacheObjectInfo::OffsetDelta(offset, _) => Some(*offset),
216 _ => None,
217 }
218 }
219
220 pub fn hash_delta(&self) -> Option<ObjectHash> {
224 match &self.info {
225 CacheObjectInfo::HashDelta(hash, _) => Some(*hash),
226 _ => None,
227 }
228 }
229
230 pub fn to_entry(&self) -> Entry {
232 match self.info {
233 CacheObjectInfo::BaseObject(obj_type, hash) => Entry {
234 obj_type,
235 data: self.data_decompressed.clone(),
236 hash,
237 chain_len: 0,
238 },
239 _ => {
240 unreachable!("delta object should not persist!")
241 }
242 }
243 }
244
245 pub fn to_entry_metadata(&self) -> MetaAttached<Entry, EntryMeta> {
246 match self.info {
247 CacheObjectInfo::BaseObject(obj_type, hash) => {
248 let entry = Entry {
249 obj_type,
250 data: self.data_decompressed.clone(),
251 hash,
252 chain_len: 0,
253 };
254 let meta = EntryMeta {
255 pack_offset: Some(self.offset),
257 is_delta: Some(self.is_delta_in_pack),
258 ..Default::default()
259 };
260 MetaAttached { inner: entry, meta }
261 }
262
263 _ => {
264 unreachable!("delta object should not persist!")
265 }
266 }
267 }
268}
269
/// Bounds required of data stored in an [`ArcWrapper`]: heap-measurable for
/// LRU budgeting, (de)serializable for disk spill, and shareable across
/// threads for the background save pool.
pub trait ArcWrapperBounds:
    HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static
{
}
// Blanket impl: any type satisfying the bounds is automatically usable.
impl<T: HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static> ArcWrapperBounds
    for T
{
}
281
/// LRU cache value wrapper that can spill its data to disk when dropped.
pub struct ArcWrapper<T: ArcWrapperBounds> {
    /// The shared payload.
    pub data: Arc<T>,
    /// Set to `true` when the overall operation is finished; suppresses the
    /// save-on-drop behavior.
    complete_signal: Arc<AtomicBool>,
    /// Optional pool to run the save off the dropping thread.
    pool: Option<Arc<ThreadPool>>,
    /// Where to persist `data` on drop; `None` disables persistence.
    pub store_path: Option<PathBuf>,
}
291impl<T: ArcWrapperBounds> ArcWrapper<T> {
292 pub fn new(data: Arc<T>, share_flag: Arc<AtomicBool>, pool: Option<Arc<ThreadPool>>) -> Self {
294 ArcWrapper {
295 data,
296 complete_signal: share_flag,
297 pool,
298 store_path: None,
299 }
300 }
301 pub fn set_store_path(&mut self, path: PathBuf) {
302 self.store_path = Some(path);
303 }
304}
305
impl<T: ArcWrapperBounds> HeapSize for ArcWrapper<T> {
    /// Delegates to the wrapped payload; the `Arc`/flag overhead is not counted.
    fn heap_size(&self) -> usize {
        self.data.heap_size()
    }
}
311
312impl<T: ArcWrapperBounds> Clone for ArcWrapper<T> {
313 fn clone(&self) -> Self {
315 ArcWrapper {
316 data: self.data.clone(),
317 complete_signal: self.complete_signal.clone(),
318 pool: self.pool.clone(),
319 store_path: None,
320 }
321 }
322}
323
// Smart-pointer-style access: `wrapper.method()` reaches the inner `Arc<T>`.
impl<T: ArcWrapperBounds> Deref for ArcWrapper<T> {
    type Target = Arc<T>;
    fn deref(&self) -> &Self::Target {
        &self.data
    }
}
impl<T: ArcWrapperBounds> Drop for ArcWrapper<T> {
    /// On eviction/drop, persists the wrapped data to `store_path` — unless
    /// the shared `complete_signal` is already set (work finished, spilling
    /// is pointless) or no store path was configured.
    fn drop(&mut self) {
        if !self.complete_signal.load(Ordering::Acquire)
            && let Some(path) = &self.store_path
        {
            match &self.pool {
                Some(pool) => {
                    let data_copy = self.data.clone();
                    let path_copy = path.clone();
                    let complete_signal = self.complete_signal.clone();
                    // Crude backpressure: yield until the pool backlog shrinks
                    // so save jobs cannot queue up without bound.
                    while pool.queued_count() > 2000 {
                        std::thread::yield_now();
                    }
                    pool.execute(move || {
                        // Re-check the flag: the task may run long after the
                        // drop, by which time the save may be unnecessary.
                        if !complete_signal.load(Ordering::Acquire) {
                            let res = data_copy.f_save(&path_copy);
                            if let Err(e) = res {
                                println!("[f_save] {path_copy:?} error: {e:?}");
                            }
                        }
                    });
                }
                None => {
                    // No pool configured: save synchronously on this thread.
                    let res = self.data.f_save(path);
                    if let Err(e) = res {
                        println!("[f_save] {path:?} error: {e:?}");
                    }
                }
            }
        }
    }
}
#[cfg(test)]
mod test {
    use std::{fs, sync::Mutex};

    use lru_mem::LruCache;

    use super::*;
    use crate::hash::{HashKind, set_hash_kind_for_test};

    /// Recording adds `mem_size()` to the counter; dropping subtracts it back
    /// to zero (SHA-1 hash mode).
    #[test]
    fn test_heap_size_record() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        let mut obj = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, ObjectHash::default()),
            offset: 0,
            data_decompressed: vec![0; 1024],
            mem_recorder: None,
            is_delta_in_pack: false,
        };
        let mem = Arc::new(AtomicUsize::default());
        assert_eq!(mem.load(Ordering::Relaxed), 0);
        obj.set_mem_recorder(mem.clone());
        obj.record_mem_size();
        assert_eq!(mem.load(Ordering::Relaxed), obj.mem_size());
        drop(obj);
        assert_eq!(mem.load(Ordering::Relaxed), 0);
    }

    /// Same round-trip accounting check under SHA-256.
    #[test]
    fn test_heap_size_record_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        let mut obj = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, ObjectHash::default()),
            offset: 0,
            data_decompressed: vec![0; 2048],
            mem_recorder: None,
            is_delta_in_pack: false,
        };
        let mem = Arc::new(AtomicUsize::default());
        assert_eq!(mem.load(Ordering::Relaxed), 0);
        obj.set_mem_recorder(mem.clone());
        obj.record_mem_size();
        assert_eq!(mem.load(Ordering::Relaxed), obj.mem_size());
        drop(obj);
        assert_eq!(mem.load(Ordering::Relaxed), 0);
    }

    /// Wrapping a `CacheObject` in `ArcWrapper` must not change its reported
    /// heap size (base object: size == data buffer length).
    #[test]
    fn test_cache_object_with_same_size() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        let a = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, ObjectHash::default()),
            offset: 0,
            data_decompressed: vec![0; 1024],
            mem_recorder: None,
            is_delta_in_pack: false,
        };
        assert!(a.heap_size() == 1024);

        let b = ArcWrapper::new(Arc::new(a.clone()), Arc::new(AtomicBool::new(false)), None);
        assert!(b.heap_size() == 1024);
    }

    /// Same wrapper-size check under SHA-256.
    #[test]
    fn test_cache_object_with_same_size_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        let a = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, ObjectHash::default()),
            offset: 0,
            data_decompressed: vec![0; 2048],
            mem_recorder: None,
            is_delta_in_pack: false,
        };
        assert!(a.heap_size() == 2048);

        let b = ArcWrapper::new(Arc::new(a.clone()), Arc::new(AtomicBool::new(false)), None);
        assert!(b.heap_size() == 2048);
    }

    /// LRU behavior: a 1.5 KB insert into a 2 KB cache holding 1 KB must
    /// first refuse (`try_insert` -> WouldEjectLru), then evict `a` when
    /// forced with `insert`.
    #[test]
    fn test_cache_object_with_lru() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        let mut cache = LruCache::new(2048);

        let hash_a = ObjectHash::default();
        let hash_b = ObjectHash::new(b"b");
        let a = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash_a),
            offset: 0,
            data_decompressed: vec![0; 1024],
            mem_recorder: None,
            is_delta_in_pack: false,
        };
        println!("a.heap_size() = {}", a.heap_size());

        let b = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash_b),
            offset: 0,
            data_decompressed: vec![0; (1024.0 * 1.5) as usize],
            mem_recorder: None,
            is_delta_in_pack: false,
        };
        {
            let r = cache.insert(
                hash_a.to_string(),
                ArcWrapper::new(Arc::new(a.clone()), Arc::new(AtomicBool::new(true)), None),
            );
            assert!(r.is_ok())
        }
        {
            // try_insert must refuse rather than eject the LRU entry.
            let r = cache.try_insert(
                hash_b.to_string(),
                ArcWrapper::new(Arc::new(b.clone()), Arc::new(AtomicBool::new(true)), None),
            );
            assert!(r.is_err());
            if let Err(lru_mem::TryInsertError::WouldEjectLru { .. }) = r {
            } else {
                panic!("Expected WouldEjectLru error");
            }
            // Plain insert is allowed to eject.
            let r = cache.insert(
                hash_b.to_string(),
                ArcWrapper::new(Arc::new(b.clone()), Arc::new(AtomicBool::new(true)), None),
            );
            assert!(r.is_ok());
        }
        {
            // `a` must have been ejected by the forced insert above.
            let r = cache.get(&hash_a.to_string());
            assert!(r.is_none());
        }
    }

    /// Same ejection scenario scaled up for SHA-256 object sizes.
    #[test]
    fn test_cache_object_with_lru_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        let mut cache = LruCache::new(4096);

        let hash_a = ObjectHash::default();
        let hash_b = ObjectHash::new(b"b");
        let a = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash_a),
            offset: 0,
            data_decompressed: vec![0; 2048],
            mem_recorder: None,
            is_delta_in_pack: false,
        };
        println!("a.heap_size() = {}", a.heap_size());

        let b = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash_b),
            offset: 0,
            data_decompressed: vec![0; 3072],
            mem_recorder: None,
            is_delta_in_pack: false,
        };
        {
            let r = cache.insert(
                hash_a.to_string(),
                ArcWrapper::new(Arc::new(a.clone()), Arc::new(AtomicBool::new(true)), None),
            );
            assert!(r.is_ok())
        }
        {
            let r = cache.try_insert(
                hash_b.to_string(),
                ArcWrapper::new(Arc::new(b.clone()), Arc::new(AtomicBool::new(true)), None),
            );
            assert!(r.is_err());
            if let Err(lru_mem::TryInsertError::WouldEjectLru { .. }) = r {
            } else {
                panic!("Expected WouldEjectLru error");
            }
            let r = cache.insert(
                hash_b.to_string(),
                ArcWrapper::new(Arc::new(b.clone()), Arc::new(AtomicBool::new(true)), None),
            );
            assert!(r.is_ok());
        }
        {
            let r = cache.get(&hash_a.to_string());
            assert!(r.is_none());
        }
    }

    /// Minimal payload type for wrapper/drop tests: `a` doubles as its
    /// reported heap size.
    #[derive(Serialize, Deserialize)]
    struct Test {
        a: usize,
    }
    impl Drop for Test {
        fn drop(&mut self) {
            println!("drop Test");
        }
    }
    impl HeapSize for Test {
        fn heap_size(&self) -> usize {
            self.a
        }
    }

    /// Eviction must not drop a payload that is still referenced: a cloned
    /// wrapper keeps the `Arc` alive after its cache entry is ejected.
    #[test]
    fn test_lru_drop() {
        println!("insert a");
        let cache = LruCache::new(2048);
        let cache = Arc::new(Mutex::new(cache));
        {
            let mut c = cache.as_ref().lock().unwrap();
            let _ = c.insert(
                "a",
                ArcWrapper::new(
                    Arc::new(Test { a: 1024 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        println!("insert b, a should be ejected");
        {
            let mut c = cache.as_ref().lock().unwrap();
            let _ = c.insert(
                "b",
                ArcWrapper::new(
                    Arc::new(Test { a: 1200 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        let b = {
            let mut c = cache.as_ref().lock().unwrap();
            c.get("b").cloned()
        };
        println!("insert c, b should not be ejected");
        {
            let mut c = cache.as_ref().lock().unwrap();
            let _ = c.insert(
                "c",
                ArcWrapper::new(
                    Arc::new(Test { a: 1200 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        // `b` is still usable via the clone even though "b" may be evicted.
        println!("user b: {}", b.as_ref().unwrap().a);
        println!("test over, enject all");
    }

    /// bincode round-trip preserves info, data, and offset (SHA-1 mode).
    #[test]
    fn test_cache_object_serialize() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        let a = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, ObjectHash::default()),
            offset: 0,
            data_decompressed: vec![0; 1024],
            mem_recorder: None,
            is_delta_in_pack: false,
        };
        let s = bincode::serde::encode_to_vec(&a, bincode::config::standard()).unwrap();
        let b: CacheObject = bincode::serde::decode_from_slice(&s, bincode::config::standard())
            .unwrap()
            .0;
        assert_eq!(a.info, b.info);
        assert_eq!(a.data_decompressed, b.data_decompressed);
        assert_eq!(a.offset, b.offset);
    }

    /// bincode round-trip under SHA-256.
    #[test]
    fn test_cache_object_serialize_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        let a = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, ObjectHash::default()),
            offset: 0,
            data_decompressed: vec![0; 2048],
            mem_recorder: None,
            is_delta_in_pack: false,
        };
        let s = bincode::serde::encode_to_vec(&a, bincode::config::standard()).unwrap();
        let b: CacheObject = bincode::serde::decode_from_slice(&s, bincode::config::standard())
            .unwrap()
            .0;
        assert_eq!(a.info, b.info);
        assert_eq!(a.data_decompressed, b.data_decompressed);
        assert_eq!(a.offset, b.offset);
    }

    /// Dropping a wrapper with a store path and an unset completion flag must
    /// persist the data synchronously (no pool).
    #[test]
    fn test_arc_wrapper_drop_store() {
        let mut path = PathBuf::from(".cache_temp/test_arc_wrapper_drop_store");
        fs::create_dir_all(&path).unwrap();
        path.push("test_obj");
        let mut a = ArcWrapper::new(Arc::new(1024), Arc::new(AtomicBool::new(false)), None);
        a.set_store_path(path.clone());
        drop(a);

        assert!(path.exists());
        path.pop();
        fs::remove_dir_all(path).unwrap();
    }

    /// End-to-end: entries ejected from the LRU are saved to disk on drop;
    /// entries still resident are not.
    #[test]
    fn test_arc_wrapper_with_lru() {
        let mut cache = LruCache::new(1500);
        let path = PathBuf::from(".cache_temp/test_arc_wrapper_with_lru");
        let _ = fs::remove_dir_all(&path);
        fs::create_dir_all(&path).unwrap();
        let shared_flag = Arc::new(AtomicBool::new(false));

        let a_path = path.join("a");
        {
            let mut a = ArcWrapper::new(Arc::new(Test { a: 1024 }), shared_flag.clone(), None);
            a.set_store_path(a_path.clone());
            let b = ArcWrapper::new(Arc::new(1024), shared_flag.clone(), None);
            assert!(b.store_path.is_none());

            println!("insert a with heap size: {:?}", a.heap_size());
            let rt = cache.insert("a", a);
            if let Err(e) = rt {
                panic!("{}", format!("insert a failed: {:?}", e.to_string()));
            }
            println!("after insert a, cache used = {}", cache.current_size());
        }
        // Still resident: nothing has been spilled yet.
        assert!(!a_path.exists());

        let b_path = path.join("b");
        {
            let mut b = ArcWrapper::new(Arc::new(Test { a: 996 }), shared_flag.clone(), None);
            b.set_store_path(b_path.clone());
            let rt = cache.insert("b", b);
            if let Err(e) = rt {
                panic!("{}", format!("insert a failed: {:?}", e.to_string()));
            }
            println!("after insert b, cache used = {}", cache.current_size());
        }
        // Inserting b ejected a, whose drop persisted it; b stays in memory.
        assert!(a_path.exists());
        assert!(!b_path.exists());
        shared_flag.store(true, Ordering::Release);
        fs::remove_dir_all(path).unwrap();
    }
}