// git_internal/internal/pack/cache.rs
1//! Multi-tier cache for pack decoding that combines an in-memory LRU with spill-to-disk storage and
2//! bookkeeping for concurrent rebuild tasks.
3
4use std::{
5    fs, io,
6    path::{Path, PathBuf},
7    sync::{
8        Arc, Mutex, Once,
9        atomic::{AtomicBool, Ordering},
10    },
11    thread::sleep,
12};
13
14use dashmap::{DashMap, DashSet};
15use lru_mem::LruCache;
16use threadpool::ThreadPool;
17
18use crate::{
19    hash::ObjectHash,
20    internal::pack::cache_object::{ArcWrapper, CacheObject, FileLoadStore, MemSizeRecorder},
21    time_it,
22};
23
/// Cache format version appended to the disk path so that caches written with an
/// incompatible serialization format (for example the previous bincode layout)
/// are ignored instead of causing deserialization errors.
/// Bump this string whenever the on-disk encoding of `CacheObject` changes.
const CACHE_LAYOUT_VERSION: &str = "rkyv-v1";
29/// Trait defining the interface for a multi-tier cache system.
30/// This cache supports insertion and retrieval of objects by both offset and hash,
31/// as well as memory usage tracking and clearing functionality.
/// Trait defining the interface for a multi-tier cache system.
/// This cache supports insertion and retrieval of objects by both offset and hash,
/// as well as memory usage tracking and clearing functionality.
pub trait _Cache {
    /// Create a cache. `mem_size` is the in-memory budget in bytes
    /// (**`None` means no limit**, in which case nothing is spilled to disk);
    /// `tmp_path` is the spill directory; `thread_num` sizes the worker thread pool.
    fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
    where
        Self: Sized;
    /// Look up the hash of the object previously inserted at `offset`.
    fn get_hash(&self, offset: usize) -> Option<ObjectHash>;
    /// Insert an object keyed by both its pack `offset` and its `hash`;
    /// returns a shared handle to the stored object.
    fn insert(&self, offset: usize, hash: ObjectHash, obj: CacheObject) -> Arc<CacheObject>;
    /// Fetch an object by pack offset (memory tier first, then disk).
    fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>>;
    /// Fetch an object by hash (memory tier first, then disk).
    fn get_by_hash(&self, h: ObjectHash) -> Option<Arc<CacheObject>>;
    /// Total number of distinct objects inserted so far.
    fn total_inserted(&self) -> usize;
    /// Estimated memory currently used by the cache, in bytes.
    fn memory_used(&self) -> usize;
    /// Drain pending background work and drop all cached state.
    fn clear(&self);
}
44
impl lru_mem::HeapSize for ObjectHash {
    /// `ObjectHash` is treated as owning no heap memory for LRU accounting
    /// purposes — presumably it is a fixed-size inline value; only the
    /// wrapped `CacheObject` payload counts toward the LRU budget.
    fn heap_size(&self) -> usize {
        0
    }
}
50
51/// Multi-tier cache implementation combining an in-memory LRU cache with spill-to-disk storage.
52/// It uses a DashMap for offset-to-hash mapping and a DashSet to track cached hashes.
53/// The cache supports concurrent rebuild tasks using a thread pool.
/// Multi-tier cache implementation combining an in-memory LRU cache with spill-to-disk storage.
/// It uses a DashMap for offset-to-hash mapping and a DashSet to track cached hashes.
/// The cache supports concurrent rebuild tasks using a thread pool.
pub struct Caches {
    map_offset: DashMap<usize, ObjectHash>, // pack offset -> object hash
    hash_set: DashSet<ObjectHash>,          // every hash present in the cache (memory or disk tier)
    // dropping large lru cache will take a long time on Windows without multi-thread IO
    // because "multi-thread IO" clone Arc<CacheObject>, so it won't be dropped in the main thread,
    // and `CacheObjects` will be killed by OS after Process ends abnormally
    // Solution: use `mimalloc`
    lru_cache: Mutex<LruCache<ObjectHash, ArcWrapper<CacheObject>>>, // in-memory tier
    mem_size: Option<usize>, // LRU budget in bytes; None = unlimited, no disk spill
    tmp_path: PathBuf,       // root directory for spilled objects
    // One-shot guards, indexed by the hash's first byte, so each 2-hex-char
    // fan-out subdirectory is created at most once.
    path_prefixes: [Once; 256],
    pool: Arc<ThreadPool>, // worker pool handed to ArcWrapper (used for background IO)
    complete_signal: Arc<AtomicBool>, // set in clear() to signal pending workers
}
68
69impl Caches {
70    /// only get object from memory, not from tmp file
71    fn try_get(&self, hash: ObjectHash) -> Option<Arc<CacheObject>> {
72        let mut map = self.lru_cache.lock().unwrap();
73        map.get(&hash).map(|x| x.data.clone())
74    }
75
76    /// !IMPORTANT: because of the process of pack, the file must be written / be writing before, so it won't be dead lock
77    /// fall back to temp to get item. **invoker should ensure the hash is in the cache, or it will block forever**
78    fn get_fallback(&self, hash: ObjectHash) -> io::Result<Arc<CacheObject>> {
79        let path = self.generate_temp_path(&self.tmp_path, hash);
80        // read from tmp file
81        let obj = {
82            loop {
83                match Self::read_from_temp(&path) {
84                    Ok(x) => break x,
85                    Err(e) if e.kind() == io::ErrorKind::NotFound => {
86                        sleep(std::time::Duration::from_millis(10));
87                        continue;
88                    }
89                    Err(e) => return Err(e), // other error
90                }
91            }
92        };
93
94        let mut map = self.lru_cache.lock().unwrap();
95        let obj = Arc::new(obj);
96        let mut x = ArcWrapper::new(
97            obj.clone(),
98            self.complete_signal.clone(),
99            Some(self.pool.clone()),
100        );
101        x.set_store_path(path);
102        let _ = map.insert(hash, x); // handle the error
103        Ok(obj)
104    }
105
106    /// generate the temp file path, hex string of the hash
107    fn generate_temp_path(&self, tmp_path: &Path, hash: ObjectHash) -> PathBuf {
108        // Reserve capacity for base path, 2-char subdir, hex hash string, and separators
109        let mut path =
110            PathBuf::with_capacity(self.tmp_path.capacity() + hash.to_string().len() + 5);
111        path.push(tmp_path);
112        path.push(CACHE_LAYOUT_VERSION);
113        let hash_str = hash._to_string();
114        path.push(&hash_str[..2]); // use first 2 chars as the directory
115        self.path_prefixes[hash.as_ref()[0] as usize].call_once(|| {
116            // Check if the directory exists, if not, create it
117            if !path.exists() {
118                fs::create_dir_all(&path).unwrap();
119            }
120        });
121        path.push(hash_str);
122        path
123    }
124
125    /// read CacheObject from temp file
126    fn read_from_temp(path: &Path) -> io::Result<CacheObject> {
127        let obj = CacheObject::f_load(path)?;
128        // Deserializing will also create an object but without Construction outside and `::new()`
129        // So if you want to do sth. while Constructing, impl Deserialize trait yourself
130        obj.record_mem_size();
131        Ok(obj)
132    }
133
134    /// number of queued tasks in the thread pool
135    pub fn queued_tasks(&self) -> usize {
136        self.pool.queued_count()
137    }
138
139    /// memory used by the index (exclude lru_cache which is contained in CacheObject::get_mem_size())
140    pub fn memory_used_index(&self) -> usize {
141        self.map_offset.capacity()
142            * (std::mem::size_of::<usize>() + std::mem::size_of::<ObjectHash>())
143            + self.hash_set.capacity() * (std::mem::size_of::<ObjectHash>())
144    }
145
146    /// remove the tmp dir
147    pub fn remove_tmp_dir(&self) {
148        time_it!("Remove tmp dir", {
149            if self.tmp_path.exists() {
150                fs::remove_dir_all(&self.tmp_path).unwrap(); //very slow
151                // Try to remove parent .cache_temp directory if it's empty
152                if let Some(parent) = self.tmp_path.parent() {
153                    let is_cache_temp = parent
154                        .file_name()
155                        .and_then(|n| n.to_str())
156                        .map(|n| n == ".cache_temp")
157                        .unwrap_or(false);
158                    if is_cache_temp {
159                        // Attempt to remove the parent directory if empty
160                        // This will fail silently if the directory is not empty or doesn't exist
161                        let _ = fs::remove_dir(parent);
162                    }
163                }
164            }
165        });
166    }
167}
168
impl _Cache for Caches {
    /// @param size: the size of the memory lru cache. **None means no limit**
    /// @param tmp_path: the path to store the cache object in the tmp file
    fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
    where
        Self: Sized,
    {
        // `None` means no limit, so no need to create the tmp dir
        if mem_size.is_some() {
            fs::create_dir_all(&tmp_path).unwrap();
        }

        Caches {
            map_offset: DashMap::new(),
            hash_set: DashSet::new(),
            // usize::MAX budget effectively disables eviction when mem_size is None.
            lru_cache: Mutex::new(LruCache::new(mem_size.unwrap_or(usize::MAX))),
            mem_size,
            tmp_path,
            path_prefixes: [const { Once::new() }; 256],
            pool: Arc::new(ThreadPool::new(thread_num)),
            complete_signal: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Resolve a pack offset to the hash recorded for it by `insert`.
    fn get_hash(&self, offset: usize) -> Option<ObjectHash> {
        self.map_offset.get(&offset).map(|x| *x)
    }

    /// Insert an object into the LRU tier and register it in both indexes.
    /// Returns a shared handle to the inserted object.
    fn insert(&self, offset: usize, hash: ObjectHash, obj: CacheObject) -> Arc<CacheObject> {
        let obj_arc = Arc::new(obj);
        {
            // ? whether insert to cache directly or only write to tmp file
            let mut map = self.lru_cache.lock().unwrap();
            let mut a_obj = ArcWrapper::new(
                obj_arc.clone(),
                self.complete_signal.clone(),
                Some(self.pool.clone()),
            );
            // Only assign a spill path when a memory limit exists; with no limit
            // nothing is ever evicted to disk.
            if self.mem_size.is_some() {
                a_obj.set_store_path(self.generate_temp_path(&self.tmp_path, hash));
            }
            let _ = map.insert(hash, a_obj);
        }
        // order matters as for reading in 'get_by_offset()':
        // hash_set must be populated before map_offset so a concurrent reader
        // that finds the offset will also find the hash.
        self.hash_set.insert(hash);
        self.map_offset.insert(offset, hash);

        obj_arc
    }

    /// get object by offset, from memory or tmp file
    fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>> {
        match self.map_offset.get(&offset) {
            Some(x) => self.get_by_hash(*x),
            None => None,
        }
    }

    /// get object by hash, from memory or tmp file
    fn get_by_hash(&self, hash: ObjectHash) -> Option<Arc<CacheObject>> {
        // check if the hash is in the cache( lru or tmp file)
        if self.hash_set.contains(&hash) {
            match self.try_get(hash) {
                Some(x) => Some(x),
                None => {
                    // With no memory limit nothing is evicted, so a known hash
                    // missing from the LRU is an invariant violation.
                    if self.mem_size.is_none() {
                        panic!("should not be here when mem_size is not set")
                    }
                    self.get_fallback(hash).ok()
                }
            }
        } else {
            None
        }
    }

    /// Count of distinct hashes ever inserted (memory + disk tiers).
    fn total_inserted(&self) -> usize {
        self.hash_set.len()
    }
    /// LRU payload bytes plus index overhead.
    fn memory_used(&self) -> usize {
        self.lru_cache.lock().unwrap().current_size() + self.memory_used_index()
    }
    /// Signal background workers, wait for the pool to drain, then drop all state.
    fn clear(&self) {
        time_it!("Caches clear", {
            self.complete_signal.store(true, Ordering::Release);
            self.pool.join();
            self.lru_cache.lock().unwrap().clear();
            self.hash_set.clear();
            self.hash_set.shrink_to_fit();
            self.map_offset.clear();
            self.map_offset.shrink_to_fit();
        });

        // Post-conditions: no pending work and an empty memory tier.
        assert_eq!(self.pool.queued_count(), 0);
        assert_eq!(self.pool.active_count(), 0);
        assert_eq!(self.lru_cache.lock().unwrap().len(), 0);
    }
}
267
#[cfg(test)]
mod test {
    use std::{env, sync::Arc, thread};

    use super::*;
    use crate::{
        hash::{HashKind, ObjectHash, set_hash_kind_for_test},
        internal::{object::types::ObjectType, pack::cache_object::CacheObjectInfo},
    };

    /// Helper to build a base CacheObject with the given decompressed size,
    /// hash, and pack offset.
    fn make_obj_at(size: usize, hash: ObjectHash, offset: usize) -> CacheObject {
        CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash),
            data_decompressed: vec![0; size],
            mem_recorder: None,
            offset,
            crc32: 0,
            is_delta_in_pack: false,
        }
    }

