// git_internal/internal/pack/cache.rs
1//! Multi-tier cache for pack decoding that combines an in-memory LRU with spill-to-disk storage and
2//! bookkeeping for concurrent rebuild tasks.
3
4use std::{
5    fs, io,
6    path::{Path, PathBuf},
7    sync::{
8        Arc, Mutex, Once,
9        atomic::{AtomicBool, Ordering},
10    },
11    thread::sleep,
12};
13
14use dashmap::{DashMap, DashSet};
15use lru_mem::LruCache;
16use threadpool::ThreadPool;
17
18use crate::{
19    hash::ObjectHash,
20    internal::pack::cache_object::{ArcWrapper, CacheObject, FileLoadStore, MemSizeRecorder},
21    time_it,
22};
23
/// Trait defining the interface for a multi-tier cache system.
/// This cache supports insertion and retrieval of objects by both offset and hash,
/// as well as memory usage tracking and clearing functionality.
pub trait _Cache {
    /// Creates a cache with an optional in-memory byte budget (`None` means no limit),
    /// a temp directory for spilled objects, and a worker count for background IO.
    fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
    where
        Self: Sized;
    /// Looks up the object hash recorded for a pack `offset`, if any.
    fn get_hash(&self, offset: usize) -> Option<ObjectHash>;
    /// Inserts `obj` under both `offset` and `hash`; returns a shared handle to it.
    fn insert(&self, offset: usize, hash: ObjectHash, obj: CacheObject) -> Arc<CacheObject>;
    /// Retrieves an object by its pack offset (memory first, then spill storage).
    fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>>;
    /// Retrieves an object by its hash (memory first, then spill storage).
    fn get_by_hash(&self, h: ObjectHash) -> Option<Arc<CacheObject>>;
    /// Total number of distinct objects inserted so far.
    fn total_inserted(&self) -> usize;
    /// Approximate bytes of memory currently used by the cache.
    fn memory_used(&self) -> usize;
    /// Clears all cached state, waiting for pending background tasks to finish.
    fn clear(&self);
}
39
impl lru_mem::HeapSize for ObjectHash {
    /// Reports 0 so only the inline size of `ObjectHash` counts toward the LRU
    /// budget — assumes `ObjectHash` owns no heap allocations (TODO confirm).
    fn heap_size(&self) -> usize {
        0
    }
}
45
/// Multi-tier cache implementation combining an in-memory LRU cache with spill-to-disk storage.
/// It uses a DashMap for offset-to-hash mapping and a DashSet to track cached hashes.
/// The cache supports concurrent rebuild tasks using a thread pool.
pub struct Caches {
    /// Maps a pack offset to the hash of the object decoded at that offset.
    map_offset: DashMap<usize, ObjectHash>, // offset to hash
    /// Every hash inserted so far, whether the object now lives in memory or on disk.
    hash_set: DashSet<ObjectHash>, // item in the cache
    // dropping large lru cache will take a long time on Windows without multi-thread IO
    // because "multi-thread IO" clone Arc<CacheObject>, so it won't be dropped in the main thread,
    // and `CacheObjects` will be killed by OS after Process ends abnormally
    // Solution: use `mimalloc`
    /// In-memory tier; evicted entries are written to `tmp_path` by the pool.
    lru_cache: Mutex<LruCache<ObjectHash, ArcWrapper<CacheObject>>>,
    /// LRU byte budget; `None` disables the disk tier entirely (no limit).
    mem_size: Option<usize>,
    /// Root directory for spilled objects (one 2-hex-char subdir per prefix byte).
    tmp_path: PathBuf,
    /// One `Once` per possible first hash byte, guarding subdir creation.
    path_prefixes: [Once; 256],
    /// Background pool used for spill IO.
    pool: Arc<ThreadPool>,
    /// Set `true` by `clear()` to tell pending tasks the cache is finished.
    complete_signal: Arc<AtomicBool>,
}
63
64impl Caches {
65    /// only get object from memory, not from tmp file
66    fn try_get(&self, hash: ObjectHash) -> Option<Arc<CacheObject>> {
67        let mut map = self.lru_cache.lock().unwrap();
68        map.get(&hash).map(|x| x.data.clone())
69    }
70
71    /// !IMPORTANT: because of the process of pack, the file must be written / be writing before, so it won't be dead lock
72    /// fall back to temp to get item. **invoker should ensure the hash is in the cache, or it will block forever**
73    fn get_fallback(&self, hash: ObjectHash) -> io::Result<Arc<CacheObject>> {
74        let path = self.generate_temp_path(&self.tmp_path, hash);
75        // read from tmp file
76        let obj = {
77            loop {
78                match Self::read_from_temp(&path) {
79                    Ok(x) => break x,
80                    Err(e) if e.kind() == io::ErrorKind::NotFound => {
81                        sleep(std::time::Duration::from_millis(10));
82                        continue;
83                    }
84                    Err(e) => return Err(e), // other error
85                }
86            }
87        };
88
89        let mut map = self.lru_cache.lock().unwrap();
90        let obj = Arc::new(obj);
91        let mut x = ArcWrapper::new(
92            obj.clone(),
93            self.complete_signal.clone(),
94            Some(self.pool.clone()),
95        );
96        x.set_store_path(path);
97        let _ = map.insert(hash, x); // handle the error
98        Ok(obj)
99    }
100
101    /// generate the temp file path, hex string of the hash
102    fn generate_temp_path(&self, tmp_path: &Path, hash: ObjectHash) -> PathBuf {
103        // Reserve capacity for base path, 2-char subdir, hex hash string, and separators
104        let mut path =
105            PathBuf::with_capacity(self.tmp_path.capacity() + hash.to_string().len() + 5);
106        path.push(tmp_path);
107        let hash_str = hash._to_string();
108        path.push(&hash_str[..2]); // use first 2 chars as the directory
109        self.path_prefixes[hash.as_ref()[0] as usize].call_once(|| {
110            // Check if the directory exists, if not, create it
111            if !path.exists() {
112                fs::create_dir_all(&path).unwrap();
113            }
114        });
115        path.push(hash_str);
116        path
117    }
118
119    /// read CacheObject from temp file
120    fn read_from_temp(path: &Path) -> io::Result<CacheObject> {
121        let obj = CacheObject::f_load(path)?;
122        // Deserializing will also create an object but without Construction outside and `::new()`
123        // So if you want to do sth. while Constructing, impl Deserialize trait yourself
124        obj.record_mem_size();
125        Ok(obj)
126    }
127
128    /// number of queued tasks in the thread pool
129    pub fn queued_tasks(&self) -> usize {
130        self.pool.queued_count()
131    }
132
133    /// memory used by the index (exclude lru_cache which is contained in CacheObject::get_mem_size())
134    pub fn memory_used_index(&self) -> usize {
135        self.map_offset.capacity()
136            * (std::mem::size_of::<usize>() + std::mem::size_of::<ObjectHash>())
137            + self.hash_set.capacity() * (std::mem::size_of::<ObjectHash>())
138    }
139
140    /// remove the tmp dir
141    pub fn remove_tmp_dir(&self) {
142        time_it!("Remove tmp dir", {
143            if self.tmp_path.exists() {
144                fs::remove_dir_all(&self.tmp_path).unwrap(); //very slow
145                // Try to remove parent .cache_temp directory if it's empty
146                if let Some(parent) = self.tmp_path.parent() {
147                    let is_cache_temp = parent
148                        .file_name()
149                        .and_then(|n| n.to_str())
150                        .map(|n| n == ".cache_temp")
151                        .unwrap_or(false);
152                    if is_cache_temp {
153                        // Attempt to remove the parent directory if empty
154                        // This will fail silently if the directory is not empty or doesn't exist
155                        let _ = fs::remove_dir(parent);
156                    }
157                }
158            }
159        });
160    }
161}
162
163impl _Cache for Caches {
164    /// @param size: the size of the memory lru cache. **None means no limit**
165    /// @param tmp_path: the path to store the cache object in the tmp file
166    fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
167    where
168        Self: Sized,
169    {
170        // `None` means no limit, so no need to create the tmp dir
171        if mem_size.is_some() {
172            fs::create_dir_all(&tmp_path).unwrap();
173        }
174
175        Caches {
176            map_offset: DashMap::new(),
177            hash_set: DashSet::new(),
178            lru_cache: Mutex::new(LruCache::new(mem_size.unwrap_or(usize::MAX))),
179            mem_size,
180            tmp_path,
181            path_prefixes: [const { Once::new() }; 256],
182            pool: Arc::new(ThreadPool::new(thread_num)),
183            complete_signal: Arc::new(AtomicBool::new(false)),
184        }
185    }
186
187    fn get_hash(&self, offset: usize) -> Option<ObjectHash> {
188        self.map_offset.get(&offset).map(|x| *x)
189    }
190
191    fn insert(&self, offset: usize, hash: ObjectHash, obj: CacheObject) -> Arc<CacheObject> {
192        let obj_arc = Arc::new(obj);
193        {
194            // ? whether insert to cache directly or only write to tmp file
195            let mut map = self.lru_cache.lock().unwrap();
196            let mut a_obj = ArcWrapper::new(
197                obj_arc.clone(),
198                self.complete_signal.clone(),
199                Some(self.pool.clone()),
200            );
201            if self.mem_size.is_some() {
202                a_obj.set_store_path(self.generate_temp_path(&self.tmp_path, hash));
203            }
204            let _ = map.insert(hash, a_obj);
205        }
206        //order maters as for reading in 'get_by_offset()'
207        self.hash_set.insert(hash);
208        self.map_offset.insert(offset, hash);
209
210        obj_arc
211    }
212
213    /// get object by offset, from memory or tmp file
214    fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>> {
215        match self.map_offset.get(&offset) {
216            Some(x) => self.get_by_hash(*x),
217            None => None,
218        }
219    }
220
221    /// get object by hash, from memory or tmp file
222    fn get_by_hash(&self, hash: ObjectHash) -> Option<Arc<CacheObject>> {
223        // check if the hash is in the cache( lru or tmp file)
224        if self.hash_set.contains(&hash) {
225            match self.try_get(hash) {
226                Some(x) => Some(x),
227                None => {
228                    if self.mem_size.is_none() {
229                        panic!("should not be here when mem_size is not set")
230                    }
231                    self.get_fallback(hash).ok()
232                }
233            }
234        } else {
235            None
236        }
237    }
238
239    fn total_inserted(&self) -> usize {
240        self.hash_set.len()
241    }
242    fn memory_used(&self) -> usize {
243        self.lru_cache.lock().unwrap().current_size() + self.memory_used_index()
244    }
245    fn clear(&self) {
246        time_it!("Caches clear", {
247            self.complete_signal.store(true, Ordering::Release);
248            self.pool.join();
249            self.lru_cache.lock().unwrap().clear();
250            self.hash_set.clear();
251            self.hash_set.shrink_to_fit();
252            self.map_offset.clear();
253            self.map_offset.shrink_to_fit();
254        });
255
256        assert_eq!(self.pool.queued_count(), 0);
257        assert_eq!(self.pool.active_count(), 0);
258        assert_eq!(self.lru_cache.lock().unwrap().len(), 0);
259    }
260}
261
#[cfg(test)]
mod test {
    use std::{env, sync::Arc, thread};

