// git_internal/internal/pack/cache.rs
use std::{
5 fs, io,
6 path::{Path, PathBuf},
7 sync::{
8 Arc, Mutex, Once,
9 atomic::{AtomicBool, Ordering},
10 },
11 thread::sleep,
12};
13
14use dashmap::{DashMap, DashSet};
15use lru_mem::LruCache;
16use threadpool::ThreadPool;
17
18use crate::{
19 hash::ObjectHash,
20 internal::pack::cache_object::{ArcWrapper, CacheObject, FileLoadStore, MemSizeRecorder},
21 time_it,
22};
23
/// Common interface for pack-object caches addressable by both pack
/// offset and object hash.
///
/// NOTE(review): the leading underscore suggests this trait is internal or
/// transitional — confirm intent before widening its visibility.
pub trait _Cache {
    /// Creates a cache. `mem_size` is the in-memory budget in bytes
    /// (`None` = unbounded); `tmp_path` is the directory used for spilled
    /// objects; `thread_num` sizes the background worker pool.
    fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
    where
        Self: Sized;
    /// Returns the hash recorded for a pack `offset`, if one was inserted.
    fn get_hash(&self, offset: usize) -> Option<ObjectHash>;
    /// Inserts `obj` under both `offset` and `hash`; returns a shared handle.
    fn insert(&self, offset: usize, hash: ObjectHash, obj: CacheObject) -> Arc<CacheObject>;
    /// Fetches an object by its pack offset.
    fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>>;
    /// Fetches an object by its hash.
    fn get_by_hash(&self, h: ObjectHash) -> Option<Arc<CacheObject>>;
    /// Total number of distinct objects ever inserted.
    fn total_inserted(&self) -> usize;
    /// Approximate bytes currently used by the cache.
    fn memory_used(&self) -> usize;
    /// Drains pending work and clears all entries.
    fn clear(&self);
}
39
// Lets `ObjectHash` be used as an `LruCache` key with size accounting.
// Returning 0 asserts the hash owns no heap allocations (digest stored
// inline) — presumably true for a fixed-size digest; confirm against the
// `ObjectHash` definition if it ever grows a heap-backed variant.
impl lru_mem::HeapSize for ObjectHash {
    fn heap_size(&self) -> usize {
        0
    }
}
45
/// Memory-bounded object cache with disk spill-over.
///
/// Hot objects live in `lru_cache`; once evicted they are re-read from
/// `tmp_path` via the fallback path.
pub struct Caches {
    /// Index: pack offset -> object hash.
    map_offset: DashMap<usize, ObjectHash>,
    /// Every hash ever inserted; membership survives LRU eviction.
    hash_set: DashSet<ObjectHash>,
    /// Size-bounded in-memory store. `ArcWrapper` carries the spill path
    /// and a handle to `pool` — presumably it persists the object to disk
    /// when dropped/evicted; verify in cache_object.rs.
    lru_cache: Mutex<LruCache<ObjectHash, ArcWrapper<CacheObject>>>,
    /// In-memory budget in bytes; `None` disables disk spill entirely.
    mem_size: Option<usize>,
    /// Directory holding spilled objects.
    tmp_path: PathBuf,
    /// One guard per possible first hash byte, so each prefix directory
    /// is created exactly once across threads.
    path_prefixes: [Once; 256],
    /// Background pool for asynchronous disk writes.
    pool: Arc<ThreadPool>,
    /// Set to `true` by `clear()`; read by in-flight wrappers — assumed to
    /// mean "no further spill needed", confirm in cache_object.rs.
    complete_signal: Arc<AtomicBool>,
}
63
64impl Caches {
65 fn try_get(&self, hash: ObjectHash) -> Option<Arc<CacheObject>> {
67 let mut map = self.lru_cache.lock().unwrap();
68 map.get(&hash).map(|x| x.data.clone())
69 }
70
71 fn get_fallback(&self, hash: ObjectHash) -> io::Result<Arc<CacheObject>> {
74 let path = self.generate_temp_path(&self.tmp_path, hash);
75 let obj = {
77 loop {
78 match Self::read_from_temp(&path) {
79 Ok(x) => break x,
80 Err(e) if e.kind() == io::ErrorKind::NotFound => {
81 sleep(std::time::Duration::from_millis(10));
82 continue;
83 }
84 Err(e) => return Err(e), }
86 }
87 };
88
89 let mut map = self.lru_cache.lock().unwrap();
90 let obj = Arc::new(obj);
91 let mut x = ArcWrapper::new(
92 obj.clone(),
93 self.complete_signal.clone(),
94 Some(self.pool.clone()),
95 );
96 x.set_store_path(path);
97 let _ = map.insert(hash, x); Ok(obj)
99 }
100
101 fn generate_temp_path(&self, tmp_path: &Path, hash: ObjectHash) -> PathBuf {
103 let mut path =
105 PathBuf::with_capacity(self.tmp_path.capacity() + hash.to_string().len() + 5);
106 path.push(tmp_path);
107 let hash_str = hash._to_string();
108 path.push(&hash_str[..2]); self.path_prefixes[hash.as_ref()[0] as usize].call_once(|| {
110 if !path.exists() {
112 fs::create_dir_all(&path).unwrap();
113 }
114 });
115 path.push(hash_str);
116 path
117 }
118
119 fn read_from_temp(path: &Path) -> io::Result<CacheObject> {
121 let obj = CacheObject::f_load(path)?;
122 obj.record_mem_size();
125 Ok(obj)
126 }
127
128 pub fn queued_tasks(&self) -> usize {
130 self.pool.queued_count()
131 }
132
133 pub fn memory_used_index(&self) -> usize {
135 self.map_offset.capacity()
136 * (std::mem::size_of::<usize>() + std::mem::size_of::<ObjectHash>())
137 + self.hash_set.capacity() * (std::mem::size_of::<ObjectHash>())
138 }
139
140 pub fn remove_tmp_dir(&self) {
142 time_it!("Remove tmp dir", {
143 if self.tmp_path.exists() {
144 fs::remove_dir_all(&self.tmp_path).unwrap(); }
146 });
147 }
148}
149
impl _Cache for Caches {
    /// Creates the cache. With `mem_size` set, the spill directory is
    /// created eagerly; with `None` the LRU budget is `usize::MAX`
    /// (effectively unbounded) and nothing is ever written to disk.
    fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
    where
        Self: Sized,
    {
        if mem_size.is_some() {
            // Disk spill is only active under a bounded memory budget.
            fs::create_dir_all(&tmp_path).unwrap();
        }

        Caches {
            map_offset: DashMap::new(),
            hash_set: DashSet::new(),
            lru_cache: Mutex::new(LruCache::new(mem_size.unwrap_or(usize::MAX))),
            mem_size,
            tmp_path,
            // `Once` is not `Copy`; the inline-const form builds 256 of them.
            path_prefixes: [const { Once::new() }; 256],
            pool: Arc::new(ThreadPool::new(thread_num)),
            complete_signal: Arc::new(AtomicBool::new(false)),
        }
    }

