git_internal/internal/pack/
cache.rs

1use std::path::Path;
2use std::path::PathBuf;
3use std::sync::Once;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex};
6use std::thread::sleep;
7use std::{fs, io};
8
9use dashmap::{DashMap, DashSet};
10use lru_mem::LruCache;
11use threadpool::ThreadPool;
12
13use crate::hash::ObjectHash;
14use crate::internal::pack::cache_object::{
15    ArcWrapper, CacheObject, FileLoadStore, MemSizeRecorder,
16};
17use crate::time_it;
18
/// Interface of a pack-object cache that can be queried by pack offset or by object hash.
///
/// Every method takes `&self`, so implementors are expected to use interior mutability.
pub trait _Cache {
    /// Creates a cache.
    ///
    /// * `mem_size` - size limit of the in-memory part; in `Caches`, `None` means no limit.
    /// * `tmp_path` - directory used for objects stored outside memory.
    /// * `thread_num` - number of worker threads of the cache's thread pool.
    fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
    where
        Self: Sized;
    /// Returns the hash recorded for the object at `offset`, if any.
    fn get_hash(&self, offset: usize) -> Option<ObjectHash>;
    /// Stores `obj` under `hash` and `offset`, returning a shared handle to it.
    fn insert(&self, offset: usize, hash: ObjectHash, obj: CacheObject) -> Arc<CacheObject>;
    /// Looks an object up by its pack offset.
    fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>>;
    /// Looks an object up by its hash.
    fn get_by_hash(&self, h: ObjectHash) -> Option<Arc<CacheObject>>;
    /// Number of distinct hashes inserted so far.
    fn total_inserted(&self) -> usize;
    /// Estimated number of bytes currently used by the cache.
    fn memory_used(&self) -> usize;
    /// Empties the cache.
    fn clear(&self);
}
31
/// Lets `ObjectHash` serve as a key of the memory-bounded `lru_mem::LruCache`.
impl lru_mem::HeapSize for ObjectHash {
    /// Reports that a hash owns no heap memory.
    // NOTE(review): presumably `ObjectHash` keeps its bytes inline (it is copied by value
    // throughout this file) — confirm it holds no heap allocation.
    fn heap_size(&self) -> usize {
        0
    }
}
37
/// Pack-object cache: a size-bounded in-memory LRU plus offset/hash indexes, with a temp
/// directory (`tmp_path`) from which `get_fallback` reloads objects missing from memory.
pub struct Caches {
    map_offset: DashMap<usize, ObjectHash>, // pack offset -> object hash
    hash_set: DashSet<ObjectHash>,          // hashes inserted so far (emptied only by `clear`)
    // Dropping a large LRU cache takes a long time on Windows without multi-threaded IO:
    // the "multi-thread IO" path clones `Arc<CacheObject>`, so the objects are not dropped on
    // the main thread, and the `CacheObject`s are reclaimed by the OS if the process ends
    // abnormally.
    // Solution: use `mimalloc`.
    lru_cache: Mutex<LruCache<ObjectHash, ArcWrapper<CacheObject>>>,
    // Limit handed to the LRU cache; `None` means no limit (and no tmp directory is created).
    mem_size: Option<usize>,
    // Root directory of the per-object temp files.
    tmp_path: PathBuf,
    // One `Once` per possible first hash byte; guards creation of the matching sub-directory.
    path_prefixes: [Once; 256],
    // Thread pool handed to every `ArcWrapper`.
    // NOTE(review): presumably used to write evicted objects in the background — confirm in `cache_object`.
    pool: Arc<ThreadPool>,
    // Flag shared with every `ArcWrapper`; set to `true` by `clear`.
    complete_signal: Arc<AtomicBool>,
}
52
53impl Caches {
54    /// only get object from memory, not from tmp file
55    fn try_get(&self, hash: ObjectHash) -> Option<Arc<CacheObject>> {
56        let mut map = self.lru_cache.lock().unwrap();
57        map.get(&hash).map(|x| x.data.clone())
58    }
59
60    /// !IMPORTANT: because of the process of pack, the file must be written / be writing before, so it won't be dead lock
61    /// fall back to temp to get item. **invoker should ensure the hash is in the cache, or it will block forever**
62    fn get_fallback(&self, hash: ObjectHash) -> io::Result<Arc<CacheObject>> {
63        let path = self.generate_temp_path(&self.tmp_path, hash);
64        // read from tmp file
65        let obj = {
66            loop {
67                match Self::read_from_temp(&path) {
68                    Ok(x) => break x,
69                    Err(e) if e.kind() == io::ErrorKind::NotFound => {
70                        sleep(std::time::Duration::from_millis(10));
71                        continue;
72                    }
73                    Err(e) => return Err(e), // other error
74                }
75            }
76        };
77
78        let mut map = self.lru_cache.lock().unwrap();
79        let obj = Arc::new(obj);
80        let mut x = ArcWrapper::new(
81            obj.clone(),
82            self.complete_signal.clone(),
83            Some(self.pool.clone()),
84        );
85        x.set_store_path(path);
86        let _ = map.insert(hash, x); // handle the error
87        Ok(obj)
88    }
89
90    /// generate the temp file path, hex string of the hash
91    fn generate_temp_path(&self, tmp_path: &Path, hash: ObjectHash) -> PathBuf {
92        // Reserve capacity for base path, 2-char subdir, hex hash string, and separators
93        let mut path =
94            PathBuf::with_capacity(self.tmp_path.capacity() + hash.to_string().len() + 5);
95        path.push(tmp_path);
96        let hash_str = hash._to_string();
97        path.push(&hash_str[..2]); // use first 2 chars as the directory
98        self.path_prefixes[hash.as_ref()[0] as usize].call_once(|| {
99            // Check if the directory exists, if not, create it
100            if !path.exists() {
101                fs::create_dir_all(&path).unwrap();
102            }
103        });
104        path.push(hash_str);
105        path
106    }
107
    /// Loads a `CacheObject` from the temp file at `path`.
    ///
    /// # Errors
    /// Propagates the error returned by `CacheObject::f_load`.
    fn read_from_temp(path: &Path) -> io::Result<CacheObject> {
        let obj = CacheObject::f_load(path)?;
        // Deserializing creates the object without going through `::new()`, so nothing that
        // normally happens at construction has run yet; the memory size is recorded here by hand.
        // If more work is needed at construction time, implement `Deserialize` manually.
        obj.record_mem_size();
        Ok(obj)
    }
115
    /// Number of jobs currently queued in the cache's thread pool.
    pub fn queued_tasks(&self) -> usize {
        self.pool.queued_count()
    }
119
120    /// memory used by the index (exclude lru_cache which is contained in CacheObject::get_mem_size())
121    pub fn memory_used_index(&self) -> usize {
122        self.map_offset.capacity()
123            * (std::mem::size_of::<usize>() + std::mem::size_of::<ObjectHash>())
124            + self.hash_set.capacity() * (std::mem::size_of::<ObjectHash>())
125    }
126
127    /// remove the tmp dir
128    pub fn remove_tmp_dir(&self) {
129        time_it!("Remove tmp dir", {
130            if self.tmp_path.exists() {
131                fs::remove_dir_all(&self.tmp_path).unwrap(); //very slow
132            }
133        });
134    }
135}
136
137impl _Cache for Caches {
138    /// @param size: the size of the memory lru cache. **None means no limit**
139    /// @param tmp_path: the path to store the cache object in the tmp file
140    fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
141    where
142        Self: Sized,
143    {
144        // `None` means no limit, so no need to create the tmp dir
145        if mem_size.is_some() {
146            fs::create_dir_all(&tmp_path).unwrap();
147        }
148
149        Caches {
150            map_offset: DashMap::new(),
151            hash_set: DashSet::new(),
152            lru_cache: Mutex::new(LruCache::new(mem_size.unwrap_or(usize::MAX))),
153            mem_size,
154            tmp_path,
155            path_prefixes: [const { Once::new() }; 256],
156            pool: Arc::new(ThreadPool::new(thread_num)),
157            complete_signal: Arc::new(AtomicBool::new(false)),
158        }
159    }
160
161    fn get_hash(&self, offset: usize) -> Option<ObjectHash> {
162        self.map_offset.get(&offset).map(|x| *x)
163    }
164
165    fn insert(&self, offset: usize, hash: ObjectHash, obj: CacheObject) -> Arc<CacheObject> {
166        let obj_arc = Arc::new(obj);
167        {
168            // ? whether insert to cache directly or only write to tmp file
169            let mut map = self.lru_cache.lock().unwrap();
170            let mut a_obj = ArcWrapper::new(
171                obj_arc.clone(),
172                self.complete_signal.clone(),
173                Some(self.pool.clone()),
174            );
175            if self.mem_size.is_some() {
176                a_obj.set_store_path(self.generate_temp_path(&self.tmp_path, hash));
177            }
178            let _ = map.insert(hash, a_obj);
179        }
180        //order maters as for reading in 'get_by_offset()'
181        self.hash_set.insert(hash);
182        self.map_offset.insert(offset, hash);
183
184        obj_arc
185    }
186
187    fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>> {
188        match self.map_offset.get(&offset) {
189            Some(x) => self.get_by_hash(*x),
190            None => None,
191        }
192    }
193
194    fn get_by_hash(&self, hash: ObjectHash) -> Option<Arc<CacheObject>> {
195        // check if the hash is in the cache( lru or tmp file)
196        if self.hash_set.contains(&hash) {
197            match self.try_get(hash) {
198                Some(x) => Some(x),
199                None => {
200                    if self.mem_size.is_none() {
201                        panic!("should not be here when mem_size is not set")
202                    }
203                    self.get_fallback(hash).ok()
204                }
205            }
206        } else {
207            None
208        }
209    }
210
    /// Number of distinct hashes inserted so far (the size of `hash_set`).
    fn total_inserted(&self) -> usize {
        self.hash_set.len()
    }
214    fn memory_used(&self) -> usize {
215        self.lru_cache.lock().unwrap().current_size() + self.memory_used_index()
216    }
    /// Shuts the cache down: raises the completion flag, waits for the thread pool, then
    /// empties the LRU cache and both indexes.
    ///
    /// The flag is never reset in this file.
    ///
    /// # Panics
    /// Panics if the LRU mutex is poisoned, or if one of the trailing assertions fails (jobs
    /// still queued or active in the pool, or entries left in the LRU cache).
    fn clear(&self) {
        time_it!("Caches clear", {
            // Raise the flag shared with every `ArcWrapper` before anything is dropped.
            // NOTE(review): presumably this tells wrappers not to persist themselves on
            // drop — confirm in `cache_object`.
            self.complete_signal.store(true, Ordering::Release);
            // Let the pool finish its outstanding jobs before the entries are dropped.
            self.pool.join();
            self.lru_cache.lock().unwrap().clear();
            // Empty the indexes and give their memory back.
            self.hash_set.clear();
            self.hash_set.shrink_to_fit();
            self.map_offset.clear();
            self.map_offset.shrink_to_fit();
        });

        // Sanity checks: nothing may be left running or cached at this point.
        assert_eq!(self.pool.queued_count(), 0);
        assert_eq!(self.pool.active_count(), 0);
        assert_eq!(self.lru_cache.lock().unwrap().len(), 0);
    }
232}
233
234#[cfg(test)]
235mod test {
236    use std::env;
237
238    use super::*;
239    use crate::{
240        hash::{HashKind, ObjectHash, set_hash_kind_for_test},
241        internal::{object::types::ObjectType, pack::cache_object::CacheObjectInfo},
242    };
243    use std::sync::Arc;
244    use std::thread;
245
246    #[test]
247    fn test_cache_single_thread() {
248        let _guard = set_hash_kind_for_test(HashKind::Sha1);
249        let source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
250        let tmp_path = source.clone().join("tests/.cache_tmp");
251
252        if tmp_path.exists() {
253            fs::remove_dir_all(&tmp_path).unwrap();
254        }
255
256        let cache = Caches::new(Some(2048), tmp_path, 1);
257        let a_hash = ObjectHash::new(String::from("a").as_bytes());
258        let b_hash = ObjectHash::new(String::from("b").as_bytes());
259        let a = CacheObject {
260            info: CacheObjectInfo::BaseObject(ObjectType::Blob, a_hash),
261            data_decompressed: vec![0; 800],
262            mem_recorder: None,
263            offset: 0,
264            is_delta_in_pack: false,
265        };
266        let b = CacheObject {
267            info: CacheObjectInfo::BaseObject(ObjectType::Blob, b_hash),
268            data_decompressed: vec![0; 800],
269            mem_recorder: None,
270            offset: 0,
271            is_delta_in_pack: false,
272        };
273        // insert a
274        cache.insert(a.offset, a_hash, a.clone());
275        assert!(cache.hash_set.contains(&a_hash));
276        assert!(cache.try_get(a_hash).is_some());
277
278        // insert b, a should still be in cache
279        cache.insert(b.offset, b_hash, b.clone());
280        assert!(cache.hash_set.contains(&b_hash));
281        assert!(cache.try_get(b_hash).is_some());
282        assert!(cache.try_get(a_hash).is_some());
283
284        let c_hash = ObjectHash::new(String::from("c").as_bytes());
285        // insert c which will evict both a and b
286        let c = CacheObject {
287            info: CacheObjectInfo::BaseObject(ObjectType::Blob, c_hash),
288            data_decompressed: vec![0; 1700],
289            mem_recorder: None,
290            offset: 0,
291            is_delta_in_pack: false,
292        };
293        cache.insert(c.offset, c_hash, c.clone());
294        assert!(cache.try_get(a_hash).is_none());
295        assert!(cache.try_get(b_hash).is_none());
296        assert!(cache.try_get(c_hash).is_some());
297        assert!(cache.get_by_hash(c_hash).is_some());
298    }
299    #[test]
300    fn test_cache_single_thread_sha256() {
301        let _guard = set_hash_kind_for_test(HashKind::Sha256);
302        let source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
303        let tmp_path = source.clone().join("tests/.cache_tmp_sha256");
304
305        if tmp_path.exists() {
306            fs::remove_dir_all(&tmp_path).unwrap();
307        }
308        let cache = Caches::new(Some(4096), tmp_path, 1);
309        let a_hash = ObjectHash::new(String::from("a").as_bytes());
310        let b_hash = ObjectHash::new(String::from("b").as_bytes());
311        let a = CacheObject {
312            info: CacheObjectInfo::BaseObject(ObjectType::Blob, a_hash),
313            data_decompressed: vec![0; 1500],
314            mem_recorder: None,
315            offset: 0,
316            is_delta_in_pack: false,
317        };
318        let b = CacheObject {
319            info: CacheObjectInfo::BaseObject(ObjectType::Blob, b_hash),
320            data_decompressed: vec![0; 1500],
321            mem_recorder: None,
322            offset: 0,
323            is_delta_in_pack: false,
324        };
325        // insert a
326        cache.insert(a.offset, a_hash, a.clone());
327        assert!(cache.hash_set.contains(&a_hash));
328        assert!(cache.try_get(a_hash).is_some());
329        // insert b, a should still be in cache
330        cache.insert(b.offset, b_hash, b.clone());
331        assert!(cache.hash_set.contains(&b_hash));
332        assert!(cache.try_get(b_hash).is_some());
333        assert!(cache.try_get(a_hash).is_some());
334        let c_hash = ObjectHash::new(String::from("c").as_bytes());
335        // insert c which will evict both a and b
336        let c = CacheObject {
337            info: CacheObjectInfo::BaseObject(ObjectType::Blob, c_hash),
338            data_decompressed: vec![0; 3000],
339            mem_recorder: None,
340            offset: 0,
341            is_delta_in_pack: false,
342        };
343        cache.insert(c.offset, c_hash, c.clone());
344        assert!(cache.try_get(a_hash).is_none());
345        assert!(cache.try_get(b_hash).is_none());
346        assert!(cache.try_get(c_hash).is_some());
347        assert!(cache.get_by_hash(c_hash).is_some());
348    }
349    #[test]
350    /// consider the multi-threaded scenario where different threads use different hash kinds
351    fn test_cache_multi_thread_mixed_hash_kinds() {
352        let base = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
353        let tmp_path = base.join("tests/.cache_tmp_mixed");
354        if tmp_path.exists() {
355            fs::remove_dir_all(&tmp_path).unwrap();
356        }
357
358        let cache = Arc::new(Caches::new(Some(4096), tmp_path, 2));
359
360        let cache_sha1 = Arc::clone(&cache);
361        let handle_sha1 = thread::spawn(move || {
362            let _g = set_hash_kind_for_test(HashKind::Sha1);
363            let hash = ObjectHash::new(b"sha1-entry");
364            let obj = CacheObject {
365                info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash),
366                data_decompressed: vec![0; 800],
367                mem_recorder: None,
368                offset: 1,
369                is_delta_in_pack: false,
370            };
371            cache_sha1.insert(obj.offset, hash, obj.clone());
372            assert!(cache_sha1.hash_set.contains(&hash));
373            assert!(cache_sha1.try_get(hash).is_some());
374        });
375
376        let cache_sha256 = Arc::clone(&cache);
377        let handle_sha256 = thread::spawn(move || {
378            let _g = set_hash_kind_for_test(HashKind::Sha256);
379            let hash = ObjectHash::new(b"sha256-entry");
380            let obj = CacheObject {
381                info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash),
382                data_decompressed: vec![0; 1500],
383                mem_recorder: None,
384                offset: 2,
385                is_delta_in_pack: false,
386            };
387            cache_sha256.insert(obj.offset, hash, obj.clone());
388            assert!(cache_sha256.hash_set.contains(&hash));
389            assert!(cache_sha256.try_get(hash).is_some());
390        });
391
392        handle_sha1.join().unwrap();
393        handle_sha256.join().unwrap();
394
395        assert_eq!(cache.total_inserted(), 2);
396    }
397}