1use std::{
5 fs,
6 fs::OpenOptions,
7 io,
8 io::Write,
9 ops::Deref,
10 path::{Path, PathBuf},
11 sync::{
12 Arc,
13 atomic::{AtomicBool, AtomicUsize, Ordering},
14 },
15};
16
17use lru_mem::{HeapSize, MemSize};
18use serde::{Deserialize, Serialize};
19use threadpool::ThreadPool;
20
21use crate::{
22 hash::ObjectHash,
23 internal::{
24 metadata::{EntryMeta, MetaAttached},
25 object::types::ObjectType,
26 pack::{entry::Entry, utils},
27 },
28};
29
/// Load/store a serializable value as a binary file on disk.
///
/// Blanket-implemented below for every `T: Serialize + Deserialize`,
/// using `bincode` (standard config) as the on-disk encoding.
pub trait FileLoadStore: Serialize + for<'a> Deserialize<'a> {
    /// Reads and decodes a value of `Self` from the file at `path`.
    fn f_load(path: &Path) -> Result<Self, io::Error>;
    /// Encodes `self` and writes it to `path`; a no-op if `path` already
    /// exists (see the blanket impl).
    fn f_save(&self, path: &Path) -> Result<(), io::Error>;
}
37
38impl<T: Serialize + for<'a> Deserialize<'a>> FileLoadStore for T {
40 fn f_load(path: &Path) -> Result<T, io::Error> {
42 let data = fs::read(path)?;
43 let obj: T = bincode::serde::decode_from_slice(&data, bincode::config::standard())
44 .map_err(io::Error::other)?
45 .0;
46 Ok(obj)
47 }
48 fn f_save(&self, path: &Path) -> Result<(), io::Error> {
49 if path.exists() {
50 return Ok(());
51 }
52 let data = bincode::serde::encode_to_vec(self, bincode::config::standard()).unwrap();
53 let path = path.with_extension("temp");
54 {
55 let mut file = OpenOptions::new()
56 .write(true)
57 .create_new(true)
58 .open(path.clone())?;
59 file.write_all(&data)?;
60 }
61 let final_path = path.with_extension("");
62 fs::rename(&path, final_path.clone())?;
63 Ok(())
64 }
65}
66
/// What a cached pack object is: either a fully resolved base object, or a
/// delta that still references its base by pack offset or by object hash.
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub(crate) enum CacheObjectInfo {
    /// A non-delta object: its type plus the hash of its content.
    BaseObject(ObjectType, ObjectHash),
    /// Delta whose base is at the given pack offset; the second field is the
    /// final size of the object after delta application (see `heap_size`).
    OffsetDelta(usize, usize),
    /// Like `OffsetDelta`, but the delta payload is zstd-encoded
    /// (NOTE(review): inferred from the variant name — confirm).
    OffsetZstdelta(usize, usize),
    /// Delta whose base is referenced by hash; second field is the final
    /// size after delta application.
    HashDelta(ObjectHash, usize),
}
83
84impl CacheObjectInfo {
85 pub(crate) fn object_type(&self) -> ObjectType {
87 match self {
88 CacheObjectInfo::BaseObject(obj_type, _) => *obj_type,
89 CacheObjectInfo::OffsetDelta(_, _) => ObjectType::OffsetDelta,
90 CacheObjectInfo::OffsetZstdelta(_, _) => ObjectType::OffsetZstdelta,
91 CacheObjectInfo::HashDelta(_, _) => ObjectType::HashDelta,
92 }
93 }
94}
95
96#[derive(Debug, Serialize, Deserialize)]
98pub struct CacheObject {
99 pub(crate) info: CacheObjectInfo,
100 pub offset: usize,
101 pub crc32: u32,
102 pub data_decompressed: Vec<u8>,
103 pub mem_recorder: Option<Arc<AtomicUsize>>, pub is_delta_in_pack: bool,
105}
106
impl Clone for CacheObject {
    /// Clones the object and immediately registers the clone's memory usage
    /// with the shared recorder (if one is attached).
    ///
    /// This mirrors the `fetch_sub` performed in `Drop`, keeping the shared
    /// counter balanced: every live instance — original or clone — is
    /// counted exactly once.
    fn clone(&self) -> Self {
        let obj = CacheObject {
            info: self.info.clone(),
            offset: self.offset,
            crc32: self.crc32,
            data_decompressed: self.data_decompressed.clone(),
            mem_recorder: self.mem_recorder.clone(),
            is_delta_in_pack: self.is_delta_in_pack,
        };
        // A clone owns its own copy of the data, so it is accounted for
        // separately; no-op when `mem_recorder` is `None`.
        obj.record_mem_size();
        obj
    }
}
121
impl HeapSize for CacheObject {
    /// Heap footprint reported to the LRU cache for eviction accounting.
    ///
    /// Base objects count only their decompressed payload. Delta objects
    /// additionally add the delta's final (post-application) size —
    /// presumably to budget up front for the reconstructed object; confirm
    /// against the cache sizing policy.
    fn heap_size(&self) -> usize {
        match &self.info {
            CacheObjectInfo::BaseObject(_, _) => self.data_decompressed.heap_size(),
            CacheObjectInfo::OffsetDelta(_, delta_final_size)
            | CacheObjectInfo::OffsetZstdelta(_, delta_final_size)
            | CacheObjectInfo::HashDelta(_, delta_final_size) => {
                self.data_decompressed.heap_size() + delta_final_size
            }
        }
    }
}
156
157impl Drop for CacheObject {
158 fn drop(&mut self) {
161 if let Some(mem_recorder) = &self.mem_recorder {
163 mem_recorder.fetch_sub((*self).mem_size(), Ordering::Release);
164 }
165 }
166}
167
168pub trait MemSizeRecorder: MemSize {
175 fn record_mem_size(&self);
176 fn set_mem_recorder(&mut self, mem_size: Arc<AtomicUsize>);
177 }
179
180impl MemSizeRecorder for CacheObject {
181 fn record_mem_size(&self) {
184 if let Some(mem_recorder) = &self.mem_recorder {
185 mem_recorder.fetch_add(self.mem_size(), Ordering::Release);
186 }
187 }
188
189 fn set_mem_recorder(&mut self, mem_recorder: Arc<AtomicUsize>) {
190 self.mem_recorder = Some(mem_recorder);
191 }
192
193 }
197
198impl CacheObject {
199 pub fn new_for_undeltified(
201 obj_type: ObjectType,
202 data: Vec<u8>,
203 offset: usize,
204 crc32: u32,
205 ) -> Self {
206 let hash = utils::calculate_object_hash(obj_type, &data);
207 CacheObject {
208 info: CacheObjectInfo::BaseObject(obj_type, hash),
209 offset,
210 crc32,
211 data_decompressed: data,
212 mem_recorder: None,
213 is_delta_in_pack: false,
214 }
215 }
216
217 pub fn object_type(&self) -> ObjectType {
219 self.info.object_type()
220 }
221
222 pub fn base_object_hash(&self) -> Option<ObjectHash> {
226 match &self.info {
227 CacheObjectInfo::BaseObject(_, hash) => Some(*hash),
228 _ => None,
229 }
230 }
231
232 pub fn offset_delta(&self) -> Option<usize> {
236 match &self.info {
237 CacheObjectInfo::OffsetDelta(offset, _) => Some(*offset),
238 _ => None,
239 }
240 }
241
242 pub fn hash_delta(&self) -> Option<ObjectHash> {
246 match &self.info {
247 CacheObjectInfo::HashDelta(hash, _) => Some(*hash),
248 _ => None,
249 }
250 }
251
252 pub fn to_entry(&self) -> Entry {
254 match self.info {
255 CacheObjectInfo::BaseObject(obj_type, hash) => Entry {
256 obj_type,
257 data: self.data_decompressed.clone(),
258 hash,
259 chain_len: 0,
260 },
261 _ => {
262 unreachable!("delta object should not persist!")
263 }
264 }
265 }
266
267 pub fn to_entry_metadata(&self) -> MetaAttached<Entry, EntryMeta> {
269 match self.info {
270 CacheObjectInfo::BaseObject(obj_type, hash) => {
271 let entry = Entry {
272 obj_type,
273 data: self.data_decompressed.clone(),
274 hash,
275 chain_len: 0,
276 };
277 let meta = EntryMeta {
278 pack_offset: Some(self.offset),
280 crc32: Some(self.crc32),
281 is_delta: Some(self.is_delta_in_pack),
282 ..Default::default()
283 };
284 MetaAttached { inner: entry, meta }
285 }
286
287 _ => {
288 unreachable!("delta object should not persist!")
289 }
290 }
291 }
292}
293
/// Trait alias for the bounds an `ArcWrapper` payload must satisfy:
/// measurable for the LRU cache, (de)serializable so it can spill to disk,
/// and shareable across threads.
pub trait ArcWrapperBounds:
    HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static
{
}
/// Blanket impl: any type meeting the bounds qualifies automatically.
impl<T: HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static> ArcWrapperBounds
    for T
{
}
305
306pub struct ArcWrapper<T: ArcWrapperBounds> {
310 pub data: Arc<T>,
311 complete_signal: Arc<AtomicBool>,
312 pool: Option<Arc<ThreadPool>>,
313 pub store_path: Option<PathBuf>, }
315impl<T: ArcWrapperBounds> ArcWrapper<T> {
316 pub fn new(data: Arc<T>, share_flag: Arc<AtomicBool>, pool: Option<Arc<ThreadPool>>) -> Self {
318 ArcWrapper {
319 data,
320 complete_signal: share_flag,
321 pool,
322 store_path: None,
323 }
324 }
325 pub fn set_store_path(&mut self, path: PathBuf) {
327 self.store_path = Some(path);
328 }
329}
330
impl<T: ArcWrapperBounds> HeapSize for ArcWrapper<T> {
    /// Delegates to the payload; the wrapper itself adds no tracked heap.
    fn heap_size(&self) -> usize {
        self.data.heap_size()
    }
}
336
impl<T: ArcWrapperBounds> Clone for ArcWrapper<T> {
    /// Clones share `data`, the completion flag and the pool, but reset
    /// `store_path` to `None` — NOTE(review): presumably so that only the
    /// wrapper explicitly given a path persists the value on drop, avoiding
    /// duplicate saves; confirm against the cache eviction flow.
    fn clone(&self) -> Self {
        ArcWrapper {
            data: self.data.clone(),
            complete_signal: self.complete_signal.clone(),
            pool: self.pool.clone(),
            store_path: None,
        }
    }
}
348
impl<T: ArcWrapperBounds> Deref for ArcWrapper<T> {
    type Target = Arc<T>;
    /// Transparent access to the wrapped `Arc<T>`.
    fn deref(&self) -> &Self::Target {
        &self.data
    }
}
355impl<T: ArcWrapperBounds> Drop for ArcWrapper<T> {
356 fn drop(&mut self) {
359 if !self.complete_signal.load(Ordering::Acquire)
360 && let Some(path) = &self.store_path
361 {
362 match &self.pool {
363 Some(pool) => {
364 let data_copy = self.data.clone();
365 let path_copy = path.clone();
366 let complete_signal = self.complete_signal.clone();
367 while pool.queued_count() > 2000 {
370 std::thread::yield_now();
371 }
372 pool.execute(move || {
373 if !complete_signal.load(Ordering::Acquire) {
374 let res = data_copy.f_save(&path_copy);
375 if let Err(e) = res {
376 println!("[f_save] {path_copy:?} error: {e:?}");
377 }
378 }
379 });
380 }
381 None => {
382 let res = self.data.f_save(path);
383 if let Err(e) = res {
384 println!("[f_save] {path:?} error: {e:?}");
385 }
386 }
387 }
388 }
389 }
390}
#[cfg(test)]
mod test {
    use std::{fs, sync::Mutex};