    /// Helper to build a base CacheObject at offset 0.
    fn make_obj(size: usize, hash: ObjectHash) -> CacheObject {
        make_obj_at(size, hash, 0)
    }

    /// test single-threaded cache behavior with different hash kinds and capacities
    #[test]
    fn test_cache_single_thread() {
        for (kind, cap, size_ab, size_c, tmp_dir) in [
            (
                HashKind::Sha1,
                2048usize,
                800usize,
                1700usize,
                "tests/.cache_tmp",
            ),
            (
                HashKind::Sha256,
                4096usize,
                1500usize,
                3000usize,
                "tests/.cache_tmp_sha256",
            ),
        ] {
            let _guard = set_hash_kind_for_test(kind);
            let source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
            // Path::join takes &self, so no clone of `source` is needed.
            let tmp_path = source.join(tmp_dir);
            if tmp_path.exists() {
                fs::remove_dir_all(&tmp_path).unwrap();
            }

            let cache = Caches::new(Some(cap), tmp_path, 1);
            let a_hash = ObjectHash::new(String::from("a").as_bytes());
            let b_hash = ObjectHash::new(String::from("b").as_bytes());
            let c_hash = ObjectHash::new(String::from("c").as_bytes());

            let a = make_obj(size_ab, a_hash);
            let b = make_obj(size_ab, b_hash);
            let c = make_obj(size_c, c_hash);

            // insert a
            cache.insert(a.offset, a_hash, a.clone());
            assert!(cache.hash_set.contains(&a_hash));
            assert!(cache.try_get(a_hash).is_some());

            // insert b, a should still be in cache
            cache.insert(b.offset, b_hash, b.clone());
            assert!(cache.hash_set.contains(&b_hash));
            assert!(cache.try_get(b_hash).is_some());
            assert!(cache.try_get(a_hash).is_some());

            // insert c which will evict both a and b
            cache.insert(c.offset, c_hash, c.clone());
            assert!(cache.try_get(a_hash).is_none());
            assert!(cache.try_get(b_hash).is_none());
            assert!(cache.try_get(c_hash).is_some());
            assert!(cache.get_by_hash(c_hash).is_some());
        }
    }