    fn get_hash(&self, offset: usize) -> Option<ObjectHash> {
        self.map_offset.get(&offset).map(|x| *x)
    }

    /// Inserts under both keys. The LRU insert happens before `hash_set`
    /// claims the hash, so once `get_by_hash` sees the hash it can find
    /// the object either in memory or (after eviction) on disk.
    fn insert(&self, offset: usize, hash: ObjectHash, obj: CacheObject) -> Arc<CacheObject> {
        let obj_arc = Arc::new(obj);
        {
            let mut map = self.lru_cache.lock().unwrap();
            let mut a_obj = ArcWrapper::new(
                obj_arc.clone(),
                self.complete_signal.clone(),
                Some(self.pool.clone()),
            );
            if self.mem_size.is_some() {
                // Pre-compute the spill path so eviction can persist the object.
                a_obj.set_store_path(self.generate_temp_path(&self.tmp_path, hash));
            }
            // May evict older entries to stay within the size budget.
            let _ = map.insert(hash, a_obj);
        }
        self.hash_set.insert(hash);
        self.map_offset.insert(offset, hash);

        obj_arc
    }

    fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>> {
        match self.map_offset.get(&offset) {
            Some(x) => self.get_by_hash(*x),
            None => None,
        }
    }

    /// Memory first, then disk. An in-memory miss for a known hash is
    /// only legal when a memory budget (hence spill) is configured.
    fn get_by_hash(&self, hash: ObjectHash) -> Option<Arc<CacheObject>> {
        if self.hash_set.contains(&hash) {
            match self.try_get(hash) {
                Some(x) => Some(x),
                None => {
                    if self.mem_size.is_none() {
                        // Unbounded cache never evicts; reaching here is a bug.
                        panic!("should not be here when mem_size is not set")
                    }
                    // NOTE(review): I/O errors from the fallback are
                    // silently collapsed to `None` here.
                    self.get_fallback(hash).ok()
                }
            }
        } else {
            None
        }
    }

    fn total_inserted(&self) -> usize {
        self.hash_set.len()
    }
    fn memory_used(&self) -> usize {
        self.lru_cache.lock().unwrap().current_size() + self.memory_used_index()
    }
    /// Signals completion, waits for every pending spill task, then drops
    /// all indices and entries. The trailing asserts enforce that nothing
    /// is left queued, running, or cached.
    fn clear(&self) {
        time_it!("Caches clear", {
            self.complete_signal.store(true, Ordering::Release);
            self.pool.join();
            self.lru_cache.lock().unwrap().clear();
            self.hash_set.clear();
            self.hash_set.shrink_to_fit();
            self.map_offset.clear();
            self.map_offset.shrink_to_fit();
        });

        assert_eq!(self.pool.queued_count(), 0);
        assert_eq!(self.pool.active_count(), 0);
        assert_eq!(self.lru_cache.lock().unwrap().len(), 0);
    }
}
248
#[cfg(test)]
mod test {
    use std::{env, sync::Arc, thread};