    use lru_mem::LruCache;

    use super::*;
    use crate::hash::{HashKind, set_hash_kind_for_test};

    /// Builds a blob `CacheObject` backed by `size` zero bytes, with no
    /// memory recorder attached.
    fn make_obj(size: usize) -> CacheObject {
        CacheObject {
            info: CacheObjectInfo::BaseObject(ObjectType::Blob, ObjectHash::default()),
            offset: 0,
            crc32: 0,
            data_decompressed: vec![0; size],
            mem_recorder: None,
            is_delta_in_pack: false,
        }
    }

    /// The shared counter must rise by `mem_size()` when recorded and fall
    /// back to zero when the object is dropped.
    #[test]
    fn test_heap_size_record() {
        for (kind, size) in [(HashKind::Sha1, 1024usize), (HashKind::Sha256, 2048usize)] {
            let _guard = set_hash_kind_for_test(kind);
            let mut obj = make_obj(size);
            let mem = Arc::new(AtomicUsize::default());
            assert_eq!(mem.load(Ordering::Relaxed), 0);
            obj.set_mem_recorder(mem.clone());
            obj.record_mem_size();
            assert_eq!(mem.load(Ordering::Relaxed), obj.mem_size());
            drop(obj);
            assert_eq!(mem.load(Ordering::Relaxed), 0);
        }
    }

