// git_internal/internal/pack/cache.rs

//! Multi-tier cache for pack decoding that combines an in-memory LRU with spill-to-disk storage and
//! bookkeeping for concurrent rebuild tasks.

4use std::{
5    fs, io,
6    path::{Path, PathBuf},
7    sync::{
8        Arc, Mutex, Once,
9        atomic::{AtomicBool, Ordering},
10    },
11    thread::sleep,
12};
13
14use dashmap::{DashMap, DashSet};
15use lru_mem::LruCache;
16use threadpool::ThreadPool;
17
18use crate::{
19    hash::ObjectHash,
20    internal::pack::cache_object::{ArcWrapper, CacheObject, FileLoadStore, MemSizeRecorder},
21    time_it,
22};
23
24/// Trait defining the interface for a multi-tier cache system.
25/// This cache supports insertion and retrieval of objects by both offset and hash,
26/// as well as memory usage tracking and clearing functionality.
27pub trait _Cache {
28    fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
29    where
30        Self: Sized;
31    fn get_hash(&self, offset: usize) -> Option<ObjectHash>;
32    fn insert(&self, offset: usize, hash: ObjectHash, obj: CacheObject) -> Arc<CacheObject>;
33    fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>>;
34    fn get_by_hash(&self, h: ObjectHash) -> Option<Arc<CacheObject>>;
35    fn total_inserted(&self) -> usize;
36    fn memory_used(&self) -> usize;
37    fn clear(&self);
38}
39
40impl lru_mem::HeapSize for ObjectHash {
41    fn heap_size(&self) -> usize {
42        0
43    }
44}
45
46/// Multi-tier cache implementation combining an in-memory LRU cache with spill-to-disk storage.
47/// It uses a DashMap for offset-to-hash mapping and a DashSet to track cached hashes.
48/// The cache supports concurrent rebuild tasks using a thread pool.
49pub struct Caches {
50    map_offset: DashMap<usize, ObjectHash>, // offset to hash
51    hash_set: DashSet<ObjectHash>,          // item in the cache
52    // dropping large lru cache will take a long time on Windows without multi-thread IO
53    // because "multi-thread IO" clone Arc<CacheObject>, so it won't be dropped in the main thread,
54    // and `CacheObjects` will be killed by OS after Process ends abnormally
55    // Solution: use `mimalloc`
56    lru_cache: Mutex<LruCache<ObjectHash, ArcWrapper<CacheObject>>>,
57    mem_size: Option<usize>,
58    tmp_path: PathBuf,
59    path_prefixes: [Once; 256],
60    pool: Arc<ThreadPool>,
61    complete_signal: Arc<AtomicBool>,
62}
63
64impl Caches {
65    /// only get object from memory, not from tmp file
66    fn try_get(&self, hash: ObjectHash) -> Option<Arc<CacheObject>> {
67        let mut map = self.lru_cache.lock().unwrap();
68        map.get(&hash).map(|x| x.data.clone())
69    }
70
71    /// !IMPORTANT: because of the process of pack, the file must be written / be writing before, so it won't be dead lock
72    /// fall back to temp to get item. **invoker should ensure the hash is in the cache, or it will block forever**
73    fn get_fallback(&self, hash: ObjectHash) -> io::Result<Arc<CacheObject>> {
74        let path = self.generate_temp_path(&self.tmp_path, hash);
75        // read from tmp file
76        let obj = {
77            loop {
78                match Self::read_from_temp(&path) {
79                    Ok(x) => break x,
80                    Err(e) if e.kind() == io::ErrorKind::NotFound => {
81                        sleep(std::time::Duration::from_millis(10));
82                        continue;
83                    }
84                    Err(e) => return Err(e), // other error
85                }
86            }
87        };
88
89        let mut map = self.lru_cache.lock().unwrap();
90        let obj = Arc::new(obj);
91        let mut x = ArcWrapper::new(
92            obj.clone(),
93            self.complete_signal.clone(),
94            Some(self.pool.clone()),
95        );
96        x.set_store_path(path);
97        let _ = map.insert(hash, x); // handle the error
98        Ok(obj)
99    }
100
101    /// generate the temp file path, hex string of the hash
102    fn generate_temp_path(&self, tmp_path: &Path, hash: ObjectHash) -> PathBuf {
103        // Reserve capacity for base path, 2-char subdir, hex hash string, and separators
104        let mut path =
105            PathBuf::with_capacity(self.tmp_path.capacity() + hash.to_string().len() + 5);
106        path.push(tmp_path);
107        let hash_str = hash._to_string();
108        path.push(&hash_str[..2]); // use first 2 chars as the directory
109        self.path_prefixes[hash.as_ref()[0] as usize].call_once(|| {
110            // Check if the directory exists, if not, create it
111            if !path.exists() {
112                fs::create_dir_all(&path).unwrap();
113            }
114        });
115        path.push(hash_str);
116        path
117    }
118
119    /// read CacheObject from temp file
120    fn read_from_temp(path: &Path) -> io::Result<CacheObject> {
121        let obj = CacheObject::f_load(path)?;
122        // Deserializing will also create an object but without Construction outside and `::new()`
123        // So if you want to do sth. while Constructing, impl Deserialize trait yourself
124        obj.record_mem_size();
125        Ok(obj)
126    }
127
128    /// number of queued tasks in the thread pool
129    pub fn queued_tasks(&self) -> usize {
130        self.pool.queued_count()
131    }
132
133    /// memory used by the index (exclude lru_cache which is contained in CacheObject::get_mem_size())
134    pub fn memory_used_index(&self) -> usize {
135        self.map_offset.capacity()
136            * (std::mem::size_of::<usize>() + std::mem::size_of::<ObjectHash>())
137            + self.hash_set.capacity() * (std::mem::size_of::<ObjectHash>())
138    }
139
140    /// remove the tmp dir
141    pub fn remove_tmp_dir(&self) {
142        time_it!("Remove tmp dir", {
143            if self.tmp_path.exists() {
144                fs::remove_dir_all(&self.tmp_path).unwrap(); //very slow
145            }
146        });
147    }
148}
149
150impl _Cache for Caches {
151    /// @param size: the size of the memory lru cache. **None means no limit**
152    /// @param tmp_path: the path to store the cache object in the tmp file
153    fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
154    where
155        Self: Sized,
156    {
157        // `None` means no limit, so no need to create the tmp dir
158        if mem_size.is_some() {
159            fs::create_dir_all(&tmp_path).unwrap();
160        }
161
162        Caches {
163            map_offset: DashMap::new(),
164            hash_set: DashSet::new(),
165            lru_cache: Mutex::new(LruCache::new(mem_size.unwrap_or(usize::MAX))),
166            mem_size,
167            tmp_path,
168            path_prefixes: [const { Once::new() }; 256],
169            pool: Arc::new(ThreadPool::new(thread_num)),
170            complete_signal: Arc::new(AtomicBool::new(false)),
171        }
172    }
173
174    fn get_hash(&self, offset: usize) -> Option<ObjectHash> {
175        self.map_offset.get(&offset).map(|x| *x)
176    }
177
178    fn insert(&self, offset: usize, hash: ObjectHash, obj: CacheObject) -> Arc<CacheObject> {
179        let obj_arc = Arc::new(obj);
180        {
181            // ? whether insert to cache directly or only write to tmp file
182            let mut map = self.lru_cache.lock().unwrap();
183            let mut a_obj = ArcWrapper::new(
184                obj_arc.clone(),
185                self.complete_signal.clone(),
186                Some(self.pool.clone()),
187            );
188            if self.mem_size.is_some() {
189                a_obj.set_store_path(self.generate_temp_path(&self.tmp_path, hash));
190            }
191            let _ = map.insert(hash, a_obj);
192        }
193        //order maters as for reading in 'get_by_offset()'
194        self.hash_set.insert(hash);
195        self.map_offset.insert(offset, hash);
196
197        obj_arc
198    }
199
200    /// get object by offset, from memory or tmp file
201    fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>> {
202        match self.map_offset.get(&offset) {
203            Some(x) => self.get_by_hash(*x),
204            None => None,
205        }
206    }
207
208    /// get object by hash, from memory or tmp file
209    fn get_by_hash(&self, hash: ObjectHash) -> Option<Arc<CacheObject>> {
210        // check if the hash is in the cache( lru or tmp file)
211        if self.hash_set.contains(&hash) {
212            match self.try_get(hash) {
213                Some(x) => Some(x),
214                None => {
215                    if self.mem_size.is_none() {
216                        panic!("should not be here when mem_size is not set")
217                    }
218                    self.get_fallback(hash).ok()
219                }
220            }
221        } else {
222            None
223        }
224    }
225
226    fn total_inserted(&self) -> usize {
227        self.hash_set.len()
228    }
229    fn memory_used(&self) -> usize {
230        self.lru_cache.lock().unwrap().current_size() + self.memory_used_index()
231    }
232    fn clear(&self) {
233        time_it!("Caches clear", {
234            self.complete_signal.store(true, Ordering::Release);
235            self.pool.join();
236            self.lru_cache.lock().unwrap().clear();
237            self.hash_set.clear();
238            self.hash_set.shrink_to_fit();
239            self.map_offset.clear();
240            self.map_offset.shrink_to_fit();
241        });
242
243        assert_eq!(self.pool.queued_count(), 0);
244        assert_eq!(self.pool.active_count(), 0);
245        assert_eq!(self.lru_cache.lock().unwrap().len(), 0);
246    }
247}
248
#[cfg(test)]
mod test {
    use std::{env, sync::Arc, thread};

