1use std::fs::OpenOptions;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::{fs, io};
6use std::{ops::Deref, sync::Arc};
7
8use lru_mem::{HeapSize, MemSize};
9use serde::{Deserialize, Serialize};
10use threadpool::ThreadPool;
11
12use crate::internal::pack::entry::Entry;
13use crate::internal::pack::utils;
14use crate::{hash::SHA1, internal::object::types::ObjectType};
15
/// Serializable values that can be loaded from and saved to a single file
/// (used for spilling cache entries to disk).
pub trait FileLoadStore: Serialize + for<'a> Deserialize<'a> {
    /// Reads and decodes a value from the file at `path`.
    fn f_load(path: &Path) -> Result<Self, io::Error>;
    /// Encodes the value and writes it to the file at `path`.
    fn f_save(&self, path: &Path) -> Result<(), io::Error>;
}
24
25impl<T: Serialize + for<'a> Deserialize<'a>> FileLoadStore for T {
27 fn f_load(path: &Path) -> Result<T, io::Error> {
28 let data = fs::read(path)?;
29 let obj: T = bincode::serde::decode_from_slice(&data, bincode::config::standard())
30 .map_err(io::Error::other)?
31 .0;
32 Ok(obj)
33 }
34 fn f_save(&self, path: &Path) -> Result<(), io::Error> {
35 if path.exists() {
36 return Ok(());
37 }
38 let data = bincode::serde::encode_to_vec(self, bincode::config::standard()).unwrap();
39 let path = path.with_extension("temp");
40 {
41 let mut file = OpenOptions::new()
42 .write(true)
43 .create_new(true)
44 .open(path.clone())?;
45 file.write_all(&data)?;
46 }
47 let final_path = path.with_extension("");
48 fs::rename(&path, final_path.clone())?;
49 Ok(())
50 }
51}
52
/// Describes what a cached pack entry represents and, for deltas, how its
/// base object is addressed.
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub(crate) enum CacheObjectInfo {
    /// A fully materialized object: its type and SHA-1 hash.
    BaseObject(ObjectType, SHA1),
    /// Delta whose base is addressed by pack offset; the second field is the
    /// final (restored) size — see the bindings in `HeapSize::heap_size`.
    OffsetDelta(usize, usize),
    /// Zstd-compressed offset delta; second field is the final (restored) size.
    OffsetZstdelta(usize, usize),
    /// Delta whose base is addressed by SHA-1; second field is the final
    /// (restored) size.
    HashDelta(SHA1, usize),
}
69
70impl CacheObjectInfo {
71 pub(crate) fn object_type(&self) -> ObjectType {
73 match self {
74 CacheObjectInfo::BaseObject(obj_type, _) => *obj_type,
75 CacheObjectInfo::OffsetDelta(_, _) => ObjectType::OffsetDelta,
76 CacheObjectInfo::OffsetZstdelta(_, _) => ObjectType::OffsetZstdelta,
77 CacheObjectInfo::HashDelta(_, _) => ObjectType::HashDelta,
78 }
79 }
80}
81
/// A pack object held in the decode cache.
#[derive(Debug, Serialize, Deserialize)]
pub struct CacheObject {
    // Kind of entry: base object or one of the delta forms.
    pub(crate) info: CacheObjectInfo,
    // Byte offset of this object inside the pack file.
    pub offset: usize,
    // Decompressed object (or delta instruction) bytes.
    pub data_decompressed: Vec<u8>,
    // Shared counter of total cached bytes; `None` until a recorder is attached
    // via `set_mem_recorder`. Deliberately not serialized as a live handle —
    // NOTE(review): after `f_load` this will deserialize to whatever serde
    // produces for the Option; confirm it is re-attached by the loader.
    pub mem_recorder: Option<Arc<AtomicUsize>>,
}
89
90impl Clone for CacheObject {
91 fn clone(&self) -> Self {
92 let obj = CacheObject {
93 info: self.info.clone(),
94 offset: self.offset,
95 data_decompressed: self.data_decompressed.clone(),
96 mem_recorder: self.mem_recorder.clone(),
97 };
98 obj.record_mem_size();
99 obj
100 }
101}
102
impl HeapSize for CacheObject {
    /// Heap bytes attributed to this object by the LRU cache.
    fn heap_size(&self) -> usize {
        match &self.info {
            // Base objects only own their decompressed buffer.
            CacheObjectInfo::BaseObject(_, _) => self.data_decompressed.heap_size(),
            // Delta objects additionally count the final (restored) size —
            // presumably to pre-budget the memory the expanded object will
            // occupy once the delta is applied. NOTE(review): confirm against
            // the delta-resolution path.
            CacheObjectInfo::OffsetDelta(_, delta_final_size)
            | CacheObjectInfo::OffsetZstdelta(_, delta_final_size)
            | CacheObjectInfo::HashDelta(_, delta_final_size) => {
                self.data_decompressed.heap_size() + delta_final_size
            }
        }
    }
}
137
impl Drop for CacheObject {
    /// Subtracts this object's measured size from the shared recorder,
    /// mirroring the `fetch_add` in `record_mem_size` so the global counter
    /// returns to its prior value when the object dies.
    fn drop(&mut self) {
        if let Some(mem_recorder) = &self.mem_recorder {
            mem_recorder.fetch_sub((*self).mem_size(), Ordering::Release);
        }
    }
}
148
/// Hooks a [`MemSize`] type up to a shared memory-usage counter so the cache
/// can track its total resident size.
pub trait MemSizeRecorder: MemSize {
    /// Adds this value's measured size to the attached counter (if any).
    fn record_mem_size(&self);
    /// Attaches the shared counter that future size changes are reported to.
    fn set_mem_recorder(&mut self, mem_size: Arc<AtomicUsize>);
}
160
161impl MemSizeRecorder for CacheObject {
162 fn record_mem_size(&self) {
165 if let Some(mem_recorder) = &self.mem_recorder {
166 mem_recorder.fetch_add(self.mem_size(), Ordering::Release);
167 }
168 }
169
170 fn set_mem_recorder(&mut self, mem_recorder: Arc<AtomicUsize>) {
171 self.mem_recorder = Some(mem_recorder);
172 }
173
174 }
178
179impl CacheObject {
180 pub fn new_for_undeltified(obj_type: ObjectType, data: Vec<u8>, offset: usize) -> Self {
182 let hash = utils::calculate_object_hash(obj_type, &data);
183 CacheObject {
184 info: CacheObjectInfo::BaseObject(obj_type, hash),
185 offset,
186 data_decompressed: data,
187 mem_recorder: None,
188 }
189 }
190
191 pub fn object_type(&self) -> ObjectType {
193 self.info.object_type()
194 }
195
196 pub fn base_object_hash(&self) -> Option<SHA1> {
200 match &self.info {
201 CacheObjectInfo::BaseObject(_, hash) => Some(*hash),
202 _ => None,
203 }
204 }
205
206 pub fn offset_delta(&self) -> Option<usize> {
210 match &self.info {
211 CacheObjectInfo::OffsetDelta(offset, _) => Some(*offset),
212 _ => None,
213 }
214 }
215
216 pub fn hash_delta(&self) -> Option<SHA1> {
220 match &self.info {
221 CacheObjectInfo::HashDelta(hash, _) => Some(*hash),
222 _ => None,
223 }
224 }
225
226 pub fn to_entry(&self) -> Entry {
228 match self.info {
229 CacheObjectInfo::BaseObject(obj_type, hash) => Entry {
230 obj_type,
231 data: self.data_decompressed.clone(),
232 hash,
233 chain_len: 0,
234 },
235 _ => {
236 unreachable!("delta object should not persist!")
237 }
238 }
239 }
240}
241
/// Bound alias for values storable in an [`ArcWrapper`]: measurable heap
/// size, serde round-trip, and thread-safe `'static` ownership.
pub trait ArcWrapperBounds:
    HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static
{
}
/// Blanket impl: any type meeting the bounds is automatically usable.
impl<T: HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static> ArcWrapperBounds
    for T
{
}
253
/// LRU cache slot that can spill its value to disk when evicted.
pub struct ArcWrapper<T: ArcWrapperBounds> {
    // The shared cached value.
    pub data: Arc<T>,
    // When `true`, work has completed and eviction skips the disk write
    // (checked in `Drop`).
    complete_signal: Arc<AtomicBool>,
    // Optional pool on which eviction writes run asynchronously.
    pool: Option<Arc<ThreadPool>>,
    // Destination file for spill-on-drop; `None` disables spilling.
    pub store_path: Option<PathBuf>,
}
263impl<T: ArcWrapperBounds> ArcWrapper<T> {
264 pub fn new(data: Arc<T>, share_flag: Arc<AtomicBool>, pool: Option<Arc<ThreadPool>>) -> Self {
266 ArcWrapper {
267 data,
268 complete_signal: share_flag,
269 pool,
270 store_path: None,
271 }
272 }
273 pub fn set_store_path(&mut self, path: PathBuf) {
274 self.store_path = Some(path);
275 }
276}
277
impl<T: ArcWrapperBounds> HeapSize for ArcWrapper<T> {
    /// Delegates to the wrapped value so the LRU budget reflects the payload,
    /// not the wrapper bookkeeping.
    fn heap_size(&self) -> usize {
        self.data.heap_size()
    }
}
283
impl<T: ArcWrapperBounds> Clone for ArcWrapper<T> {
    /// Clones the handle (cheap `Arc` bumps) but deliberately resets
    /// `store_path` to `None` — presumably so only the cache-resident wrapper
    /// spills to disk on drop, never handles returned to callers.
    /// NOTE(review): confirm that intent against eviction call sites.
    fn clone(&self) -> Self {
        ArcWrapper {
            data: self.data.clone(),
            complete_signal: self.complete_signal.clone(),
            pool: self.pool.clone(),
            store_path: None,
        }
    }
}
295
impl<T: ArcWrapperBounds> Deref for ArcWrapper<T> {
    // Dereferences to the inner `Arc<T>` (not `T`), letting callers clone the
    // shared handle directly.
    type Target = Arc<T>;
    fn deref(&self) -> &Self::Target {
        &self.data
    }
}
302impl<T: ArcWrapperBounds> Drop for ArcWrapper<T> {
303 fn drop(&mut self) {
306 if !self.complete_signal.load(Ordering::Acquire)
307 && let Some(path) = &self.store_path
308 {
309 match &self.pool {
310 Some(pool) => {
311 let data_copy = self.data.clone();
312 let path_copy = path.clone();
313 let complete_signal = self.complete_signal.clone();
314 while pool.queued_count() > 2000 {
317 std::thread::yield_now();
318 }
319 pool.execute(move || {
320 if !complete_signal.load(Ordering::Acquire) {
321 let res = data_copy.f_save(&path_copy);
322 if let Err(e) = res {
323 println!("[f_save] {path_copy:?} error: {e:?}");
324 }
325 }
326 });
327 }
328 None => {
329 let res = self.data.f_save(path);
330 if let Err(e) = res {
331 println!("[f_save] {path:?} error: {e:?}");
332 }
333 }
334 }
335 }
336 }
337}
#[cfg(test)]
mod test {
    use std::{fs, sync::Mutex};