    use super::*;
    use crate::{
        hash::{HashKind, ObjectHash, set_hash_kind_for_test},
        internal::{object::types::ObjectType, pack::cache_object::CacheObjectInfo},
    };

    /// Builds a blob-typed `CacheObject` whose decompressed payload is
    /// `size` zero bytes — used to push the LRU toward its byte budget.
    /// NOTE(review): `offset` is always 0, so objects made here collide on
    /// the `map_offset` key; fine while tests only look up by hash.
    fn make_obj(size: usize, hash: ObjectHash) -> CacheObject {
        CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash),
            data_decompressed: vec![0; size],
            mem_recorder: None,
            offset: 0,
            crc32: 0,
            is_delta_in_pack: false,
        }
    }

    /// Single-threaded insert/evict behavior, for both hash kinds.
    /// `cap` is the LRU budget: two `size_ab` objects fit together, while
    /// one `size_c` object evicts both a and b.
    #[test]
    fn test_cache_single_thread() {
        for (kind, cap, size_ab, size_c, tmp_dir) in [
            (
                HashKind::Sha1,
                2048usize,
                800usize,
                1700usize,
                "tests/.cache_tmp",
            ),
            (
                HashKind::Sha256,
                4096usize,
                1500usize,
                3000usize,
                "tests/.cache_tmp_sha256",
            ),
        ] {
            let _guard = set_hash_kind_for_test(kind);
            // Place the spill dir under the workspace parent; wipe leftovers
            // from previous runs.
            let source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
            let tmp_path = source.clone().join(tmp_dir);
            if tmp_path.exists() {
                fs::remove_dir_all(&tmp_path).unwrap();
            }

            let cache = Caches::new(Some(cap), tmp_path, 1);
            let a_hash = ObjectHash::new(String::from("a").as_bytes());
            let b_hash = ObjectHash::new(String::from("b").as_bytes());
            let c_hash = ObjectHash::new(String::from("c").as_bytes());

            let a = make_obj(size_ab, a_hash);
            let b = make_obj(size_ab, b_hash);
            let c = make_obj(size_c, c_hash);

            // a alone fits in memory.
            cache.insert(a.offset, a_hash, a.clone());
            assert!(cache.hash_set.contains(&a_hash));
            assert!(cache.try_get(a_hash).is_some());

            // a and b fit together (2 * size_ab <= cap).
            cache.insert(b.offset, b_hash, b.clone());
            assert!(cache.hash_set.contains(&b_hash));
            assert!(cache.try_get(b_hash).is_some());
            assert!(cache.try_get(a_hash).is_some());

            // c evicts both a and b from memory, but get_by_hash can still
            // serve c (and, via disk fallback, the evicted entries).
            cache.insert(c.offset, c_hash, c.clone());
            assert!(cache.try_get(a_hash).is_none());
            assert!(cache.try_get(b_hash).is_none());
            assert!(cache.try_get(c_hash).is_some());
            assert!(cache.get_by_hash(c_hash).is_some());
        }
    }

    /// Two threads insert concurrently, each pinning a different hash kind
    /// via the per-thread test guard; both entries must land in one cache.
    #[test]
    fn test_cache_multi_thread_mixed_hash_kinds() {
        let base = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
        let tmp_path = base.join("tests/.cache_tmp_mixed");
        if tmp_path.exists() {
            fs::remove_dir_all(&tmp_path).unwrap();
        }

        let cache = Arc::new(Caches::new(Some(4096), tmp_path, 2));

        let cache_sha1 = Arc::clone(&cache);
        let handle_sha1 = thread::spawn(move || {
            let _g = set_hash_kind_for_test(HashKind::Sha1);
            let hash = ObjectHash::new(b"sha1-entry");
            let obj = CacheObject {
                info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash),
                data_decompressed: vec![0; 800],
                mem_recorder: None,
                offset: 1,
                crc32: 0,
                is_delta_in_pack: false,
            };
            cache_sha1.insert(obj.offset, hash, obj.clone());
            assert!(cache_sha1.hash_set.contains(&hash));
            assert!(cache_sha1.try_get(hash).is_some());
        });

        let cache_sha256 = Arc::clone(&cache);
        let handle_sha256 = thread::spawn(move || {
            let _g = set_hash_kind_for_test(HashKind::Sha256);
            let hash = ObjectHash::new(b"sha256-entry");
            let obj = CacheObject {
                info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash),
                data_decompressed: vec![0; 1500],
                mem_recorder: None,
                offset: 2,
                crc32: 0,
                is_delta_in_pack: false,
            };
            cache_sha256.insert(obj.offset, hash, obj.clone());
            assert!(cache_sha256.hash_set.contains(&hash));
            assert!(cache_sha256.try_get(hash).is_some());
        });

        handle_sha1.join().unwrap();
        handle_sha256.join().unwrap();

        // One entry per thread, distinct hashes.
        assert_eq!(cache.total_inserted(), 2);
    }
}