git_internal/internal/pack/
cache.rs1use std::{
5 fs, io,
6 path::{Path, PathBuf},
7 sync::{
8 Arc, Mutex, Once,
9 atomic::{AtomicBool, Ordering},
10 },
11 thread::sleep,
12};
13
14use dashmap::{DashMap, DashSet};
15use lru_mem::LruCache;
16use threadpool::ThreadPool;
17
18use crate::{
19 hash::ObjectHash,
20 internal::pack::cache_object::{ArcWrapper, CacheObject, FileLoadStore, MemSizeRecorder},
21 time_it,
22};
23
24pub trait _Cache {
28 fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
29 where
30 Self: Sized;
31 fn get_hash(&self, offset: usize) -> Option<ObjectHash>;
32 fn insert(&self, offset: usize, hash: ObjectHash, obj: CacheObject) -> Arc<CacheObject>;
33 fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>>;
34 fn get_by_hash(&self, h: ObjectHash) -> Option<Arc<CacheObject>>;
35 fn total_inserted(&self) -> usize;
36 fn memory_used(&self) -> usize;
37 fn clear(&self);
38}
39
40impl lru_mem::HeapSize for ObjectHash {
41 fn heap_size(&self) -> usize {
42 0
43 }
44}
45
46pub struct Caches {
50 map_offset: DashMap<usize, ObjectHash>, hash_set: DashSet<ObjectHash>, lru_cache: Mutex<LruCache<ObjectHash, ArcWrapper<CacheObject>>>,
57 mem_size: Option<usize>,
58 tmp_path: PathBuf,
59 path_prefixes: [Once; 256],
60 pool: Arc<ThreadPool>,
61 complete_signal: Arc<AtomicBool>,
62}
63
64impl Caches {
65 fn try_get(&self, hash: ObjectHash) -> Option<Arc<CacheObject>> {
67 let mut map = self.lru_cache.lock().unwrap();
68 map.get(&hash).map(|x| x.data.clone())
69 }
70
71 fn get_fallback(&self, hash: ObjectHash) -> io::Result<Arc<CacheObject>> {
74 let path = self.generate_temp_path(&self.tmp_path, hash);
75 let obj = {
77 loop {
78 match Self::read_from_temp(&path) {
79 Ok(x) => break x,
80 Err(e) if e.kind() == io::ErrorKind::NotFound => {
81 sleep(std::time::Duration::from_millis(10));
82 continue;
83 }
84 Err(e) => return Err(e), }
86 }
87 };
88
89 let mut map = self.lru_cache.lock().unwrap();
90 let obj = Arc::new(obj);
91 let mut x = ArcWrapper::new(
92 obj.clone(),
93 self.complete_signal.clone(),
94 Some(self.pool.clone()),
95 );
96 x.set_store_path(path);
97 let _ = map.insert(hash, x); Ok(obj)
99 }
100
101 fn generate_temp_path(&self, tmp_path: &Path, hash: ObjectHash) -> PathBuf {
103 let mut path =
105 PathBuf::with_capacity(self.tmp_path.capacity() + hash.to_string().len() + 5);
106 path.push(tmp_path);
107 let hash_str = hash._to_string();
108 path.push(&hash_str[..2]); self.path_prefixes[hash.as_ref()[0] as usize].call_once(|| {
110 if !path.exists() {
112 fs::create_dir_all(&path).unwrap();
113 }
114 });
115 path.push(hash_str);
116 path
117 }
118
119 fn read_from_temp(path: &Path) -> io::Result<CacheObject> {
121 let obj = CacheObject::f_load(path)?;
122 obj.record_mem_size();
125 Ok(obj)
126 }
127
128 pub fn queued_tasks(&self) -> usize {
130 self.pool.queued_count()
131 }
132
133 pub fn memory_used_index(&self) -> usize {
135 self.map_offset.capacity()
136 * (std::mem::size_of::<usize>() + std::mem::size_of::<ObjectHash>())
137 + self.hash_set.capacity() * (std::mem::size_of::<ObjectHash>())
138 }
139
140 pub fn remove_tmp_dir(&self) {
142 time_it!("Remove tmp dir", {
143 if self.tmp_path.exists() {
144 fs::remove_dir_all(&self.tmp_path).unwrap(); if let Some(parent) = self.tmp_path.parent() {
147 let is_cache_temp = parent
148 .file_name()
149 .and_then(|n| n.to_str())
150 .map(|n| n == ".cache_temp")
151 .unwrap_or(false);
152 if is_cache_temp {
153 let _ = fs::remove_dir(parent);
156 }
157 }
158 }
159 });
160 }
161}
162
163impl _Cache for Caches {
164 fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
167 where
168 Self: Sized,
169 {
170 if mem_size.is_some() {
172 fs::create_dir_all(&tmp_path).unwrap();
173 }
174
175 Caches {
176 map_offset: DashMap::new(),
177 hash_set: DashSet::new(),
178 lru_cache: Mutex::new(LruCache::new(mem_size.unwrap_or(usize::MAX))),
179 mem_size,
180 tmp_path,
181 path_prefixes: [const { Once::new() }; 256],
182 pool: Arc::new(ThreadPool::new(thread_num)),
183 complete_signal: Arc::new(AtomicBool::new(false)),
184 }
185 }
186
187 fn get_hash(&self, offset: usize) -> Option<ObjectHash> {
188 self.map_offset.get(&offset).map(|x| *x)
189 }
190
191 fn insert(&self, offset: usize, hash: ObjectHash, obj: CacheObject) -> Arc<CacheObject> {
192 let obj_arc = Arc::new(obj);
193 {
194 let mut map = self.lru_cache.lock().unwrap();
196 let mut a_obj = ArcWrapper::new(
197 obj_arc.clone(),
198 self.complete_signal.clone(),
199 Some(self.pool.clone()),
200 );
201 if self.mem_size.is_some() {
202 a_obj.set_store_path(self.generate_temp_path(&self.tmp_path, hash));
203 }
204 let _ = map.insert(hash, a_obj);
205 }
206 self.hash_set.insert(hash);
208 self.map_offset.insert(offset, hash);
209
210 obj_arc
211 }
212
213 fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>> {
215 match self.map_offset.get(&offset) {
216 Some(x) => self.get_by_hash(*x),
217 None => None,
218 }
219 }
220
221 fn get_by_hash(&self, hash: ObjectHash) -> Option<Arc<CacheObject>> {
223 if self.hash_set.contains(&hash) {
225 match self.try_get(hash) {
226 Some(x) => Some(x),
227 None => {
228 if self.mem_size.is_none() {
229 panic!("should not be here when mem_size is not set")
230 }
231 self.get_fallback(hash).ok()
232 }
233 }
234 } else {
235 None
236 }
237 }
238
239 fn total_inserted(&self) -> usize {
240 self.hash_set.len()
241 }
242 fn memory_used(&self) -> usize {
243 self.lru_cache.lock().unwrap().current_size() + self.memory_used_index()
244 }
245 fn clear(&self) {
246 time_it!("Caches clear", {
247 self.complete_signal.store(true, Ordering::Release);
248 self.pool.join();
249 self.lru_cache.lock().unwrap().clear();
250 self.hash_set.clear();
251 self.hash_set.shrink_to_fit();
252 self.map_offset.clear();
253 self.map_offset.shrink_to_fit();
254 });
255
256 assert_eq!(self.pool.queued_count(), 0);
257 assert_eq!(self.pool.active_count(), 0);
258 assert_eq!(self.lru_cache.lock().unwrap().len(), 0);
259 }
260}
261
#[cfg(test)]
mod test {
    use std::{env, sync::Arc, thread};