    /// A wrapper must report the same heap size as its payload.
    #[test]
    fn test_cache_object_with_same_size() {
        for (kind, size) in [(HashKind::Sha1, 1024usize), (HashKind::Sha256, 2048usize)] {
            let _guard = set_hash_kind_for_test(kind);
            let a = make_obj(size);
            assert_eq!(a.heap_size(), size);

            let b = ArcWrapper::new(Arc::new(a.clone()), Arc::new(AtomicBool::new(false)), None);
            assert_eq!(b.heap_size(), size);
        }
    }

    /// Inserting past capacity must eject the LRU entry; `try_insert` must
    /// refuse instead of ejecting.
    #[test]
    fn test_cache_object_with_lru() {
        for (kind, cap, size_a, size_b) in [
            (
                HashKind::Sha1,
                2048usize,
                1024usize,
                (1024.0 * 1.5) as usize,
            ),
            (
                HashKind::Sha256,
                4096usize,
                2048usize,
                (2048.0 * 1.5) as usize,
            ),
        ] {
            let _guard = set_hash_kind_for_test(kind);
            let mut cache = LruCache::new(cap);

            let hash_a = ObjectHash::default();
            let hash_b = ObjectHash::new(b"b");
            let a = make_obj(size_a);
            let b = make_obj(size_b);

            {
                let r = cache.insert(
                    hash_a.to_string(),
                    ArcWrapper::new(Arc::new(a.clone()), Arc::new(AtomicBool::new(true)), None),
                );
                assert!(r.is_ok());
            }

            {
                // `try_insert` refuses because inserting `b` would eject `a`.
                let r = cache.try_insert(
                    hash_b.to_string(),
                    ArcWrapper::new(Arc::new(b.clone()), Arc::new(AtomicBool::new(true)), None),
                );
                assert!(
                    matches!(r, Err(lru_mem::TryInsertError::WouldEjectLru { .. })),
                    "Expected WouldEjectLru error"
                );

                // A plain `insert` succeeds by ejecting `a`.
                let r = cache.insert(
                    hash_b.to_string(),
                    ArcWrapper::new(Arc::new(b.clone()), Arc::new(AtomicBool::new(true)), None),
                );
                assert!(r.is_ok());
            }

            // `a` was ejected by the insert above.
            {
                let r = cache.get(&hash_a.to_string());
                assert!(r.is_none());
            }
        }
    }

