// git_internal/internal/pack/cache_object.rs

1use std::fs::OpenOptions;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::{fs, io};
6use std::{ops::Deref, sync::Arc};
7
8use lru_mem::{HeapSize, MemSize};
9use serde::{Deserialize, Serialize};
10use threadpool::ThreadPool;
11
12use crate::internal::pack::entry::Entry;
13use crate::internal::pack::utils;
14use crate::{hash::SHA1, internal::object::types::ObjectType};
15
16// /// record heap-size of all CacheObjects, used for memory limit.
17// static CACHE_OBJS_MEM_SIZE: AtomicUsize = AtomicUsize::new(0);
18
/// Load/store a value from/to a file on disk.
///
/// Used to spill cache objects to disk when they are ejected from the
/// in-memory LRU cache, and to read them back later.
pub trait FileLoadStore: Serialize + for<'a> Deserialize<'a> {
    /// Read and deserialize a value of `Self` from `path`.
    fn f_load(path: &Path) -> Result<Self, io::Error>;
    /// Serialize `self` and persist it at `path`.
    fn f_save(&self, path: &Path) -> Result<(), io::Error>;
}
24
25// trait alias, so that impl FileLoadStore == impl Serialize + Deserialize
26impl<T: Serialize + for<'a> Deserialize<'a>> FileLoadStore for T {
27    fn f_load(path: &Path) -> Result<T, io::Error> {
28        let data = fs::read(path)?;
29        let obj: T = bincode::serde::decode_from_slice(&data, bincode::config::standard())
30            .map_err(io::Error::other)?
31            .0;
32        Ok(obj)
33    }
34    fn f_save(&self, path: &Path) -> Result<(), io::Error> {
35        if path.exists() {
36            return Ok(());
37        }
38        let data = bincode::serde::encode_to_vec(self, bincode::config::standard()).unwrap();
39        let path = path.with_extension("temp");
40        {
41            let mut file = OpenOptions::new()
42                .write(true)
43                .create_new(true)
44                .open(path.clone())?;
45            file.write_all(&data)?;
46        }
47        let final_path = path.with_extension("");
48        fs::rename(&path, final_path.clone())?;
49        Ok(())
50    }
51}
52
/// Represents the metadata of a cache object, indicating whether it is a delta or not.
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub(crate) enum CacheObjectInfo {
    /// The object is one of the four basic (non-delta) types:
    /// [`ObjectType::Blob`], [`ObjectType::Tree`], [`ObjectType::Commit`], or [`ObjectType::Tag`].
    /// The metadata contains the [`ObjectType`] and the [`SHA1`] hash of the object.
    BaseObject(ObjectType, SHA1),
    /// The object is an offset delta with a specified offset delta [`usize`],
    /// and the size of the expanded object (previously `delta_final_size`).
    OffsetDelta(usize, usize),
    /// Similar to [`CacheObjectInfo::OffsetDelta`], but the delta algorithm is `zstd`.
    OffsetZstdelta(usize, usize),
    /// The object is a hash delta with a specified [`SHA1`] hash of its base object,
    /// and the size of the expanded object (previously `delta_final_size`).
    HashDelta(SHA1, usize),
}
69
70impl CacheObjectInfo {
71    /// Get the [`ObjectType`] of the object.
72    pub(crate) fn object_type(&self) -> ObjectType {
73        match self {
74            CacheObjectInfo::BaseObject(obj_type, _) => *obj_type,
75            CacheObjectInfo::OffsetDelta(_, _) => ObjectType::OffsetDelta,
76            CacheObjectInfo::OffsetZstdelta(_, _) => ObjectType::OffsetZstdelta,
77            CacheObjectInfo::HashDelta(_, _) => ObjectType::HashDelta,
78        }
79    }
80}
81
/// A decompressed object read from a pack file, held in the unpacking cache.
#[derive(Debug, Serialize, Deserialize)]
pub struct CacheObject {
    /// Base-object type & hash, or delta kind & expanded size.
    pub(crate) info: CacheObjectInfo,
    /// Offset of this object as supplied by the pack parser.
    pub offset: usize,
    /// The object's (decompressed) payload bytes.
    pub data_decompressed: Vec<u8>,
    pub mem_recorder: Option<Arc<AtomicUsize>>, // record mem-size of all CacheObjects of a Pack
}
89
90impl Clone for CacheObject {
91    fn clone(&self) -> Self {
92        let obj = CacheObject {
93            info: self.info.clone(),
94            offset: self.offset,
95            data_decompressed: self.data_decompressed.clone(),
96            mem_recorder: self.mem_recorder.clone(),
97        };
98        obj.record_mem_size();
99        obj
100    }
101}
102
103// ! used by lru_mem to calculate the size of the object, limit the memory usage.
104// ! the implementation of HeapSize is not accurate, only calculate the size of the data_decompress
105// Note that: mem_size == value_size + heap_size, and we only need to impl HeapSize because value_size is known
106impl HeapSize for CacheObject {
107    /// If a [`CacheObject`] is [`ObjectType::HashDelta`] or [`ObjectType::OffsetDelta`],
108    /// it will expand to another [`CacheObject`] of other types. To prevent potential OOM,
109    /// we record the size of the expanded object as well as that of the object itself.
110    ///
111    /// Base objects, *i.e.*, [`ObjectType::Blob`], [`ObjectType::Tree`], [`ObjectType::Commit`],
112    /// and [`ObjectType::Tag`], will not be expanded, so the heap-size of the object is the same
113    /// as the size of the data.
114    ///
115    /// See [Comment in PR #755](https://github.com/web3infra-foundation/mega/pull/755#issuecomment-2543100481) for more details.
116    fn heap_size(&self) -> usize {
117        match &self.info {
118            CacheObjectInfo::BaseObject(_, _) => self.data_decompressed.heap_size(),
119            CacheObjectInfo::OffsetDelta(_, delta_final_size)
120            | CacheObjectInfo::OffsetZstdelta(_, delta_final_size)
121            | CacheObjectInfo::HashDelta(_, delta_final_size) => {
122                // To those who are concerned about why these two values are added,
123                // let's consider the lifetime of two `CacheObject`s, say `delta_obj`
124                // and `final_obj` in the function `Pack::rebuild_delta`.
125                //
126                // `delta_obj` is dropped only after `Pack::rebuild_delta` returns,
127                // but the space for `final_obj` is allocated in that function.
128                //
129                // Therefore, during the execution of `Pack::rebuild_delta`, both `delta_obj`
130                // and `final_obj` coexist. The maximum memory usage is the sum of the memory
131                // usage of `delta_obj` and `final_obj`.
132                self.data_decompressed.heap_size() + delta_final_size
133            }
134        }
135    }
136}
137
138impl Drop for CacheObject {
139    // Check: the heap-size subtracted when Drop is equal to the heap-size recorded
140    // (cannot change the heap-size during life cycle)
141    fn drop(&mut self) {
142        // (&*self).heap_size() != self.heap_size()
143        if let Some(mem_recorder) = &self.mem_recorder {
144            mem_recorder.fetch_sub((*self).mem_size(), Ordering::Release);
145        }
146    }
147}
148
149/// Heap-size recorder for a class(struct)
150/// <br> You should use a static Var to record mem-size
151/// and record mem-size after construction & minus it in `drop()`
152/// <br> So, variable-size fields in object should NOT be modified to keep heap-size stable.
153/// <br> Or, you can record the initial mem-size in this object
154/// <br> Or, update it (not impl)
155pub trait MemSizeRecorder: MemSize {
156    fn record_mem_size(&self);
157    fn set_mem_recorder(&mut self, mem_size: Arc<AtomicUsize>);
158    // fn get_mem_size() -> usize;
159}
160
161impl MemSizeRecorder for CacheObject {
162    /// record the mem-size of this `CacheObj` in a `static` `var`
163    /// <br> since that, DO NOT modify `CacheObj` after recording
164    fn record_mem_size(&self) {
165        if let Some(mem_recorder) = &self.mem_recorder {
166            mem_recorder.fetch_add(self.mem_size(), Ordering::Release);
167        }
168    }
169
170    fn set_mem_recorder(&mut self, mem_recorder: Arc<AtomicUsize>) {
171        self.mem_recorder = Some(mem_recorder);
172    }
173
174    // fn get_mem_size() -> usize {
175    //     CACHE_OBJS_MEM_SIZE.load(Ordering::Acquire)
176    // }
177}
178
179impl CacheObject {
180    /// Create a new CacheObject which is neither [`ObjectType::OffsetDelta`] nor [`ObjectType::HashDelta`].
181    pub fn new_for_undeltified(obj_type: ObjectType, data: Vec<u8>, offset: usize) -> Self {
182        let hash = utils::calculate_object_hash(obj_type, &data);
183        CacheObject {
184            info: CacheObjectInfo::BaseObject(obj_type, hash),
185            offset,
186            data_decompressed: data,
187            mem_recorder: None,
188        }
189    }
190
191    /// Get the [`ObjectType`] of the object.
192    pub fn object_type(&self) -> ObjectType {
193        self.info.object_type()
194    }
195
196    /// Get the [`SHA1`] hash of the object.
197    ///
198    /// If the object is a delta object, return [`None`].
199    pub fn base_object_hash(&self) -> Option<SHA1> {
200        match &self.info {
201            CacheObjectInfo::BaseObject(_, hash) => Some(*hash),
202            _ => None,
203        }
204    }
205
206    /// Get the offset delta of the object.
207    ///
208    /// If the object is not an offset delta, return [`None`].
209    pub fn offset_delta(&self) -> Option<usize> {
210        match &self.info {
211            CacheObjectInfo::OffsetDelta(offset, _) => Some(*offset),
212            _ => None,
213        }
214    }
215
216    /// Get the hash delta of the object.
217    ///
218    /// If the object is not a hash delta, return [`None`].
219    pub fn hash_delta(&self) -> Option<SHA1> {
220        match &self.info {
221            CacheObjectInfo::HashDelta(hash, _) => Some(*hash),
222            _ => None,
223        }
224    }
225
226    /// transform the CacheObject to Entry
227    pub fn to_entry(&self) -> Entry {
228        match self.info {
229            CacheObjectInfo::BaseObject(obj_type, hash) => Entry {
230                obj_type,
231                data: self.data_decompressed.clone(),
232                hash,
233                chain_len: 0,
234            },
235            _ => {
236                unreachable!("delta object should not persist!")
237            }
238        }
239    }
240}
241
/// Trait alias for the bounds required of values held by [`ArcWrapper`]:
/// cache-sizable (`HeapSize`), disk-serializable, and shareable across the
/// store thread pool (`Send + Sync + 'static`).
pub trait ArcWrapperBounds:
    HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static
{
}
// A blanket impl is required: `T` only satisfies the alias trait through this
// impl, even when it already satisfies all of the underlying bounds.
impl<T: HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static> ArcWrapperBounds
    for T
{
}
253
/// Implementing encapsulation of Arc to enable third-party Trait HeapSize implementation for the Arc type.
///
/// Because LruCache holds `Arc`s, the cache cannot know whether ejecting a
/// pointer actually drops the referenced data (other clones may be alive), so
/// the accounted memory usage is not exact.
pub struct ArcWrapper<T: ArcWrapperBounds> {
    /// The shared payload.
    pub data: Arc<T>,
    /// When set to `true`, the drop-time store below is skipped (see `Drop`).
    complete_signal: Arc<AtomicBool>,
    /// Optional worker pool used to persist ejected entries asynchronously.
    pool: Option<Arc<ThreadPool>>,
    pub store_path: Option<PathBuf>, // path to store when drop
}
263impl<T: ArcWrapperBounds> ArcWrapper<T> {
264    /// Create a new ArcWrapper
265    pub fn new(data: Arc<T>, share_flag: Arc<AtomicBool>, pool: Option<Arc<ThreadPool>>) -> Self {
266        ArcWrapper {
267            data,
268            complete_signal: share_flag,
269            pool,
270            store_path: None,
271        }
272    }
273    pub fn set_store_path(&mut self, path: PathBuf) {
274        self.store_path = Some(path);
275    }
276}
277
impl<T: ArcWrapperBounds> HeapSize for ArcWrapper<T> {
    /// Report the heap size of the wrapped value, not of the `Arc` itself;
    /// every clone of this wrapper therefore reports the same size.
    fn heap_size(&self) -> usize {
        self.data.heap_size()
    }
}
283
284impl<T: ArcWrapperBounds> Clone for ArcWrapper<T> {
285    /// clone won't clone the store_path
286    fn clone(&self) -> Self {
287        ArcWrapper {
288            data: self.data.clone(),
289            complete_signal: self.complete_signal.clone(),
290            pool: self.pool.clone(),
291            store_path: None,
292        }
293    }
294}
295
impl<T: ArcWrapperBounds> Deref for ArcWrapper<T> {
    type Target = Arc<T>;
    /// Transparently expose the inner `Arc<T>`.
    fn deref(&self) -> &Self::Target {
        &self.data
    }
}
impl<T: ArcWrapperBounds> Drop for ArcWrapper<T> {
    // `drop` will be called in `lru_cache.insert()` when the cache is full and
    // ejects the LRU entry; `lru_cache.insert()` is protected by a Mutex, so
    // these drops are serialized.
    fn drop(&mut self) {
        // Persist only while processing is still in progress (complete_signal
        // false) and a target path was configured via `set_store_path`.
        if !self.complete_signal.load(Ordering::Acquire)
            && let Some(path) = &self.store_path
        {
            match &self.pool {
                Some(pool) => {
                    let data_copy = self.data.clone();
                    let path_copy = path.clone();
                    let complete_signal = self.complete_signal.clone();
                    // Back-pressure: block this thread (yielding) while the
                    // pool backlog exceeds 2000 jobs, so queued save tasks —
                    // each holding an Arc to its data — can't accumulate
                    // unbounded memory while waiting for IO.
                    while pool.queued_count() > 2000 {
                        std::thread::yield_now();
                    }
                    pool.execute(move || {
                        // Re-check the signal: processing may have completed
                        // while this job sat in the queue.
                        if !complete_signal.load(Ordering::Acquire) {
                            let res = data_copy.f_save(&path_copy);
                            if let Err(e) = res {
                                println!("[f_save] {path_copy:?} error: {e:?}");
                            }
                        }
                    });
                }
                None => {
                    // No pool: store synchronously on the current thread.
                    let res = self.data.f_save(path);
                    if let Err(e) = res {
                        println!("[f_save] {path:?} error: {e:?}");
                    }
                }
            }
        }
    }
}
#[cfg(test)]
mod test {
    use std::{fs, sync::Mutex};