    /// consider the multi-threaded scenario where different threads use different hash kinds
    #[test]
    fn test_cache_multi_thread_mixed_hash_kinds() {
        let base = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
        let tmp_path = base.join("tests/.cache_tmp_mixed");
        if tmp_path.exists() {
            fs::remove_dir_all(&tmp_path).unwrap();
        }

        let cache = Arc::new(Caches::new(Some(4096), tmp_path, 2));

        let cache_sha1 = Arc::clone(&cache);
        let handle_sha1 = thread::spawn(move || {
            let _g = set_hash_kind_for_test(HashKind::Sha1);
            let hash = ObjectHash::new(b"sha1-entry");
            // Reuse the shared helper instead of repeating the struct literal.
            let obj = make_obj_at(800, hash, 1);
            cache_sha1.insert(obj.offset, hash, obj.clone());
            assert!(cache_sha1.hash_set.contains(&hash));
            assert!(cache_sha1.try_get(hash).is_some());
        });

        let cache_sha256 = Arc::clone(&cache);
        let handle_sha256 = thread::spawn(move || {
            let _g = set_hash_kind_for_test(HashKind::Sha256);
            let hash = ObjectHash::new(b"sha256-entry");
            let obj = make_obj_at(1500, hash, 2);
            cache_sha256.insert(obj.offset, hash, obj.clone());
            assert!(cache_sha256.hash_set.contains(&hash));
            assert!(cache_sha256.try_get(hash).is_some());
        });

        handle_sha1.join().unwrap();
        handle_sha256.join().unwrap();

        assert_eq!(cache.total_inserted(), 2);
    }
}
395}