    /// Payload type with a controllable heap size and a drop-side println
    /// so eviction order is visible in test output.
    #[derive(Serialize, Deserialize)]
    struct Test {
        a: usize,
    }
    impl Drop for Test {
        fn drop(&mut self) {
            println!("drop Test");
        }
    }
    impl HeapSize for Test {
        fn heap_size(&self) -> usize {
            self.a
        }
    }

    /// Entries cloned out of the cache must survive ejection of the cached
    /// slot (the `Arc` keeps the payload alive).
    #[test]
    fn test_lru_drop() {
        println!("insert a");
        let cache = LruCache::new(2048);
        let cache = Arc::new(Mutex::new(cache));
        {
            let mut c = cache.as_ref().lock().unwrap();
            let _ = c.insert(
                "a",
                ArcWrapper::new(
                    Arc::new(Test { a: 1024 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        println!("insert b, a should be ejected");
        {
            let mut c = cache.as_ref().lock().unwrap();
            let _ = c.insert(
                "b",
                ArcWrapper::new(
                    Arc::new(Test { a: 1200 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        let b = {
            let mut c = cache.as_ref().lock().unwrap();
            c.get("b").cloned()
        };
        println!("insert c, b should not be ejected");
        {
            let mut c = cache.as_ref().lock().unwrap();
            let _ = c.insert(
                "c",
                ArcWrapper::new(
                    Arc::new(Test { a: 1200 }),
                    Arc::new(AtomicBool::new(true)),
                    None,
                ),
            );
        }
        println!("user b: {}", b.as_ref().unwrap().a);
        println!("test over, eject all");
    }

    /// A `CacheObject` must round-trip through bincode unchanged.
    #[test]
    fn test_cache_object_serialize() {
        for (kind, size) in [(HashKind::Sha1, 1024usize), (HashKind::Sha256, 2048usize)] {
            let _guard = set_hash_kind_for_test(kind);
            let a = make_obj(size);
            let s = bincode::serde::encode_to_vec(&a, bincode::config::standard()).unwrap();
            let b: CacheObject = bincode::serde::decode_from_slice(&s, bincode::config::standard())
                .unwrap()
                .0;
            assert_eq!(a.info, b.info);
            assert_eq!(a.data_decompressed, b.data_decompressed);
            assert_eq!(a.offset, b.offset);
        }
    }

    /// Dropping an incomplete wrapper with a store path must write the file.
    #[test]
    fn test_arc_wrapper_drop_store() {
        let mut path = PathBuf::from(".cache_temp/test_arc_wrapper_drop_store");
        fs::create_dir_all(&path).unwrap();
        path.push("test_obj");
        let mut a = ArcWrapper::new(Arc::new(1024), Arc::new(AtomicBool::new(false)), None);
        a.set_store_path(path.clone());
        drop(a);

        assert!(path.exists());
        path.pop();
        fs::remove_dir_all(&path).unwrap();
        let _ = fs::remove_dir(".cache_temp");
    }

    /// Eviction from the cache must trigger the on-drop save; entries still
    /// in the cache must not be saved, and a raised completion flag must
    /// suppress saves entirely.
    #[test]
    fn test_arc_wrapper_with_lru() {
        let mut cache = LruCache::new(1500);
        let path = PathBuf::from(".cache_temp/test_arc_wrapper_with_lru");
        let _ = fs::remove_dir_all(&path);
        fs::create_dir_all(&path).unwrap();
        let shared_flag = Arc::new(AtomicBool::new(false));

        let a_path = path.join("a");
        {
            let mut a = ArcWrapper::new(Arc::new(Test { a: 1024 }), shared_flag.clone(), None);
            a.set_store_path(a_path.clone());
            let b = ArcWrapper::new(Arc::new(1024), shared_flag.clone(), None);
            assert!(b.store_path.is_none());

            println!("insert a with heap size: {:?}", a.heap_size());
            let rt = cache.insert("a", a);
            if let Err(e) = rt {
                panic!("insert a failed: {e}");
            }
            println!("after insert a, cache used = {}", cache.current_size());
        }
        assert!(!a_path.exists());

        let b_path = path.join("b");
        {
            let mut b = ArcWrapper::new(Arc::new(Test { a: 996 }), shared_flag.clone(), None);
            b.set_store_path(b_path.clone());
            let rt = cache.insert("b", b);
            if let Err(e) = rt {
                panic!("insert b failed: {e}");
            }
            println!("after insert b, cache used = {}", cache.current_size());
        }
        // Inserting `b` ejected `a`, so `a` was saved; `b` is still cached.
        assert!(a_path.exists());
        assert!(!b_path.exists());
        shared_flag.store(true, Ordering::Release);
        fs::remove_dir_all(path).unwrap();
        let _ = fs::remove_dir(".cache_temp");
    }
}