    use lru_mem::LruCache;

    use super::*;

    /// Recording adds exactly `mem_size()` to the shared counter and dropping
    /// the object subtracts the same amount, returning the counter to zero.
    #[test]
    fn test_heap_size_record() {
        let mut obj = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, SHA1::default()),
            offset: 0,
            data_decompressed: vec![0; 1024],
            mem_recorder: None,
        };
        let mem = Arc::new(AtomicUsize::default());
        assert_eq!(mem.load(Ordering::Relaxed), 0);
        obj.set_mem_recorder(mem.clone());
        obj.record_mem_size();
        assert_eq!(mem.load(Ordering::Relaxed), obj.mem_size());
        drop(obj);
        assert_eq!(mem.load(Ordering::Relaxed), 0);
    }

    /// Wrapping a `CacheObject` in an `ArcWrapper` must not change the
    /// reported heap size.
    #[test]
    fn test_cache_object_with_same_size() {
        let a = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, SHA1::default()),
            offset: 0,
            data_decompressed: vec![0; 1024],
            mem_recorder: None,
        };
        assert!(a.heap_size() == 1024);

        let b = ArcWrapper::new(Arc::new(a.clone()), Arc::new(AtomicBool::new(false)), None);
        assert!(b.heap_size() == 1024);
    }

    /// Inserting past the cache's memory limit ejects the least-recently-used
    /// entry, while `try_insert` refuses and reports `WouldEjectLru`.
    #[test]
    fn test_cache_object_with_lru() {
        let mut cache = LruCache::new(2048);

        let hash_a = SHA1::default();
        let hash_b = SHA1::new(b"b"); // whatever different hash
        let a = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash_a),
            offset: 0,
            data_decompressed: vec![0; 1024],
            mem_recorder: None,
        };
        println!("a.heap_size() = {}", a.heap_size());

        let b = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash_b),
            offset: 0,
            data_decompressed: vec![0; (1024.0 * 1.5) as usize],
            mem_recorder: None,
        };
        {
            let r = cache.insert(
                hash_a.to_string(),
                ArcWrapper::new(Arc::new(a.clone()), Arc::new(AtomicBool::new(true)), None),
            );
            assert!(r.is_ok())
        }
        {
            let r = cache.try_insert(
                hash_b.to_string(),
                ArcWrapper::new(Arc::new(b.clone()), Arc::new(AtomicBool::new(true)), None),
            );
            assert!(r.is_err());
            if let Err(lru_mem::TryInsertError::WouldEjectLru { .. }) = r {
                // Matched the expected error; nothing else to do.
            } else {
                panic!("Expected WouldEjectLru error");
            }
            // Insert `b` with plain `insert` this time, so `a` gets evicted.
            let r = cache.insert(
                hash_b.to_string(),
                ArcWrapper::new(Arc::new(b.clone()), Arc::new(AtomicBool::new(true)), None),
            );
            assert!(r.is_ok());
        }
        {
            // a should be ejected
            let r = cache.get(&hash_a.to_string());
            assert!(r.is_none());
        }
    }

    /// Minimal payload type for the LRU/store tests below; `a` doubles as the
    /// fake heap size (see the `HeapSize` impl).
    #[derive(Serialize, Deserialize)]
    struct Test {
        a: usize,
    }
    impl Drop for Test {
        fn drop(&mut self) {
            println!("drop Test");
        }
    }
    impl HeapSize for Test {
        fn heap_size(&self) -> usize {
            self.a
        }
    }

    /// An ejected entry's data stays alive as long as another `Arc` clone of
    /// the wrapper is still held outside the cache.
    #[test]
    fn test_lru_drop() {
        println!("insert a");
        let cache = LruCache::new(2048);
        let cache = Arc::new(Mutex::new(cache));
        {
            let mut c = cache.as_ref().lock().unwrap();
            let _ = c.insert(
                "a",
                ArcWrapper::new(
                    Arc::new(Test { a: 1024 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        println!("insert b, a should be ejected");
        {
            let mut c = cache.as_ref().lock().unwrap();
            let _ = c.insert(
                "b",
                ArcWrapper::new(
                    Arc::new(Test { a: 1200 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        let b = {
            let mut c = cache.as_ref().lock().unwrap();
            c.get("b").cloned()
        };
        println!("insert c, b should not be ejected");
        {
            let mut c = cache.as_ref().lock().unwrap();
            let _ = c.insert(
                "c",
                ArcWrapper::new(
                    Arc::new(Test { a: 1200 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        // `b` was cloned out of the cache above, so its data remains usable
        // even after the cache lets go of its own copy.
        println!("user b: {}", b.as_ref().unwrap().a);
        println!("test over, enject all");
    }

    /// A `CacheObject` round-trips through bincode unchanged
    /// (`mem_recorder` is not part of the comparison).
    #[test]
    fn test_cache_object_serialize() {
        let a = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, SHA1::default()),
            offset: 0,
            data_decompressed: vec![0; 1024],
            mem_recorder: None,
        };
        let s = bincode::serde::encode_to_vec(&a, bincode::config::standard()).unwrap();
        let b: CacheObject = bincode::serde::decode_from_slice(&s, bincode::config::standard())
            .unwrap()
            .0;
        assert_eq!(a.info, b.info);
        assert_eq!(a.data_decompressed, b.data_decompressed);
        assert_eq!(a.offset, b.offset);
    }

    /// Dropping a wrapper with a store path while the signal is still `false`
    /// must persist the data synchronously (no pool configured).
    #[test]
    fn test_arc_wrapper_drop_store() {
        let mut path = PathBuf::from(".cache_temp/test_arc_wrapper_drop_store");
        fs::create_dir_all(&path).unwrap();
        path.push("test_obj");
        let mut a = ArcWrapper::new(Arc::new(1024), Arc::new(AtomicBool::new(false)), None);
        a.set_store_path(path.clone());
        drop(a);

        assert!(path.exists());
        path.pop();
        fs::remove_dir_all(path).unwrap();
    }

    #[test]
    /// The wrapper correctly stores its data to disk when the LRU ejects it,
    /// and skips the store once the completion flag is raised.
    fn test_arc_wrapper_with_lru() {
        let mut cache = LruCache::new(1500);
        let path = PathBuf::from(".cache_temp/test_arc_wrapper_with_lru");
        let _ = fs::remove_dir_all(&path);
        fs::create_dir_all(&path).unwrap();
        let shared_flag = Arc::new(AtomicBool::new(false));

        // insert a, a not ejected
        let a_path = path.join("a");
        {
            let mut a = ArcWrapper::new(Arc::new(Test { a: 1024 }), shared_flag.clone(), None);
            a.set_store_path(a_path.clone());
            let b = ArcWrapper::new(Arc::new(1024), shared_flag.clone(), None);
            assert!(b.store_path.is_none());

            println!("insert a with heap size: {:?}", a.heap_size());
            let rt = cache.insert("a", a);
            if let Err(e) = rt {
                panic!("{}", format!("insert a failed: {:?}", e.to_string()));
            }
            println!("after insert a, cache used = {}", cache.current_size());
        }
        assert!(!a_path.exists());

        let b_path = path.join("b");
        // insert b, a should be ejected
        {
            let mut b = ArcWrapper::new(Arc::new(Test { a: 996 }), shared_flag.clone(), None);
            b.set_store_path(b_path.clone());
            let rt = cache.insert("b", b);
            if let Err(e) = rt {
                panic!("{}", format!("insert a failed: {:?}", e.to_string()));
            }
            println!("after insert b, cache used = {}", cache.current_size());
        }
        assert!(a_path.exists());
        assert!(!b_path.exists());
        shared_flag.store(true, Ordering::Release);
        fs::remove_dir_all(path).unwrap();
        // should pass even b's path not exists
    }
}