git_internal/internal/pack/
cache.rs1use std::path::Path;
2use std::path::PathBuf;
3use std::sync::Once;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex};
6use std::thread::sleep;
7use std::{fs, io};
8
9use dashmap::{DashMap, DashSet};
10use lru_mem::LruCache;
11use threadpool::ThreadPool;
12
13use crate::hash::SHA1;
14use crate::internal::pack::cache_object::{
15 ArcWrapper, CacheObject, FileLoadStore, MemSizeRecorder,
16};
17use crate::time_it;
18
19pub trait _Cache {
20 fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
21 where
22 Self: Sized;
23 fn get_hash(&self, offset: usize) -> Option<SHA1>;
24 fn insert(&self, offset: usize, hash: SHA1, obj: CacheObject) -> Arc<CacheObject>;
25 fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>>;
26 fn get_by_hash(&self, h: SHA1) -> Option<Arc<CacheObject>>;
27 fn total_inserted(&self) -> usize;
28 fn memory_used(&self) -> usize;
29 fn clear(&self);
30}
31
32impl lru_mem::HeapSize for SHA1 {
33 fn heap_size(&self) -> usize {
34 0
35 }
36}
37
38pub struct Caches {
39 map_offset: DashMap<usize, SHA1>, hash_set: DashSet<SHA1>, lru_cache: Mutex<LruCache<SHA1, ArcWrapper<CacheObject>>>,
46 mem_size: Option<usize>,
47 tmp_path: PathBuf,
48 path_prefixes: [Once; 256],
49 pool: Arc<ThreadPool>,
50 complete_signal: Arc<AtomicBool>,
51}
52
53impl Caches {
54 fn try_get(&self, hash: SHA1) -> Option<Arc<CacheObject>> {
56 let mut map = self.lru_cache.lock().unwrap();
57 map.get(&hash).map(|x| x.data.clone())
58 }
59
60 fn get_fallback(&self, hash: SHA1) -> io::Result<Arc<CacheObject>> {
63 let path = self.generate_temp_path(&self.tmp_path, hash);
64 let obj = {
66 loop {
67 match Self::read_from_temp(&path) {
68 Ok(x) => break x,
69 Err(e) if e.kind() == io::ErrorKind::NotFound => {
70 sleep(std::time::Duration::from_millis(10));
71 continue;
72 }
73 Err(e) => return Err(e), }
75 }
76 };
77
78 let mut map = self.lru_cache.lock().unwrap();
79 let obj = Arc::new(obj);
80 let mut x = ArcWrapper::new(
81 obj.clone(),
82 self.complete_signal.clone(),
83 Some(self.pool.clone()),
84 );
85 x.set_store_path(path);
86 let _ = map.insert(hash, x); Ok(obj)
88 }
89
90 fn generate_temp_path(&self, tmp_path: &Path, hash: SHA1) -> PathBuf {
92 let mut path = PathBuf::with_capacity(self.tmp_path.capacity() + SHA1::SIZE * 2 + 5);
94 path.push(tmp_path);
95 let hash_str = hash._to_string();
96 path.push(&hash_str[..2]); self.path_prefixes[hash.as_ref()[0] as usize].call_once(|| {
98 if !path.exists() {
100 fs::create_dir_all(&path).unwrap();
101 }
102 });
103 path.push(hash_str);
104 path
105 }
106
107 fn read_from_temp(path: &Path) -> io::Result<CacheObject> {
108 let obj = CacheObject::f_load(path)?;
109 obj.record_mem_size();
112 Ok(obj)
113 }
114
115 pub fn queued_tasks(&self) -> usize {
116 self.pool.queued_count()
117 }
118
119 pub fn memory_used_index(&self) -> usize {
121 self.map_offset.capacity() * (std::mem::size_of::<usize>() + std::mem::size_of::<SHA1>())
122 + self.hash_set.capacity() * (std::mem::size_of::<SHA1>())
123 }
124
125 pub fn remove_tmp_dir(&self) {
127 time_it!("Remove tmp dir", {
128 if self.tmp_path.exists() {
129 fs::remove_dir_all(&self.tmp_path).unwrap(); }
131 });
132 }
133}
134
135impl _Cache for Caches {
136 fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
139 where
140 Self: Sized,
141 {
142 if mem_size.is_some() {
144 fs::create_dir_all(&tmp_path).unwrap();
145 }
146
147 Caches {
148 map_offset: DashMap::new(),
149 hash_set: DashSet::new(),
150 lru_cache: Mutex::new(LruCache::new(mem_size.unwrap_or(usize::MAX))),
151 mem_size,
152 tmp_path,
153 path_prefixes: [const { Once::new() }; 256],
154 pool: Arc::new(ThreadPool::new(thread_num)),
155 complete_signal: Arc::new(AtomicBool::new(false)),
156 }
157 }
158
159 fn get_hash(&self, offset: usize) -> Option<SHA1> {
160 self.map_offset.get(&offset).map(|x| *x)
161 }
162
163 fn insert(&self, offset: usize, hash: SHA1, obj: CacheObject) -> Arc<CacheObject> {
164 let obj_arc = Arc::new(obj);
165 {
166 let mut map = self.lru_cache.lock().unwrap();
168 let mut a_obj = ArcWrapper::new(
169 obj_arc.clone(),
170 self.complete_signal.clone(),
171 Some(self.pool.clone()),
172 );
173 if self.mem_size.is_some() {
174 a_obj.set_store_path(self.generate_temp_path(&self.tmp_path, hash));
175 }
176 let _ = map.insert(hash, a_obj);
177 }
178 self.hash_set.insert(hash);
180 self.map_offset.insert(offset, hash);
181
182 obj_arc
183 }
184
185 fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>> {
186 match self.map_offset.get(&offset) {
187 Some(x) => self.get_by_hash(*x),
188 None => None,
189 }
190 }
191
192 fn get_by_hash(&self, hash: SHA1) -> Option<Arc<CacheObject>> {
193 if self.hash_set.contains(&hash) {
195 match self.try_get(hash) {
196 Some(x) => Some(x),
197 None => {
198 if self.mem_size.is_none() {
199 panic!("should not be here when mem_size is not set")
200 }
201 self.get_fallback(hash).ok()
202 }
203 }
204 } else {
205 None
206 }
207 }
208
209 fn total_inserted(&self) -> usize {
210 self.hash_set.len()
211 }
212 fn memory_used(&self) -> usize {
213 self.lru_cache.lock().unwrap().current_size() + self.memory_used_index()
214 }
215 fn clear(&self) {
216 time_it!("Caches clear", {
217 self.complete_signal.store(true, Ordering::Release);
218 self.pool.join();
219 self.lru_cache.lock().unwrap().clear();
220 self.hash_set.clear();
221 self.hash_set.shrink_to_fit();
222 self.map_offset.clear();
223 self.map_offset.shrink_to_fit();
224 });
225
226 assert_eq!(self.pool.queued_count(), 0);
227 assert_eq!(self.pool.active_count(), 0);
228 assert_eq!(self.lru_cache.lock().unwrap().len(), 0);
229 }
230}
231
#[cfg(test)]
mod test {
    use std::env;

