git_internal/internal/pack/
cache.rs1use std::path::Path;
2use std::path::PathBuf;
3use std::sync::Once;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex};
6use std::thread::sleep;
7use std::{fs, io};
8
9use dashmap::{DashMap, DashSet};
10use lru_mem::LruCache;
11use threadpool::ThreadPool;
12
13use crate::hash::ObjectHash;
14use crate::internal::pack::cache_object::{
15 ArcWrapper, CacheObject, FileLoadStore, MemSizeRecorder,
16};
17use crate::time_it;
18
19pub trait _Cache {
20 fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
21 where
22 Self: Sized;
23 fn get_hash(&self, offset: usize) -> Option<ObjectHash>;
24 fn insert(&self, offset: usize, hash: ObjectHash, obj: CacheObject) -> Arc<CacheObject>;
25 fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>>;
26 fn get_by_hash(&self, h: ObjectHash) -> Option<Arc<CacheObject>>;
27 fn total_inserted(&self) -> usize;
28 fn memory_used(&self) -> usize;
29 fn clear(&self);
30}
31
32impl lru_mem::HeapSize for ObjectHash {
33 fn heap_size(&self) -> usize {
34 0
35 }
36}
37
38pub struct Caches {
39 map_offset: DashMap<usize, ObjectHash>, hash_set: DashSet<ObjectHash>, lru_cache: Mutex<LruCache<ObjectHash, ArcWrapper<CacheObject>>>,
46 mem_size: Option<usize>,
47 tmp_path: PathBuf,
48 path_prefixes: [Once; 256],
49 pool: Arc<ThreadPool>,
50 complete_signal: Arc<AtomicBool>,
51}
52
53impl Caches {
54 fn try_get(&self, hash: ObjectHash) -> Option<Arc<CacheObject>> {
56 let mut map = self.lru_cache.lock().unwrap();
57 map.get(&hash).map(|x| x.data.clone())
58 }
59
60 fn get_fallback(&self, hash: ObjectHash) -> io::Result<Arc<CacheObject>> {
63 let path = self.generate_temp_path(&self.tmp_path, hash);
64 let obj = {
66 loop {
67 match Self::read_from_temp(&path) {
68 Ok(x) => break x,
69 Err(e) if e.kind() == io::ErrorKind::NotFound => {
70 sleep(std::time::Duration::from_millis(10));
71 continue;
72 }
73 Err(e) => return Err(e), }
75 }
76 };
77
78 let mut map = self.lru_cache.lock().unwrap();
79 let obj = Arc::new(obj);
80 let mut x = ArcWrapper::new(
81 obj.clone(),
82 self.complete_signal.clone(),
83 Some(self.pool.clone()),
84 );
85 x.set_store_path(path);
86 let _ = map.insert(hash, x); Ok(obj)
88 }
89
90 fn generate_temp_path(&self, tmp_path: &Path, hash: ObjectHash) -> PathBuf {
92 let mut path =
94 PathBuf::with_capacity(self.tmp_path.capacity() + hash.to_string().len() + 5);
95 path.push(tmp_path);
96 let hash_str = hash._to_string();
97 path.push(&hash_str[..2]); self.path_prefixes[hash.as_ref()[0] as usize].call_once(|| {
99 if !path.exists() {
101 fs::create_dir_all(&path).unwrap();
102 }
103 });
104 path.push(hash_str);
105 path
106 }
107
108 fn read_from_temp(path: &Path) -> io::Result<CacheObject> {
109 let obj = CacheObject::f_load(path)?;
110 obj.record_mem_size();
113 Ok(obj)
114 }
115
116 pub fn queued_tasks(&self) -> usize {
117 self.pool.queued_count()
118 }
119
120 pub fn memory_used_index(&self) -> usize {
122 self.map_offset.capacity()
123 * (std::mem::size_of::<usize>() + std::mem::size_of::<ObjectHash>())
124 + self.hash_set.capacity() * (std::mem::size_of::<ObjectHash>())
125 }
126
127 pub fn remove_tmp_dir(&self) {
129 time_it!("Remove tmp dir", {
130 if self.tmp_path.exists() {
131 fs::remove_dir_all(&self.tmp_path).unwrap(); }
133 });
134 }
135}
136
137impl _Cache for Caches {
138 fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
141 where
142 Self: Sized,
143 {
144 if mem_size.is_some() {
146 fs::create_dir_all(&tmp_path).unwrap();
147 }
148
149 Caches {
150 map_offset: DashMap::new(),
151 hash_set: DashSet::new(),
152 lru_cache: Mutex::new(LruCache::new(mem_size.unwrap_or(usize::MAX))),
153 mem_size,
154 tmp_path,
155 path_prefixes: [const { Once::new() }; 256],
156 pool: Arc::new(ThreadPool::new(thread_num)),
157 complete_signal: Arc::new(AtomicBool::new(false)),
158 }
159 }
160
161 fn get_hash(&self, offset: usize) -> Option<ObjectHash> {
162 self.map_offset.get(&offset).map(|x| *x)
163 }
164
165 fn insert(&self, offset: usize, hash: ObjectHash, obj: CacheObject) -> Arc<CacheObject> {
166 let obj_arc = Arc::new(obj);
167 {
168 let mut map = self.lru_cache.lock().unwrap();
170 let mut a_obj = ArcWrapper::new(
171 obj_arc.clone(),
172 self.complete_signal.clone(),
173 Some(self.pool.clone()),
174 );
175 if self.mem_size.is_some() {
176 a_obj.set_store_path(self.generate_temp_path(&self.tmp_path, hash));
177 }
178 let _ = map.insert(hash, a_obj);
179 }
180 self.hash_set.insert(hash);
182 self.map_offset.insert(offset, hash);
183
184 obj_arc
185 }
186
187 fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>> {
188 match self.map_offset.get(&offset) {
189 Some(x) => self.get_by_hash(*x),
190 None => None,
191 }
192 }
193
194 fn get_by_hash(&self, hash: ObjectHash) -> Option<Arc<CacheObject>> {
195 if self.hash_set.contains(&hash) {
197 match self.try_get(hash) {
198 Some(x) => Some(x),
199 None => {
200 if self.mem_size.is_none() {
201 panic!("should not be here when mem_size is not set")
202 }
203 self.get_fallback(hash).ok()
204 }
205 }
206 } else {
207 None
208 }
209 }
210
211 fn total_inserted(&self) -> usize {
212 self.hash_set.len()
213 }
214 fn memory_used(&self) -> usize {
215 self.lru_cache.lock().unwrap().current_size() + self.memory_used_index()
216 }
217 fn clear(&self) {
218 time_it!("Caches clear", {
219 self.complete_signal.store(true, Ordering::Release);
220 self.pool.join();
221 self.lru_cache.lock().unwrap().clear();
222 self.hash_set.clear();
223 self.hash_set.shrink_to_fit();
224 self.map_offset.clear();
225 self.map_offset.shrink_to_fit();
226 });
227
228 assert_eq!(self.pool.queued_count(), 0);
229 assert_eq!(self.pool.active_count(), 0);
230 assert_eq!(self.lru_cache.lock().unwrap().len(), 0);
231 }
232}
233
#[cfg(test)]
mod test {
    use std::env;