    use super::*;
    use crate::{
        hash::{HashKind, ObjectHash, set_hash_kind_for_test},
        internal::{object::types::ObjectType, pack::cache_object::CacheObjectInfo},
    };

    /// Builds a zero-filled base blob of `size` bytes, identified by `hash`, at pack `offset`.
    fn blob_at(size: usize, hash: ObjectHash, offset: usize) -> CacheObject {
        CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash),
            data_decompressed: vec![0; size],
            mem_recorder: None,
            offset,
            crc32: 0,
            is_delta_in_pack: false,
        }
    }

    /// Resolves `rel` against the parent of the working directory and deletes it if present,
    /// so each test starts with an empty spill directory.
    fn fresh_tmp_dir(rel: &str) -> PathBuf {
        let dir = env::current_dir().unwrap().parent().unwrap().join(rel);
        if dir.exists() {
            fs::remove_dir_all(&dir).unwrap();
        }
        dir
    }

    /// On a worker thread using hash `kind`, inserts a `size`-byte blob keyed by `key` at
    /// `offset`, and checks that it landed in both the hash set and the LRU.
    fn spawn_insert(
        cache: Arc<Caches>,
        kind: HashKind,
        key: &'static [u8],
        size: usize,
        offset: usize,
    ) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            let _g = set_hash_kind_for_test(kind);
            let hash = ObjectHash::new(key);
            let obj = blob_at(size, hash, offset);
            cache.insert(obj.offset, hash, obj.clone());
            assert!(cache.hash_set.contains(&hash));
            assert!(cache.try_get(hash).is_some());
        })
    }

    /// Single-threaded LRU behaviour across hash kinds and capacities.
    #[test]
    fn test_cache_single_thread() {
        let cases = [
            (HashKind::Sha1, 2048usize, 800usize, 1700usize, "tests/.cache_tmp"),
            (HashKind::Sha256, 4096usize, 1500usize, 3000usize, "tests/.cache_tmp_sha256"),
        ];
        for (kind, cap, small, large, dir) in cases {
            let _guard = set_hash_kind_for_test(kind);
            let cache = Caches::new(Some(cap), fresh_tmp_dir(dir), 1);

            let [a_hash, b_hash, c_hash] = ["a", "b", "c"].map(|s| ObjectHash::new(s.as_bytes()));
            let a = blob_at(small, a_hash, 0);
            let b = blob_at(small, b_hash, 0);
            let c = blob_at(large, c_hash, 0);

            // First entry fits.
            cache.insert(a.offset, a_hash, a.clone());
            assert!(cache.hash_set.contains(&a_hash));
            assert!(cache.try_get(a_hash).is_some());

            // Second entry fits alongside the first.
            cache.insert(b.offset, b_hash, b.clone());
            assert!(cache.hash_set.contains(&b_hash));
            assert!(cache.try_get(b_hash).is_some());
            assert!(cache.try_get(a_hash).is_some());

            // The large entry pushes both earlier ones out of memory.
            cache.insert(c.offset, c_hash, c.clone());
            assert!(cache.try_get(a_hash).is_none());
            assert!(cache.try_get(b_hash).is_none());
            assert!(cache.try_get(c_hash).is_some());
            assert!(cache.get_by_hash(c_hash).is_some());
        }
    }

    /// Two threads insert into one cache while using different hash kinds.
    #[test]
    fn test_cache_multi_thread_mixed_hash_kinds() {
        let cache = Arc::new(Caches::new(
            Some(4096),
            fresh_tmp_dir("tests/.cache_tmp_mixed"),
            2,
        ));

        let sha1_worker = spawn_insert(Arc::clone(&cache), HashKind::Sha1, b"sha1-entry", 800, 1);
        let sha256_worker =
            spawn_insert(Arc::clone(&cache), HashKind::Sha256, b"sha256-entry", 1500, 2);

        sha1_worker.join().unwrap();
        sha256_worker.join().unwrap();

        assert_eq!(cache.total_inserted(), 2);
    }
}