// git_internal/internal/pack/cache.rs

1use std::path::Path;
2use std::path::PathBuf;
3use std::sync::Once;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex};
6use std::thread::sleep;
7use std::{fs, io};
8
9use dashmap::{DashMap, DashSet};
10use lru_mem::LruCache;
11use threadpool::ThreadPool;
12
13use crate::hash::SHA1;
14use crate::internal::pack::cache_object::{
15    ArcWrapper, CacheObject, FileLoadStore, MemSizeRecorder,
16};
17use crate::time_it;
18
19pub trait _Cache {
20    fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
21    where
22        Self: Sized;
23    fn get_hash(&self, offset: usize) -> Option<SHA1>;
24    fn insert(&self, offset: usize, hash: SHA1, obj: CacheObject) -> Arc<CacheObject>;
25    fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>>;
26    fn get_by_hash(&self, h: SHA1) -> Option<Arc<CacheObject>>;
27    fn total_inserted(&self) -> usize;
28    fn memory_used(&self) -> usize;
29    fn clear(&self);
30}
31
32impl lru_mem::HeapSize for SHA1 {
33    fn heap_size(&self) -> usize {
34        0
35    }
36}
37
38pub struct Caches {
39    map_offset: DashMap<usize, SHA1>, // offset to hash
40    hash_set: DashSet<SHA1>,          // item in the cache
41    // dropping large lru cache will take a long time on Windows without multi-thread IO
42    // because "multi-thread IO" clone Arc<CacheObject>, so it won't be dropped in the main thread,
43    // and `CacheObjects` will be killed by OS after Process ends abnormally
44    // Solution: use `mimalloc`
45    lru_cache: Mutex<LruCache<SHA1, ArcWrapper<CacheObject>>>,
46    mem_size: Option<usize>,
47    tmp_path: PathBuf,
48    path_prefixes: [Once; 256],
49    pool: Arc<ThreadPool>,
50    complete_signal: Arc<AtomicBool>,
51}
52
53impl Caches {
54    /// only get object from memory, not from tmp file
55    fn try_get(&self, hash: SHA1) -> Option<Arc<CacheObject>> {
56        let mut map = self.lru_cache.lock().unwrap();
57        map.get(&hash).map(|x| x.data.clone())
58    }
59
60    /// !IMPORTANT: because of the process of pack, the file must be written / be writing before, so it won't be dead lock
61    /// fall back to temp to get item. **invoker should ensure the hash is in the cache, or it will block forever**
62    fn get_fallback(&self, hash: SHA1) -> io::Result<Arc<CacheObject>> {
63        let path = self.generate_temp_path(&self.tmp_path, hash);
64        // read from tmp file
65        let obj = {
66            loop {
67                match Self::read_from_temp(&path) {
68                    Ok(x) => break x,
69                    Err(e) if e.kind() == io::ErrorKind::NotFound => {
70                        sleep(std::time::Duration::from_millis(10));
71                        continue;
72                    }
73                    Err(e) => return Err(e), // other error
74                }
75            }
76        };
77
78        let mut map = self.lru_cache.lock().unwrap();
79        let obj = Arc::new(obj);
80        let mut x = ArcWrapper::new(
81            obj.clone(),
82            self.complete_signal.clone(),
83            Some(self.pool.clone()),
84        );
85        x.set_store_path(path);
86        let _ = map.insert(hash, x); // handle the error
87        Ok(obj)
88    }
89
90    /// generate the temp file path, hex string of the hash
91    fn generate_temp_path(&self, tmp_path: &Path, hash: SHA1) -> PathBuf {
92        // This is enough for the original path, 2 chars directory, 40 chars hash, and extra slashes
93        let mut path = PathBuf::with_capacity(self.tmp_path.capacity() + SHA1::SIZE * 2 + 5);
94        path.push(tmp_path);
95        let hash_str = hash._to_string();
96        path.push(&hash_str[..2]); // use first 2 chars as the directory
97        self.path_prefixes[hash.as_ref()[0] as usize].call_once(|| {
98            // Check if the directory exists, if not, create it
99            if !path.exists() {
100                fs::create_dir_all(&path).unwrap();
101            }
102        });
103        path.push(hash_str);
104        path
105    }
106
107    fn read_from_temp(path: &Path) -> io::Result<CacheObject> {
108        let obj = CacheObject::f_load(path)?;
109        // Deserializing will also create an object but without Construction outside and `::new()`
110        // So if you want to do sth. while Constructing, impl Deserialize trait yourself
111        obj.record_mem_size();
112        Ok(obj)
113    }
114
115    pub fn queued_tasks(&self) -> usize {
116        self.pool.queued_count()
117    }
118
119    /// memory used by the index (exclude lru_cache which is contained in CacheObject::get_mem_size())
120    pub fn memory_used_index(&self) -> usize {
121        self.map_offset.capacity() * (std::mem::size_of::<usize>() + std::mem::size_of::<SHA1>())
122            + self.hash_set.capacity() * (std::mem::size_of::<SHA1>())
123    }
124
125    /// remove the tmp dir
126    pub fn remove_tmp_dir(&self) {
127        time_it!("Remove tmp dir", {
128            if self.tmp_path.exists() {
129                fs::remove_dir_all(&self.tmp_path).unwrap(); //very slow
130            }
131        });
132    }
133}
134
135impl _Cache for Caches {
136    /// @param size: the size of the memory lru cache. **None means no limit**
137    /// @param tmp_path: the path to store the cache object in the tmp file
138    fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
139    where
140        Self: Sized,
141    {
142        // `None` means no limit, so no need to create the tmp dir
143        if mem_size.is_some() {
144            fs::create_dir_all(&tmp_path).unwrap();
145        }
146
147        Caches {
148            map_offset: DashMap::new(),
149            hash_set: DashSet::new(),
150            lru_cache: Mutex::new(LruCache::new(mem_size.unwrap_or(usize::MAX))),
151            mem_size,
152            tmp_path,
153            path_prefixes: [const { Once::new() }; 256],
154            pool: Arc::new(ThreadPool::new(thread_num)),
155            complete_signal: Arc::new(AtomicBool::new(false)),
156        }
157    }
158
159    fn get_hash(&self, offset: usize) -> Option<SHA1> {
160        self.map_offset.get(&offset).map(|x| *x)
161    }
162
163    fn insert(&self, offset: usize, hash: SHA1, obj: CacheObject) -> Arc<CacheObject> {
164        let obj_arc = Arc::new(obj);
165        {
166            // ? whether insert to cache directly or only write to tmp file
167            let mut map = self.lru_cache.lock().unwrap();
168            let mut a_obj = ArcWrapper::new(
169                obj_arc.clone(),
170                self.complete_signal.clone(),
171                Some(self.pool.clone()),
172            );
173            if self.mem_size.is_some() {
174                a_obj.set_store_path(self.generate_temp_path(&self.tmp_path, hash));
175            }
176            let _ = map.insert(hash, a_obj);
177        }
178        //order maters as for reading in 'get_by_offset()'
179        self.hash_set.insert(hash);
180        self.map_offset.insert(offset, hash);
181
182        obj_arc
183    }
184
185    fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>> {
186        match self.map_offset.get(&offset) {
187            Some(x) => self.get_by_hash(*x),
188            None => None,
189        }
190    }
191
192    fn get_by_hash(&self, hash: SHA1) -> Option<Arc<CacheObject>> {
193        // check if the hash is in the cache( lru or tmp file)
194        if self.hash_set.contains(&hash) {
195            match self.try_get(hash) {
196                Some(x) => Some(x),
197                None => {
198                    if self.mem_size.is_none() {
199                        panic!("should not be here when mem_size is not set")
200                    }
201                    self.get_fallback(hash).ok()
202                }
203            }
204        } else {
205            None
206        }
207    }
208
209    fn total_inserted(&self) -> usize {
210        self.hash_set.len()
211    }
212    fn memory_used(&self) -> usize {
213        self.lru_cache.lock().unwrap().current_size() + self.memory_used_index()
214    }
215    fn clear(&self) {
216        time_it!("Caches clear", {
217            self.complete_signal.store(true, Ordering::Release);
218            self.pool.join();
219            self.lru_cache.lock().unwrap().clear();
220            self.hash_set.clear();
221            self.hash_set.shrink_to_fit();
222            self.map_offset.clear();
223            self.map_offset.shrink_to_fit();
224        });
225
226        assert_eq!(self.pool.queued_count(), 0);
227        assert_eq!(self.pool.active_count(), 0);
228        assert_eq!(self.lru_cache.lock().unwrap().len(), 0);
229    }
230}
231
#[cfg(test)]
mod test {
    use std::env;

