git_internal/internal/pack/
cache.rs1use std::{
5 fs, io,
6 path::{Path, PathBuf},
7 sync::{
8 Arc, Mutex, Once,
9 atomic::{AtomicBool, Ordering},
10 },
11 thread::sleep,
12};
13
14use dashmap::{DashMap, DashSet};
15use lru_mem::LruCache;
16use threadpool::ThreadPool;
17
18use crate::{
19 hash::ObjectHash,
20 internal::pack::cache_object::{ArcWrapper, CacheObject, FileLoadStore, MemSizeRecorder},
21 time_it,
22};
23
/// Name of the versioned directory inserted between the temp root and the
/// per-prefix spill directories, giving spill paths of the form
/// `<tmp_path>/<CACHE_LAYOUT_VERSION>/<prefix>/<hash>`.
///
/// NOTE(review): "rkyv" presumably names the on-disk serialization format;
/// bumping this string would keep newly written spill files apart from files
/// written with an older layout — confirm before relying on that.
const CACHE_LAYOUT_VERSION: &str = "rkyv-v1";
28
29pub trait _Cache {
33 fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
34 where
35 Self: Sized;
36 fn get_hash(&self, offset: usize) -> Option<ObjectHash>;
37 fn insert(&self, offset: usize, hash: ObjectHash, obj: CacheObject) -> Arc<CacheObject>;
38 fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>>;
39 fn get_by_hash(&self, h: ObjectHash) -> Option<Arc<CacheObject>>;
40 fn total_inserted(&self) -> usize;
41 fn memory_used(&self) -> usize;
42 fn clear(&self);
43}
44
45impl lru_mem::HeapSize for ObjectHash {
46 fn heap_size(&self) -> usize {
47 0
48 }
49}
50
/// Object cache with a memory-bounded LRU in front and, when `mem_size` is
/// set, per-object spill files under `tmp_path` behind it.
///
/// NOTE: fields are dropped in declaration order. `lru_cache` holds
/// `ArcWrapper`s that each keep a clone of `pool`, so check the drop-time
/// interaction before reordering fields.
pub struct Caches {
    // offset -> object hash, filled by `insert`.
    map_offset: DashMap<usize, ObjectHash>,
    // Hashes inserted since the last `clear()`; gates `get_by_hash` and backs `total_inserted`.
    hash_set: DashSet<ObjectHash>,
    // In-memory LRU of hash -> wrapped object; lru_mem needs `&mut` access, hence the Mutex.
    lru_cache: Mutex<LruCache<ObjectHash, ArcWrapper<CacheObject>>>,
    // Byte budget for `lru_cache`; `None` => LRU capacity `usize::MAX` and no spill paths are set.
    mem_size: Option<usize>,
    // Root directory for spill files.
    tmp_path: PathBuf,
    // One `Once` per possible first hash byte, guarding creation of that prefix directory.
    path_prefixes: [Once; 256],
    // Worker pool handed to every `ArcWrapper` (presumably runs spill writes — confirm).
    pool: Arc<ThreadPool>,
    // Flag set by `clear()` and shared with every `ArcWrapper`.
    complete_signal: Arc<AtomicBool>,
}
68
69impl Caches {
70 fn try_get(&self, hash: ObjectHash) -> Option<Arc<CacheObject>> {
72 let mut map = self.lru_cache.lock().unwrap();
73 map.get(&hash).map(|x| x.data.clone())
74 }
75
76 fn get_fallback(&self, hash: ObjectHash) -> io::Result<Arc<CacheObject>> {
79 let path = self.generate_temp_path(&self.tmp_path, hash);
80 let obj = {
82 loop {
83 match Self::read_from_temp(&path) {
84 Ok(x) => break x,
85 Err(e) if e.kind() == io::ErrorKind::NotFound => {
86 sleep(std::time::Duration::from_millis(10));
87 continue;
88 }
89 Err(e) => return Err(e), }
91 }
92 };
93
94 let mut map = self.lru_cache.lock().unwrap();
95 let obj = Arc::new(obj);
96 let mut x = ArcWrapper::new(
97 obj.clone(),
98 self.complete_signal.clone(),
99 Some(self.pool.clone()),
100 );
101 x.set_store_path(path);
102 let _ = map.insert(hash, x); Ok(obj)
104 }
105
106 fn generate_temp_path(&self, tmp_path: &Path, hash: ObjectHash) -> PathBuf {
108 let mut path =
110 PathBuf::with_capacity(self.tmp_path.capacity() + hash.to_string().len() + 5);
111 path.push(tmp_path);
112 path.push(CACHE_LAYOUT_VERSION);
113 let hash_str = hash._to_string();
114 path.push(&hash_str[..2]); self.path_prefixes[hash.as_ref()[0] as usize].call_once(|| {
116 if !path.exists() {
118 fs::create_dir_all(&path).unwrap();
119 }
120 });
121 path.push(hash_str);
122 path
123 }
124
125 fn read_from_temp(path: &Path) -> io::Result<CacheObject> {
127 let obj = CacheObject::f_load(path)?;
128 obj.record_mem_size();
131 Ok(obj)
132 }
133
134 pub fn queued_tasks(&self) -> usize {
136 self.pool.queued_count()
137 }
138
139 pub fn memory_used_index(&self) -> usize {
141 self.map_offset.capacity()
142 * (std::mem::size_of::<usize>() + std::mem::size_of::<ObjectHash>())
143 + self.hash_set.capacity() * (std::mem::size_of::<ObjectHash>())
144 }
145
146 pub fn remove_tmp_dir(&self) {
148 time_it!("Remove tmp dir", {
149 if self.tmp_path.exists() {
150 fs::remove_dir_all(&self.tmp_path).unwrap(); if let Some(parent) = self.tmp_path.parent() {
153 let is_cache_temp = parent
154 .file_name()
155 .and_then(|n| n.to_str())
156 .map(|n| n == ".cache_temp")
157 .unwrap_or(false);
158 if is_cache_temp {
159 let _ = fs::remove_dir(parent);
162 }
163 }
164 }
165 });
166 }
167}
168
169impl _Cache for Caches {
170 fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
173 where
174 Self: Sized,
175 {
176 if mem_size.is_some() {
178 fs::create_dir_all(&tmp_path).unwrap();
179 }
180
181 Caches {
182 map_offset: DashMap::new(),
183 hash_set: DashSet::new(),
184 lru_cache: Mutex::new(LruCache::new(mem_size.unwrap_or(usize::MAX))),
185 mem_size,
186 tmp_path,
187 path_prefixes: [const { Once::new() }; 256],
188 pool: Arc::new(ThreadPool::new(thread_num)),
189 complete_signal: Arc::new(AtomicBool::new(false)),
190 }
191 }
192
193 fn get_hash(&self, offset: usize) -> Option<ObjectHash> {
194 self.map_offset.get(&offset).map(|x| *x)
195 }
196
197 fn insert(&self, offset: usize, hash: ObjectHash, obj: CacheObject) -> Arc<CacheObject> {
198 let obj_arc = Arc::new(obj);
199 {
200 let mut map = self.lru_cache.lock().unwrap();
202 let mut a_obj = ArcWrapper::new(
203 obj_arc.clone(),
204 self.complete_signal.clone(),
205 Some(self.pool.clone()),
206 );
207 if self.mem_size.is_some() {
208 a_obj.set_store_path(self.generate_temp_path(&self.tmp_path, hash));
209 }
210 let _ = map.insert(hash, a_obj);
211 }
212 self.hash_set.insert(hash);
214 self.map_offset.insert(offset, hash);
215
216 obj_arc
217 }
218
219 fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>> {
221 match self.map_offset.get(&offset) {
222 Some(x) => self.get_by_hash(*x),
223 None => None,
224 }
225 }
226
227 fn get_by_hash(&self, hash: ObjectHash) -> Option<Arc<CacheObject>> {
229 if self.hash_set.contains(&hash) {
231 match self.try_get(hash) {
232 Some(x) => Some(x),
233 None => {
234 if self.mem_size.is_none() {
235 panic!("should not be here when mem_size is not set")
236 }
237 self.get_fallback(hash).ok()
238 }
239 }
240 } else {
241 None
242 }
243 }
244
245 fn total_inserted(&self) -> usize {
246 self.hash_set.len()
247 }
248 fn memory_used(&self) -> usize {
249 self.lru_cache.lock().unwrap().current_size() + self.memory_used_index()
250 }
251 fn clear(&self) {
252 time_it!("Caches clear", {
253 self.complete_signal.store(true, Ordering::Release);
254 self.pool.join();
255 self.lru_cache.lock().unwrap().clear();
256 self.hash_set.clear();
257 self.hash_set.shrink_to_fit();
258 self.map_offset.clear();
259 self.map_offset.shrink_to_fit();
260 });
261
262 assert_eq!(self.pool.queued_count(), 0);
263 assert_eq!(self.pool.active_count(), 0);
264 assert_eq!(self.lru_cache.lock().unwrap().len(), 0);
265 }
266}
267
#[cfg(test)]
mod test {
    use std::{env, sync::Arc, thread};

