git_internal/internal/pack/
cache_object.rs

1//! Cache object representation plus disk-backed serialization helpers used by the pack decoder to
2//! bound memory while still serving delta reconstruction quickly.
3
4use std::{
5    fs,
6    fs::OpenOptions,
7    io,
8    io::Write,
9    ops::Deref,
10    path::{Path, PathBuf},
11    sync::{
12        Arc,
13        atomic::{AtomicBool, AtomicUsize, Ordering},
14    },
15};
16
17use lru_mem::{HeapSize, MemSize};
18use serde::{Deserialize, Serialize};
19use threadpool::ThreadPool;
20
21use crate::{
22    hash::ObjectHash,
23    internal::{
24        metadata::{EntryMeta, MetaAttached},
25        object::types::ObjectType,
26        pack::{entry::Entry, utils},
27    },
28};
29
30// static CACHE_OBJS_MEM_SIZE: AtomicUsize = AtomicUsize::new(0);
31
/// Disk (de)serialization for cache objects: load a value from, or save it to,
/// a file path.
///
/// Blanket-implemented below for every `Serialize + Deserialize` type, so any
/// serde-able value can be spilled to disk by the cache.
pub trait FileLoadStore: Serialize + for<'a> Deserialize<'a> {
    /// Deserialize a value of `Self` from the file at `path`.
    fn f_load(path: &Path) -> Result<Self, io::Error>;
    /// Serialize `self` into the file at `path`.
    fn f_save(&self, path: &Path) -> Result<(), io::Error>;
}
37
38// trait alias, so that impl FileLoadStore == impl Serialize + Deserialize
39impl<T: Serialize + for<'a> Deserialize<'a>> FileLoadStore for T {
40    /// load object from file
41    fn f_load(path: &Path) -> Result<T, io::Error> {
42        let data = fs::read(path)?;
43        let obj: T = bincode::serde::decode_from_slice(&data, bincode::config::standard())
44            .map_err(io::Error::other)?
45            .0;
46        Ok(obj)
47    }
48    fn f_save(&self, path: &Path) -> Result<(), io::Error> {
49        if path.exists() {
50            return Ok(());
51        }
52        let data = bincode::serde::encode_to_vec(self, bincode::config::standard()).unwrap();
53        let path = path.with_extension("temp");
54        {
55            let mut file = OpenOptions::new()
56                .write(true)
57                .create_new(true)
58                .open(path.clone())?;
59            file.write_all(&data)?;
60        }
61        let final_path = path.with_extension("");
62        fs::rename(&path, final_path.clone())?;
63        Ok(())
64    }
65}
66
/// Represents the metadata of a cache object, indicating whether it is a delta or not.
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub(crate) enum CacheObjectInfo {
    /// The object is one of the four basic types:
    /// [`ObjectType::Blob`], [`ObjectType::Tree`], [`ObjectType::Commit`], or [`ObjectType::Tag`].
    /// Carries the [`ObjectType`] and the [`ObjectHash`] of the object.
    BaseObject(ObjectType, ObjectHash),
    /// The object is an offset delta with a specified offset delta [`usize`],
    /// and the size of the expanded object (previously `delta_final_size`).
    OffsetDelta(usize, usize),
    /// Similar to [`CacheObjectInfo::OffsetDelta`], but the delta algorithm is `zstd`.
    OffsetZstdelta(usize, usize),
    /// The object is a hash delta with a specified [`ObjectHash`] hash,
    /// and the size of the expanded object (previously `delta_final_size`).
    HashDelta(ObjectHash, usize),
}
83
84impl CacheObjectInfo {
85    /// Get the [`ObjectType`] of the object.
86    pub(crate) fn object_type(&self) -> ObjectType {
87        match self {
88            CacheObjectInfo::BaseObject(obj_type, _) => *obj_type,
89            CacheObjectInfo::OffsetDelta(_, _) => ObjectType::OffsetDelta,
90            CacheObjectInfo::OffsetZstdelta(_, _) => ObjectType::OffsetZstdelta,
91            CacheObjectInfo::HashDelta(_, _) => ObjectType::HashDelta,
92        }
93    }
94}
95
/// Represents a cached object in memory, which may be a delta or a base object.
#[derive(Debug, Serialize, Deserialize)]
pub struct CacheObject {
    /// Delta/base discriminator plus per-kind metadata (type + hash for base
    /// objects; base reference + expanded size for deltas).
    pub(crate) info: CacheObjectInfo,
    /// Offset of this object's entry within the pack.
    pub offset: usize,
    /// CRC32 recorded for this entry (presumably over the raw pack entry
    /// bytes — confirm at the decoder).
    pub crc32: u32,
    /// Decompressed payload; for delta variants this holds the (decompressed)
    /// delta data rather than the final object bytes.
    pub data_decompressed: Vec<u8>,
    pub mem_recorder: Option<Arc<AtomicUsize>>, // record mem-size of all CacheObjects of a Pack
    /// Whether this entry was stored as a delta inside the pack it came from.
    pub is_delta_in_pack: bool,
}
106
107impl Clone for CacheObject {
108    fn clone(&self) -> Self {
109        let obj = CacheObject {
110            info: self.info.clone(),
111            offset: self.offset,
112            crc32: self.crc32,
113            data_decompressed: self.data_decompressed.clone(),
114            mem_recorder: self.mem_recorder.clone(),
115            is_delta_in_pack: self.is_delta_in_pack,
116        };
117        obj.record_mem_size();
118        obj
119    }
120}
121
122// ! used by lru_mem to calculate the size of the object, limit the memory usage.
123// ! the implementation of HeapSize is not accurate, only calculate the size of the data_decompress
124// Note that: mem_size == value_size + heap_size, and we only need to impl HeapSize because value_size is known
125impl HeapSize for CacheObject {
126    /// If a [`CacheObject`] is [`ObjectType::HashDelta`] or [`ObjectType::OffsetDelta`],
127    /// it will expand to another [`CacheObject`] of other types. To prevent potential OOM,
128    /// we record the size of the expanded object as well as that of the object itself.
129    ///
130    /// Base objects, *i.e.*, [`ObjectType::Blob`], [`ObjectType::Tree`], [`ObjectType::Commit`],
131    /// and [`ObjectType::Tag`], will not be expanded, so the heap-size of the object is the same
132    /// as the size of the data.
133    ///
134    /// See [Comment in PR #755](https://github.com/web3infra-foundation/mega/pull/755#issuecomment-2543100481) for more details.
135    fn heap_size(&self) -> usize {
136        match &self.info {
137            CacheObjectInfo::BaseObject(_, _) => self.data_decompressed.heap_size(),
138            CacheObjectInfo::OffsetDelta(_, delta_final_size)
139            | CacheObjectInfo::OffsetZstdelta(_, delta_final_size)
140            | CacheObjectInfo::HashDelta(_, delta_final_size) => {
141                // To those who are concerned about why these two values are added,
142                // let's consider the lifetime of two `CacheObject`s, say `delta_obj`
143                // and `final_obj` in the function `Pack::rebuild_delta`.
144                //
145                // `delta_obj` is dropped only after `Pack::rebuild_delta` returns,
146                // but the space for `final_obj` is allocated in that function.
147                //
148                // Therefore, during the execution of `Pack::rebuild_delta`, both `delta_obj`
149                // and `final_obj` coexist. The maximum memory usage is the sum of the memory
150                // usage of `delta_obj` and `final_obj`.
151                self.data_decompressed.heap_size() + delta_final_size
152            }
153        }
154    }
155}
156
157impl Drop for CacheObject {
158    // Check: the heap-size subtracted when Drop is equal to the heap-size recorded
159    // (cannot change the heap-size during life cycle)
160    fn drop(&mut self) {
161        // (&*self).heap_size() != self.heap_size()
162        if let Some(mem_recorder) = &self.mem_recorder {
163            mem_recorder.fetch_sub((*self).mem_size(), Ordering::Release);
164        }
165    }
166}
167
/// Heap-size recorder for a class (struct).
///
/// Attach a shared counter (e.g. one per pack), call `record_mem_size` right
/// after construction, and the matching subtraction happens in `drop()`.
/// Because of that, variable-size fields should NOT be modified after
/// recording, so the recorded heap-size stays stable. Alternatively the
/// initial mem-size could be stored in the object, or updated in place
/// (not implemented).
pub trait MemSizeRecorder: MemSize {
    /// Add this object's current `mem_size` to the attached recorder.
    fn record_mem_size(&self);
    /// Attach the shared counter tracking the total size of all objects.
    fn set_mem_recorder(&mut self, mem_size: Arc<AtomicUsize>);
    // fn get_mem_size() -> usize;
}
179
180impl MemSizeRecorder for CacheObject {
181    /// record the mem-size of this `CacheObj` in a `static` `var`
182    /// <br> since that, DO NOT modify `CacheObj` after recording
183    fn record_mem_size(&self) {
184        if let Some(mem_recorder) = &self.mem_recorder {
185            mem_recorder.fetch_add(self.mem_size(), Ordering::Release);
186        }
187    }
188
189    fn set_mem_recorder(&mut self, mem_recorder: Arc<AtomicUsize>) {
190        self.mem_recorder = Some(mem_recorder);
191    }
192
193    // fn get_mem_size() -> usize {
194    //     CACHE_OBJS_MEM_SIZE.load(Ordering::Acquire)
195    // }
196}
197
198impl CacheObject {
199    /// Create a new CacheObject which is neither [`ObjectType::OffsetDelta`] nor [`ObjectType::HashDelta`].
200    pub fn new_for_undeltified(
201        obj_type: ObjectType,
202        data: Vec<u8>,
203        offset: usize,
204        crc32: u32,
205    ) -> Self {
206        let hash = utils::calculate_object_hash(obj_type, &data);
207        CacheObject {
208            info: CacheObjectInfo::BaseObject(obj_type, hash),
209            offset,
210            crc32,
211            data_decompressed: data,
212            mem_recorder: None,
213            is_delta_in_pack: false,
214        }
215    }
216
217    /// Get the [`ObjectType`] of the object.
218    pub fn object_type(&self) -> ObjectType {
219        self.info.object_type()
220    }
221
222    /// Get the [`ObjectHash`] hash of the object.
223    ///
224    /// If the object is a delta object, return [`None`].
225    pub fn base_object_hash(&self) -> Option<ObjectHash> {
226        match &self.info {
227            CacheObjectInfo::BaseObject(_, hash) => Some(*hash),
228            _ => None,
229        }
230    }
231
232    /// Get the offset delta of the object.
233    ///
234    /// If the object is not an offset delta, return [`None`].
235    pub fn offset_delta(&self) -> Option<usize> {
236        match &self.info {
237            CacheObjectInfo::OffsetDelta(offset, _) => Some(*offset),
238            _ => None,
239        }
240    }
241
242    /// Get the hash delta of the object.
243    ///
244    /// If the object is not a hash delta, return [`None`].
245    pub fn hash_delta(&self) -> Option<ObjectHash> {
246        match &self.info {
247            CacheObjectInfo::HashDelta(hash, _) => Some(*hash),
248            _ => None,
249        }
250    }
251
252    /// transform the CacheObject to Entry
253    pub fn to_entry(&self) -> Entry {
254        match self.info {
255            CacheObjectInfo::BaseObject(obj_type, hash) => Entry {
256                obj_type,
257                data: self.data_decompressed.clone(),
258                hash,
259                chain_len: 0,
260            },
261            _ => {
262                unreachable!("delta object should not persist!")
263            }
264        }
265    }
266
267    /// transform the CacheObject to MetaAttached<Entry, EntryMeta>
268    pub fn to_entry_metadata(&self) -> MetaAttached<Entry, EntryMeta> {
269        match self.info {
270            CacheObjectInfo::BaseObject(obj_type, hash) => {
271                let entry = Entry {
272                    obj_type,
273                    data: self.data_decompressed.clone(),
274                    hash,
275                    chain_len: 0,
276                };
277                let meta = EntryMeta {
278                    // pack_id:Some(pack_id),
279                    pack_offset: Some(self.offset),
280                    crc32: Some(self.crc32),
281                    is_delta: Some(self.is_delta_in_pack),
282                    ..Default::default()
283                };
284                MetaAttached { inner: entry, meta }
285            }
286
287            _ => {
288                unreachable!("delta object should not persist!")
289            }
290        }
291    }
292}
293
/// Trait alias for simple use: everything an [`ArcWrapper`] payload must
/// satisfy (heap-size accounting, serde (de)serialization, thread-safe
/// sharing, and no borrowed data).
pub trait ArcWrapperBounds:
    HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static
{
}
// You must impl the alias trait for all `T` satisfying the constraints;
// otherwise `T` will not satisfy the alias even though it satisfies the
// original traits (Rust has no first-class trait aliases on stable).
impl<T: HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static> ArcWrapperBounds
    for T
{
}
305
/// Encapsulation of `Arc` so the third-party [`HeapSize`] trait can be
/// implemented for the pointer type stored in the `LruCache`.
///
/// Because the cache stores `Arc`s, ejecting an entry does not necessarily
/// drop the referenced content (other clones may still hold it), so the
/// cache's view of actual memory usage is approximate.
pub struct ArcWrapper<T: ArcWrapperBounds> {
    /// The shared payload.
    pub data: Arc<T>,
    // Flag shared with the decoder; once set, dropped wrappers skip persistence.
    complete_signal: Arc<AtomicBool>,
    // Optional worker pool used to perform the on-evict save asynchronously.
    pool: Option<Arc<ThreadPool>>,
    pub store_path: Option<PathBuf>, // path to store when drop
}
315impl<T: ArcWrapperBounds> ArcWrapper<T> {
316    /// Create a new ArcWrapper
317    pub fn new(data: Arc<T>, share_flag: Arc<AtomicBool>, pool: Option<Arc<ThreadPool>>) -> Self {
318        ArcWrapper {
319            data,
320            complete_signal: share_flag,
321            pool,
322            store_path: None,
323        }
324    }
325    /// Sets the file path where this object will be persisted when evicted from cache.
326    pub fn set_store_path(&mut self, path: PathBuf) {
327        self.store_path = Some(path);
328    }
329}
330
331impl<T: ArcWrapperBounds> HeapSize for ArcWrapper<T> {
332    fn heap_size(&self) -> usize {
333        self.data.heap_size()
334    }
335}
336
337impl<T: ArcWrapperBounds> Clone for ArcWrapper<T> {
338    /// clone won't clone the store_path
339    fn clone(&self) -> Self {
340        ArcWrapper {
341            data: self.data.clone(),
342            complete_signal: self.complete_signal.clone(),
343            pool: self.pool.clone(),
344            store_path: None,
345        }
346    }
347}
348
349impl<T: ArcWrapperBounds> Deref for ArcWrapper<T> {
350    type Target = Arc<T>;
351    fn deref(&self) -> &Self::Target {
352        &self.data
353    }
354}
impl<T: ArcWrapperBounds> Drop for ArcWrapper<T> {
    // `drop` will be called in `lru_cache.insert()` when cache full & eject the LRU
    // `lru_cache.insert()` is protected by Mutex
    //
    // On eviction, persist the payload to `store_path` (if set) — unless the
    // shared completion signal is already set, in which case spilling to disk
    // is pointless.
    fn drop(&mut self) {
        if !self.complete_signal.load(Ordering::Acquire)
            && let Some(path) = &self.store_path
        {
            match &self.pool {
                Some(pool) => {
                    // Clone everything the background job needs; the wrapper
                    // itself is being destroyed.
                    let data_copy = self.data.clone();
                    let path_copy = path.clone();
                    let complete_signal = self.complete_signal.clone();
                    // block entire process, wait for IO, Control Memory
                    // queue size will influence the Memory usage
                    // (backpressure: cap queued save jobs so pending payloads
                    // don't pile up in memory while waiting for disk)
                    while pool.queued_count() > 2000 {
                        std::thread::yield_now();
                    }
                    pool.execute(move || {
                        // Re-check: decoding may have completed while this
                        // job sat in the queue, making the save unnecessary.
                        if !complete_signal.load(Ordering::Acquire) {
                            let res = data_copy.f_save(&path_copy);
                            if let Err(e) = res {
                                // NOTE(review): errors go to stdout; eprintln!
                                // (or a logger) would be more conventional.
                                println!("[f_save] {path_copy:?} error: {e:?}");
                            }
                        }
                    });
                }
                None => {
                    // No pool: save synchronously on the evicting thread.
                    let res = self.data.f_save(path);
                    if let Err(e) = res {
                        println!("[f_save] {path:?} error: {e:?}");
                    }
                }
            }
        }
    }
}
#[cfg(test)]
mod test {
    use std::{fs, sync::Mutex};