    use super::*;
    use crate::{
        hash::SHA1,
        internal::{object::types::ObjectType, pack::cache_object::CacheObjectInfo},
    };

    /// Build a blob `CacheObject` at offset 0 with `size` zero bytes of payload.
    fn blob(hash: SHA1, size: usize) -> CacheObject {
        CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash),
            data_decompressed: vec![0; size],
            mem_recorder: None,
            offset: 0,
            is_delta_in_pack: false,
        }
    }

    #[test]
    fn test_cache_single_thread() {
        // Use a directory private to this test run, instead of the fixed `../tests/.cache_tmp`.
        // That path depended on the current directory having a parent, was wiped
        // unconditionally, and was never cleaned up afterwards.
        let tmp_path = env::temp_dir().join(format!(
            "git_internal_cache_single_thread_{}",
            std::process::id()
        ));
        if tmp_path.exists() {
            fs::remove_dir_all(&tmp_path).unwrap();
        }

        let cache = Caches::new(Some(2048), tmp_path, 1);
        let a_hash = SHA1::new("a".as_bytes());
        let b_hash = SHA1::new("b".as_bytes());
        let c_hash = SHA1::new("c".as_bytes());

        // insert a
        let a = blob(a_hash, 800);
        cache.insert(a.offset, a_hash, a);
        assert!(cache.hash_set.contains(&a_hash));
        assert!(cache.try_get(a_hash).is_some());

        // insert b, a should still be in cache
        let b = blob(b_hash, 800);
        cache.insert(b.offset, b_hash, b);
        assert!(cache.hash_set.contains(&b_hash));
        assert!(cache.try_get(b_hash).is_some());
        assert!(cache.try_get(a_hash).is_some());

        // insert c which will evict both a and b
        let c = blob(c_hash, 1700);
        cache.insert(c.offset, c_hash, c);
        assert!(cache.try_get(a_hash).is_none());
        assert!(cache.try_get(b_hash).is_none());
        assert!(cache.try_get(c_hash).is_some());
        assert!(cache.get_by_hash(c_hash).is_some());

        // Wait for pool jobs to finish, then delete the tmp files this test produced.
        cache.clear();
        cache.remove_tmp_dir();
    }
}