    use super::*;
    use crate::{
        hash::{HashKind, ObjectHash, set_hash_kind_for_test},
        internal::{object::types::ObjectType, pack::cache_object::CacheObjectInfo},
    };

    /// Builds a blob `CacheObject` of `size` zero bytes, identified by `hash`, at `offset`.
    fn make_obj(size: usize, hash: ObjectHash, offset: usize) -> CacheObject {
        CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash),
            data_decompressed: vec![0; size],
            mem_recorder: None,
            offset,
            crc32: 0,
            is_delta_in_pack: false,
        }
    }

    /// Returns `<parent of cwd>/<rel>`, removing any leftover directory from an earlier run.
    fn fresh_tmp_dir(rel: &str) -> PathBuf {
        let base = env::current_dir().unwrap().parent().unwrap().to_path_buf();
        let tmp_path = base.join(rel);
        if tmp_path.exists() {
            fs::remove_dir_all(&tmp_path).unwrap();
        }
        tmp_path
    }

    /// For each hash kind: `a` and `b` fit in the LRU together, and inserting the
    /// larger `c` evicts both while `c` stays resident.
    #[test]
    fn test_cache_single_thread() {
        for (kind, cap, size_ab, size_c, tmp_dir) in [
            (
                HashKind::Sha1,
                2048usize,
                800usize,
                1700usize,
                "tests/.cache_tmp",
            ),
            (
                HashKind::Sha256,
                4096usize,
                1500usize,
                3000usize,
                "tests/.cache_tmp_sha256",
            ),
        ] {
            let _guard = set_hash_kind_for_test(kind);
            let cache = Caches::new(Some(cap), fresh_tmp_dir(tmp_dir), 1);
            let a_hash = ObjectHash::new(b"a");
            let b_hash = ObjectHash::new(b"b");
            let c_hash = ObjectHash::new(b"c");

            let a = make_obj(size_ab, a_hash, 0);
            let b = make_obj(size_ab, b_hash, 0);
            let c = make_obj(size_c, c_hash, 0);

            cache.insert(a.offset, a_hash, a.clone());
            assert!(cache.hash_set.contains(&a_hash));
            assert!(cache.try_get(a_hash).is_some());

            cache.insert(b.offset, b_hash, b.clone());
            assert!(cache.hash_set.contains(&b_hash));
            assert!(cache.try_get(b_hash).is_some());
            assert!(cache.try_get(a_hash).is_some());

            cache.insert(c.offset, c_hash, c.clone());
            assert!(cache.try_get(a_hash).is_none());
            assert!(cache.try_get(b_hash).is_none());
            assert!(cache.try_get(c_hash).is_some());
            assert!(cache.get_by_hash(c_hash).is_some());

            // Join background workers before deleting the directory they may write into.
            cache.clear();
            cache.remove_tmp_dir();
        }
    }

    /// Two threads using different hash kinds insert into one shared cache.
    #[test]
    fn test_cache_multi_thread_mixed_hash_kinds() {
        let cache = Arc::new(Caches::new(
            Some(4096),
            fresh_tmp_dir("tests/.cache_tmp_mixed"),
            2,
        ));

        let cache_sha1 = Arc::clone(&cache);
        let handle_sha1 = thread::spawn(move || {
            let _g = set_hash_kind_for_test(HashKind::Sha1);
            let hash = ObjectHash::new(b"sha1-entry");
            let obj = make_obj(800, hash, 1);
            cache_sha1.insert(obj.offset, hash, obj.clone());
            assert!(cache_sha1.hash_set.contains(&hash));
            assert!(cache_sha1.try_get(hash).is_some());
        });

        let cache_sha256 = Arc::clone(&cache);
        let handle_sha256 = thread::spawn(move || {
            let _g = set_hash_kind_for_test(HashKind::Sha256);
            let hash = ObjectHash::new(b"sha256-entry");
            let obj = make_obj(1500, hash, 2);
            cache_sha256.insert(obj.offset, hash, obj.clone());
            assert!(cache_sha256.hash_set.contains(&hash));
            assert!(cache_sha256.try_get(hash).is_some());
        });

        handle_sha1.join().unwrap();
        handle_sha256.join().unwrap();

        assert_eq!(cache.total_inserted(), 2);

        cache.clear();
        cache.remove_tmp_dir();
    }
}