    use super::*;
    use crate::{
        hash::SHA1,
        internal::{object::types::ObjectType, pack::cache_object::CacheObjectInfo},
    };

    /// Builds a zero-filled blob object of `len` bytes at offset 0, identified by `hash`.
    fn zeroed_blob(hash: SHA1, len: usize) -> CacheObject {
        CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash),
            data_decompressed: vec![0; len],
            mem_recorder: None,
            offset: 0,
            is_delta_in_pack: false,
        }
    }

    /// With a 2048 budget, two 800-byte objects stay in the LRU together; inserting a
    /// 1700-byte object pushes both out while it remains retrievable itself.
    #[test]
    fn test_cache_single_thread() {
        let cwd = env::current_dir().unwrap();
        let tmp_path = cwd.parent().unwrap().join("tests/.cache_tmp");

        // Start from a clean temp directory.
        if tmp_path.exists() {
            fs::remove_dir_all(&tmp_path).unwrap();
        }

        let cache = Caches::new(Some(2048), tmp_path, 1);

        let a_hash = SHA1::new("a".as_bytes());
        let b_hash = SHA1::new("b".as_bytes());
        let a = zeroed_blob(a_hash, 800);
        let b = zeroed_blob(b_hash, 800);

        cache.insert(a.offset, a_hash, a.clone());
        assert!(cache.hash_set.contains(&a_hash));
        assert!(cache.try_get(a_hash).is_some());

        cache.insert(b.offset, b_hash, b.clone());
        assert!(cache.hash_set.contains(&b_hash));
        assert!(cache.try_get(b_hash).is_some());
        assert!(cache.try_get(a_hash).is_some());

        let c_hash = SHA1::new("c".as_bytes());
        let c = zeroed_blob(c_hash, 1700);

        cache.insert(c.offset, c_hash, c.clone());
        assert!(cache.try_get(a_hash).is_none());
        assert!(cache.try_get(b_hash).is_none());
        assert!(cache.try_get(c_hash).is_some());
        assert!(cache.get_by_hash(c_hash).is_some());
    }
}