    use lru_mem::LruCache;

    use super::*;
    use crate::hash::{HashKind, set_hash_kind_for_test};

    /// Helper to build a base CacheObject with the given size.
    fn make_obj(size: usize) -> CacheObject {
        CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, ObjectHash::default()),
            offset: 0,
            crc32: 0,
            data_decompressed: vec![0; size],
            mem_recorder: None,
            is_delta_in_pack: false,
        }
    }

    /// Memory-size recording: `record_mem_size` adds `mem_size()` to the
    /// shared counter and dropping the object subtracts it back to zero.
    #[test]
    fn test_heap_size_record() {
        for (kind, size) in [(HashKind::Sha1, 1024usize), (HashKind::Sha256, 2048usize)] {
            let _guard = set_hash_kind_for_test(kind);
            let mut obj = make_obj(size);
            let mem = Arc::new(AtomicUsize::default());
            assert_eq!(mem.load(Ordering::Relaxed), 0);
            obj.set_mem_recorder(mem.clone());
            obj.record_mem_size();
            assert_eq!(mem.load(Ordering::Relaxed), obj.mem_size());
            drop(obj);
            assert_eq!(mem.load(Ordering::Relaxed), 0);
        }
    }

    /// Test that the heap size of CacheObject and ArcWrapper are the same.
    #[test]
    fn test_cache_object_with_same_size() {
        for (kind, size) in [(HashKind::Sha1, 1024usize), (HashKind::Sha256, 2048usize)] {
            let _guard = set_hash_kind_for_test(kind);
            let a = make_obj(size);
            assert_eq!(a.heap_size(), size);

            let b = ArcWrapper::new(Arc::new(a.clone()), Arc::new(AtomicBool::new(false)), None);
            assert_eq!(b.heap_size(), size);
        }
    }

    /// Test that the LRU cache correctly ejects the least recently used object
    /// when capacity is exceeded.
    #[test]
    fn test_cache_object_with_lru() {
        for (kind, cap, size_a, size_b) in [
            (
                HashKind::Sha1,
                2048usize,
                1024usize,
                (1024.0 * 1.5) as usize,
            ),
            (
                HashKind::Sha256,
                4096usize,
                2048usize,
                (2048.0 * 1.5) as usize,
            ),
        ] {
            let _guard = set_hash_kind_for_test(kind);
            let mut cache = LruCache::new(cap);

            let hash_a = ObjectHash::default();
            let hash_b = ObjectHash::new(b"b"); // whatever different hash
            let a = make_obj(size_a);
            let b = make_obj(size_b);

            // `a` fits on its own.
            {
                let r = cache.insert(
                    hash_a.to_string(),
                    ArcWrapper::new(Arc::new(a.clone()), Arc::new(AtomicBool::new(true)), None),
                );
                assert!(r.is_ok());
            }

            // `try_insert` refuses to eject; plain `insert` ejects `a`.
            {
                let r = cache.try_insert(
                    hash_b.to_string(),
                    ArcWrapper::new(Arc::new(b.clone()), Arc::new(AtomicBool::new(true)), None),
                );
                assert!(r.is_err());
                if let Err(lru_mem::TryInsertError::WouldEjectLru { .. }) = r {
                    // expected
                } else {
                    panic!("Expected WouldEjectLru error");
                }

                let r = cache.insert(
                    hash_b.to_string(),
                    ArcWrapper::new(Arc::new(b.clone()), Arc::new(AtomicBool::new(true)), None),
                );
                assert!(r.is_ok());
            }

            // `a` was the LRU entry, so it must be gone now.
            {
                let r = cache.get(&hash_a.to_string());
                assert!(r.is_none());
            }
        }
    }

    /// Helper type for checking that Drop runs when an object is ejected
    /// from the LRU cache.
    #[derive(Serialize, Deserialize)]
    struct Test {
        a: usize,
    }
    impl Drop for Test {
        fn drop(&mut self) {
            println!("drop Test");
        }
    }
    impl HeapSize for Test {
        fn heap_size(&self) -> usize {
            self.a
        }
    }

    /// Drop is called when an entry is ejected, but a cloned `Arc` keeps the
    /// payload alive even after its cache entry is gone.
    #[test]
    fn test_lru_drop() {
        println!("insert a");
        let cache = LruCache::new(2048);
        let cache = Arc::new(Mutex::new(cache));
        {
            // `Arc` auto-derefs, so `cache.lock()` replaces `cache.as_ref().lock()`.
            let mut c = cache.lock().unwrap();
            let _ = c.insert(
                "a",
                ArcWrapper::new(
                    Arc::new(Test { a: 1024 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        println!("insert b, a should be ejected");
        {
            let mut c = cache.lock().unwrap();
            let _ = c.insert(
                "b",
                ArcWrapper::new(
                    Arc::new(Test { a: 1200 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        let b = {
            let mut c = cache.lock().unwrap();
            c.get("b").cloned()
        };
        println!("insert c, b should not be ejected");
        {
            let mut c = cache.lock().unwrap();
            let _ = c.insert(
                "c",
                ArcWrapper::new(
                    Arc::new(Test { a: 1200 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        println!("use b: {}", b.as_ref().unwrap().a);
        println!("test over, eject all");
    }

    /// A CacheObject round-trips through bincode serialization unchanged.
    #[test]
    fn test_cache_object_serialize() {
        for (kind, size) in [(HashKind::Sha1, 1024usize), (HashKind::Sha256, 2048usize)] {
            let _guard = set_hash_kind_for_test(kind);
            let a = make_obj(size);
            let s = bincode::serde::encode_to_vec(&a, bincode::config::standard()).unwrap();
            let b: CacheObject = bincode::serde::decode_from_slice(&s, bincode::config::standard())
                .unwrap()
                .0;
            assert_eq!(a.info, b.info);
            assert_eq!(a.data_decompressed, b.data_decompressed);
            assert_eq!(a.offset, b.offset);
        }
    }

    /// Dropping a not-yet-complete ArcWrapper with a store path persists it
    /// to disk.
    #[test]
    fn test_arc_wrapper_drop_store() {
        let mut path = PathBuf::from(".cache_temp/test_arc_wrapper_drop_store");
        fs::create_dir_all(&path).unwrap();
        path.push("test_obj");
        let mut a = ArcWrapper::new(Arc::new(1024), Arc::new(AtomicBool::new(false)), None);
        a.set_store_path(path.clone());
        drop(a);

        assert!(path.exists());
        path.pop();
        fs::remove_dir_all(path).unwrap();
    }

    #[test]
    /// Test that the wrapper correctly stores its data when the LRU ejects it
    /// — and only then: `a` is written on eviction, `b` is never evicted so
    /// its file never appears. (Comment previously said "can't correctly
    /// store", contradicting the assertions below.)
    fn test_arc_wrapper_with_lru() {
        let mut cache = LruCache::new(1500);
        let path = PathBuf::from(".cache_temp/test_arc_wrapper_with_lru");
        let _ = fs::remove_dir_all(&path);
        fs::create_dir_all(&path).unwrap();
        let shared_flag = Arc::new(AtomicBool::new(false));

        // insert a, a not ejected
        let a_path = path.join("a");
        {
            let mut a = ArcWrapper::new(Arc::new(Test { a: 1024 }), shared_flag.clone(), None);
            a.set_store_path(a_path.clone());
            let b = ArcWrapper::new(Arc::new(1024), shared_flag.clone(), None);
            assert!(b.store_path.is_none());

            println!("insert a with heap size: {:?}", a.heap_size());
            let rt = cache.insert("a", a);
            if let Err(e) = rt {
                panic!("insert a failed: {:?}", e.to_string());
            }
            println!("after insert a, cache used = {}", cache.current_size());
        }
        assert!(!a_path.exists());

        let b_path = path.join("b");
        // insert b, a should be ejected
        {
            let mut b = ArcWrapper::new(Arc::new(Test { a: 996 }), shared_flag.clone(), None);
            b.set_store_path(b_path.clone());
            let rt = cache.insert("b", b);
            if let Err(e) = rt {
                // Fixed: this message previously said "insert a failed".
                panic!("insert b failed: {:?}", e.to_string());
            }
            println!("after insert b, cache used = {}", cache.current_size());
        }
        assert!(a_path.exists());
        assert!(!b_path.exists());
        shared_flag.store(true, Ordering::Release);
        fs::remove_dir_all(path).unwrap();
        // should pass even if b's path does not exist
    }
}