    use super::*;
    use crate::{
        hash::{HashKind, ObjectHash, set_hash_kind_for_test},
        internal::{object::types::ObjectType, pack::cache_object::CacheObjectInfo},
    };

    /// Builds a zero-filled blob of `size` bytes at pack offset 0.
    fn make_obj(size: usize, hash: ObjectHash) -> CacheObject {
        CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash),
            data_decompressed: vec![0; size],
            mem_recorder: None,
            offset: 0,
            crc32: 0,
            is_delta_in_pack: false,
        }
    }

    /// Returns `<parent of cwd>/<rel>`, deleting any leftover directory there.
    fn fresh_tmp_dir(rel: &str) -> PathBuf {
        let dir = env::current_dir().unwrap().parent().unwrap().join(rel);
        if dir.exists() {
            fs::remove_dir_all(&dir).unwrap();
        }
        dir
    }

    #[test]
    fn test_cache_single_thread() {
        let cases = [
            (HashKind::Sha1, 2048usize, 800usize, 1700usize, "tests/.cache_tmp"),
            (HashKind::Sha256, 4096usize, 1500usize, 3000usize, "tests/.cache_tmp_sha256"),
        ];
        for (kind, cap, size_ab, size_c, tmp_dir) in cases {
            let _guard = set_hash_kind_for_test(kind);
            let cache = Caches::new(Some(cap), fresh_tmp_dir(tmp_dir), 1);

            let a_hash = ObjectHash::new(b"a");
            let b_hash = ObjectHash::new(b"b");
            let c_hash = ObjectHash::new(b"c");

            let a = make_obj(size_ab, a_hash);
            let b = make_obj(size_ab, b_hash);
            let c = make_obj(size_c, c_hash);

            // `a` and `b` together fit within the budget.
            cache.insert(a.offset, a_hash, a.clone());
            assert!(cache.hash_set.contains(&a_hash));
            assert!(cache.try_get(a_hash).is_some());

            cache.insert(b.offset, b_hash, b.clone());
            assert!(cache.hash_set.contains(&b_hash));
            assert!(cache.try_get(b_hash).is_some());
            assert!(cache.try_get(a_hash).is_some());

            // `c` pushes both `a` and `b` out of memory.
            cache.insert(c.offset, c_hash, c.clone());
            assert!(cache.try_get(a_hash).is_none());
            assert!(cache.try_get(b_hash).is_none());
            assert!(cache.try_get(c_hash).is_some());
            assert!(cache.get_by_hash(c_hash).is_some());
        }
    }

    /// Spawns a thread that switches to `kind`, inserts one blob keyed by
    /// `key`, and checks that it is both indexed and resident.
    fn spawn_insert(
        cache: &Arc<Caches>,
        kind: HashKind,
        key: &'static [u8],
        size: usize,
        offset: usize,
    ) -> thread::JoinHandle<()> {
        let cache = Arc::clone(cache);
        thread::spawn(move || {
            let _g = set_hash_kind_for_test(kind);
            let hash = ObjectHash::new(key);
            let mut obj = make_obj(size, hash);
            obj.offset = offset;
            cache.insert(obj.offset, hash, obj.clone());
            assert!(cache.hash_set.contains(&hash));
            assert!(cache.try_get(hash).is_some());
        })
    }

    #[test]
    fn test_cache_multi_thread_mixed_hash_kinds() {
        let tmp_path = fresh_tmp_dir("tests/.cache_tmp_mixed");
        let cache = Arc::new(Caches::new(Some(4096), tmp_path, 2));

        let handle_sha1 = spawn_insert(&cache, HashKind::Sha1, b"sha1-entry", 800, 1);
        let handle_sha256 = spawn_insert(&cache, HashKind::Sha256, b"sha256-entry", 1500, 2);

        handle_sha1.join().unwrap();
        handle_sha256.join().unwrap();

        assert_eq!(cache.total_inserted(), 2);
    }
}