    use super::*;
    use crate::{
        hash::{HashKind, ObjectHash, set_hash_kind_for_test},
        internal::{object::types::ObjectType, pack::cache_object::CacheObjectInfo},
    };
    use std::sync::Arc;
    use std::thread;

    /// `<parent of cwd>/<rel>`, with any leftover directory from a previous run removed.
    fn fresh_tmp_dir(rel: &str) -> PathBuf {
        let dir = env::current_dir().unwrap().parent().unwrap().join(rel);
        if dir.exists() {
            fs::remove_dir_all(&dir).unwrap();
        }
        dir
    }

    /// A non-delta blob `CacheObject` of `size` zero bytes at `offset`.
    fn blob(hash: ObjectHash, size: usize, offset: usize) -> CacheObject {
        CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash),
            data_decompressed: vec![0; size],
            mem_recorder: None,
            offset,
            is_delta_in_pack: false,
        }
    }

    /// Two `small` blobs fit side by side within `budget`; a third `large` blob
    /// then evicts both from the in-memory LRU.
    fn run_eviction_scenario(dir: &str, budget: usize, small: usize, large: usize) {
        let cache = Caches::new(Some(budget), fresh_tmp_dir(dir), 1);

        let a_hash = ObjectHash::new(b"a");
        let b_hash = ObjectHash::new(b"b");
        let a = blob(a_hash, small, 0);
        let b = blob(b_hash, small, 0);

        cache.insert(a.offset, a_hash, a.clone());
        assert!(cache.hash_set.contains(&a_hash));
        assert!(cache.try_get(a_hash).is_some());

        cache.insert(b.offset, b_hash, b.clone());
        assert!(cache.hash_set.contains(&b_hash));
        assert!(cache.try_get(b_hash).is_some());
        assert!(cache.try_get(a_hash).is_some());

        let c_hash = ObjectHash::new(b"c");
        let c = blob(c_hash, large, 0);
        cache.insert(c.offset, c_hash, c.clone());
        assert!(cache.try_get(a_hash).is_none());
        assert!(cache.try_get(b_hash).is_none());
        assert!(cache.try_get(c_hash).is_some());
        assert!(cache.get_by_hash(c_hash).is_some());
    }

    #[test]
    fn test_cache_single_thread() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        run_eviction_scenario("tests/.cache_tmp", 2048, 800, 1700);
    }

    #[test]
    fn test_cache_single_thread_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        run_eviction_scenario("tests/.cache_tmp_sha256", 4096, 1500, 3000);
    }

    #[test]
    fn test_cache_multi_thread_mixed_hash_kinds() {
        let cache = Arc::new(Caches::new(
            Some(4096),
            fresh_tmp_dir("tests/.cache_tmp_mixed"),
            2,
        ));

        let sha1_cache = Arc::clone(&cache);
        let sha1_thread = thread::spawn(move || {
            let _g = set_hash_kind_for_test(HashKind::Sha1);
            let hash = ObjectHash::new(b"sha1-entry");
            let obj = blob(hash, 800, 1);
            sha1_cache.insert(obj.offset, hash, obj.clone());
            assert!(sha1_cache.hash_set.contains(&hash));
            assert!(sha1_cache.try_get(hash).is_some());
        });

        let sha256_cache = Arc::clone(&cache);
        let sha256_thread = thread::spawn(move || {
            let _g = set_hash_kind_for_test(HashKind::Sha256);
            let hash = ObjectHash::new(b"sha256-entry");
            let obj = blob(hash, 1500, 2);
            sha256_cache.insert(obj.offset, hash, obj.clone());
            assert!(sha256_cache.hash_set.contains(&hash));
            assert!(sha256_cache.try_get(hash).is_some());
        });

        sha1_thread.join().unwrap();
        sha256_thread.join().unwrap();

        assert_eq!(cache.total_inserted(), 2);
    }
}