    use super::*;
    use crate::{
        hash::{HashKind, ObjectHash, set_hash_kind_for_test},
        internal::{object::types::ObjectType, pack::cache_object::CacheObjectInfo},
    };

    /// Helper to build a base CacheObject with given size and hash.
    fn make_obj(size: usize, hash: ObjectHash) -> CacheObject {
        CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash),
            data_decompressed: vec![0; size],
            mem_recorder: None,
            offset: 0,
            crc32: 0,
            is_delta_in_pack: false,
        }
    }

    /// Test single-threaded cache behavior with different hash kinds and capacities.
    /// The sizes are chosen so that two `size_ab` objects fit in `cap` together,
    /// but a single `size_c` object evicts both of them.
    #[test]
    fn test_cache_single_thread() {
        for (kind, cap, size_ab, size_c, tmp_dir) in [
            (
                HashKind::Sha1,
                2048usize,
                800usize,
                1700usize,
                "tests/.cache_tmp",
            ),
            (
                HashKind::Sha256,
                4096usize,
                1500usize,
                3000usize,
                "tests/.cache_tmp_sha256",
            ),
        ] {
            let _guard = set_hash_kind_for_test(kind);
            // Place the tmp dir next to the workspace root and start from a clean slate.
            let source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
            let tmp_path = source.clone().join(tmp_dir);
            if tmp_path.exists() {
                fs::remove_dir_all(&tmp_path).unwrap();
            }

            let cache = Caches::new(Some(cap), tmp_path, 1);
            let a_hash = ObjectHash::new(String::from("a").as_bytes());
            let b_hash = ObjectHash::new(String::from("b").as_bytes());
            let c_hash = ObjectHash::new(String::from("c").as_bytes());

            let a = make_obj(size_ab, a_hash);
            let b = make_obj(size_ab, b_hash);
            let c = make_obj(size_c, c_hash);

            // insert a
            cache.insert(a.offset, a_hash, a.clone());
            assert!(cache.hash_set.contains(&a_hash));
            assert!(cache.try_get(a_hash).is_some());

            // insert b, a should still be in cache (both fit within `cap`)
            cache.insert(b.offset, b_hash, b.clone());
            assert!(cache.hash_set.contains(&b_hash));
            assert!(cache.try_get(b_hash).is_some());
            assert!(cache.try_get(a_hash).is_some());

            // insert c which will evict both a and b from memory; c remains
            // reachable both directly and via the full lookup path.
            cache.insert(c.offset, c_hash, c.clone());
            assert!(cache.try_get(a_hash).is_none());
            assert!(cache.try_get(b_hash).is_none());
            assert!(cache.try_get(c_hash).is_some());
            assert!(cache.get_by_hash(c_hash).is_some());
        }
    }

    /// Consider the multi-threaded scenario where different threads use different
    /// hash kinds against the same shared cache instance.
    #[test]
    fn test_cache_multi_thread_mixed_hash_kinds() {
        let base = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
        let tmp_path = base.join("tests/.cache_tmp_mixed");
        if tmp_path.exists() {
            fs::remove_dir_all(&tmp_path).unwrap();
        }

        let cache = Arc::new(Caches::new(Some(4096), tmp_path, 2));

        // Thread 1: inserts a SHA-1-hashed entry.
        let cache_sha1 = Arc::clone(&cache);
        let handle_sha1 = thread::spawn(move || {
            let _g = set_hash_kind_for_test(HashKind::Sha1);
            let hash = ObjectHash::new(b"sha1-entry");
            let obj = CacheObject {
                info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash),
                data_decompressed: vec![0; 800],
                mem_recorder: None,
                offset: 1,
                crc32: 0,
                is_delta_in_pack: false,
            };
            cache_sha1.insert(obj.offset, hash, obj.clone());
            assert!(cache_sha1.hash_set.contains(&hash));
            assert!(cache_sha1.try_get(hash).is_some());
        });

        // Thread 2: inserts a SHA-256-hashed entry concurrently.
        let cache_sha256 = Arc::clone(&cache);
        let handle_sha256 = thread::spawn(move || {
            let _g = set_hash_kind_for_test(HashKind::Sha256);
            let hash = ObjectHash::new(b"sha256-entry");
            let obj = CacheObject {
                info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash),
                data_decompressed: vec![0; 1500],
                mem_recorder: None,
                offset: 2,
                crc32: 0,
                is_delta_in_pack: false,
            };
            cache_sha256.insert(obj.offset, hash, obj.clone());
            assert!(cache_sha256.hash_set.contains(&hash));
            assert!(cache_sha256.try_get(hash).is_some());
        });

        handle_sha1.join().unwrap();
        handle_sha256.join().unwrap();

        // Both entries coexist regardless of hash kind.
        assert_eq!(cache.total_inserted(), 2);
    }
}
389}