    use lru_mem::LruCache;

    use super::*;

    /// The shared recorder must return to zero once the object is dropped.
    #[test]
    #[ignore = "only in single thread"]
    fn test_heap_size_record() {
        let mut obj = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, SHA1::default()),
            offset: 0,
            data_decompressed: vec![0; 1024],
            mem_recorder: None,
        };
        let mem = Arc::new(AtomicUsize::default());
        assert_eq!(mem.load(Ordering::Relaxed), 0);
        obj.set_mem_recorder(mem.clone());
        obj.record_mem_size();
        // 1024 data bytes plus the fixed per-object overhead seen by `mem_size`.
        assert_eq!(mem.load(Ordering::Relaxed), 1120);
        drop(obj);
        assert_eq!(mem.load(Ordering::Relaxed), 0);
    }

    /// Wrapping an object in `ArcWrapper` must not change its reported heap size.
    #[test]
    fn test_cache_object_with_same_size() {
        let a = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, SHA1::default()),
            offset: 0,
            data_decompressed: vec![0; 1024],
            mem_recorder: None,
        };
        assert_eq!(a.heap_size(), 1024);

        let b = ArcWrapper::new(Arc::new(a.clone()), Arc::new(AtomicBool::new(false)), None);
        assert_eq!(b.heap_size(), 1024);
    }

    /// Exercises LRU insertion, refusal to eject (`try_insert`) and forced
    /// ejection (`insert`) with `CacheObject` values.
    #[test]
    #[ignore]
    fn test_cache_object_with_lru() {
        let mut cache = LruCache::new(2048);

        let hash_a = SHA1::default();
        let hash_b = SHA1::new(b"b");
        let a = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash_a),
            offset: 0,
            data_decompressed: vec![0; 1024],
            mem_recorder: None,
        };
        println!("a.heap_size() = {}", a.heap_size());

        let b = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, hash_b),
            offset: 0,
            data_decompressed: vec![0; (1024.0 * 1.5) as usize],
            mem_recorder: None,
        };
        {
            let r = cache.insert(
                hash_a.to_string(),
                ArcWrapper::new(Arc::new(a.clone()), Arc::new(AtomicBool::new(true)), None),
            );
            assert!(r.is_ok())
        }
        {
            // Inserting `b` would eject `a`; `try_insert` must refuse.
            let r = cache.try_insert(
                hash_b.to_string(),
                ArcWrapper::new(Arc::new(b.clone()), Arc::new(AtomicBool::new(true)), None),
            );
            assert!(r.is_err());
            if let Err(lru_mem::TryInsertError::WouldEjectLru { .. }) = r {
                // expected
            } else {
                panic!("Expected WouldEjectLru error");
            }
            // A plain `insert` is allowed to eject.
            let r = cache.insert(
                hash_b.to_string(),
                ArcWrapper::new(Arc::new(b.clone()), Arc::new(AtomicBool::new(true)), None),
            );
            assert!(r.is_ok());
        }
        {
            let r = cache.get(&hash_a.to_string());
            assert!(r.is_some());
        }
    }

    /// Minimal serializable type with an observable drop and a fake heap size.
    #[derive(Serialize, Deserialize)]
    struct Test {
        a: usize,
    }
    impl Drop for Test {
        fn drop(&mut self) {
            println!("drop Test");
        }
    }
    impl HeapSize for Test {
        fn heap_size(&self) -> usize {
            self.a
        }
    }

    /// Ejected entries are dropped, while handles returned by `get` keep
    /// their value alive past ejection.
    #[test]
    fn test_lru_drop() {
        println!("insert a");
        let cache = LruCache::new(2048);
        let cache = Arc::new(Mutex::new(cache));
        {
            let mut c = cache.as_ref().lock().unwrap();
            let _ = c.insert(
                "a",
                ArcWrapper::new(
                    Arc::new(Test { a: 1024 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        println!("insert b, a should be ejected");
        {
            let mut c = cache.as_ref().lock().unwrap();
            let _ = c.insert(
                "b",
                ArcWrapper::new(
                    Arc::new(Test { a: 1200 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        let b = {
            let mut c = cache.as_ref().lock().unwrap();
            c.get("b").cloned()
        };
        println!("insert c, b should not be ejected");
        {
            let mut c = cache.as_ref().lock().unwrap();
            let _ = c.insert(
                "c",
                ArcWrapper::new(
                    Arc::new(Test { a: 1200 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        println!("user b: {}", b.as_ref().unwrap().a);
        println!("test over, enject all");
    }

    /// Round-trips a `CacheObject` through bincode.
    #[test]
    fn test_cache_object_serialize() {
        let a = CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, SHA1::default()),
            offset: 0,
            data_decompressed: vec![0; 1024],
            mem_recorder: None,
        };
        let s = bincode::serde::encode_to_vec(&a, bincode::config::standard()).unwrap();
        let b: CacheObject = bincode::serde::decode_from_slice(&s, bincode::config::standard())
            .unwrap()
            .0;
        assert_eq!(a.info, b.info);
        assert_eq!(a.data_decompressed, b.data_decompressed);
        assert_eq!(a.offset, b.offset);
    }

    /// Dropping an un-completed wrapper with a store path must write the file.
    #[test]
    fn test_arc_wrapper_drop_store() {
        let mut path = PathBuf::from(".cache_temp/test_arc_wrapper_drop_store");
        fs::create_dir_all(&path).unwrap();
        path.push("test_obj");
        let mut a = ArcWrapper::new(Arc::new(1024), Arc::new(AtomicBool::new(false)), None);
        a.set_store_path(path.clone());
        drop(a);

        assert!(path.exists());
        path.pop();
        fs::remove_dir_all(path).unwrap();
    }

    /// Entries are spilled to disk only when actually ejected from the LRU
    /// (and only while the completion flag is still false).
    #[test]
    fn test_arc_wrapper_with_lru() {
        let mut cache = LruCache::new(1500);
        let path = PathBuf::from(".cache_temp/test_arc_wrapper_with_lru");
        let _ = fs::remove_dir_all(&path);
        fs::create_dir_all(&path).unwrap();
        let shared_flag = Arc::new(AtomicBool::new(false));

        let a_path = path.join("a");
        {
            let mut a = ArcWrapper::new(Arc::new(Test { a: 1024 }), shared_flag.clone(), None);
            a.set_store_path(a_path.clone());
            let b = ArcWrapper::new(Arc::new(1024), shared_flag.clone(), None);
            assert!(b.store_path.is_none());

            println!("insert a with heap size: {:?}", a.heap_size());
            let rt = cache.insert("a", a);
            if let Err(e) = rt {
                panic!("insert a failed: {:?}", e.to_string());
            }
            println!("after insert a, cache used = {}", cache.current_size());
        }
        // `a` is still resident, so nothing has been spilled yet.
        assert!(!a_path.exists());

        let b_path = path.join("b");
        {
            let mut b = ArcWrapper::new(Arc::new(Test { a: 996 }), shared_flag.clone(), None);
            b.set_store_path(b_path.clone());
            let rt = cache.insert("b", b);
            if let Err(e) = rt {
                // Fixed copy-paste: this failure is about inserting `b`.
                panic!("insert b failed: {:?}", e.to_string());
            }
            println!("after insert b, cache used = {}", cache.current_size());
        }
        // Inserting `b` ejected `a`, which must now be on disk; `b` itself is
        // still cached and must not have been written.
        assert!(a_path.exists());
        assert!(!b_path.exists());
        shared_flag.store(true, Ordering::Release);
        fs::remove_dir_all(path).unwrap();
    }
}