use std::{
    io::{self, BufRead, Cursor, ErrorKind, Read},
    path::PathBuf,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    thread::{self, JoinHandle},
    time::Instant,
};

use axum::Error;
use bytes::Bytes;
use flate2::bufread::ZlibDecoder;
use futures_util::{Stream, StreamExt};
use threadpool::ThreadPool;
use tokio::sync::mpsc::UnboundedSender;
use uuid::Uuid;

use super::cache_object::CacheObjectInfo;
use crate::{
    errors::GitError,
    hash::{ObjectHash, get_hash_kind, set_hash_kind},
    internal::{
        metadata::{EntryMeta, MetaAttached},
        object::types::ObjectType,
        pack::{
            DEFAULT_TMP_DIR, Pack,
            cache::{_Cache, Caches},
            cache_object::{CacheObject, MemSizeRecorder},
            channel_reader::StreamBufReader,
            entry::Entry,
            utils,
            waitlist::Waitlist,
            wrapper::Wrapper,
        },
    },
    utils::CountingReader,
    zstdelta,
};

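/// A reader adapter that counts the bytes it yields and feeds them into a
/// CRC-32 hasher, so a pack entry's checksum and length can be computed as a
/// side effect of decoding it.
///
/// Both the `Read` and the `BufRead` paths update the hasher, so a caller must
/// not read the same bytes through both interfaces, or they would be hashed
/// twice.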
struct CrcCountingReader<'a, R> {
    inner: R,
    bytes_read: u64,
    crc: &'a mut crc32fast::Hasher,
}

impl<R: Read> Read for CrcCountingReader<'_, R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        self.bytes_read += n as u64;
        self.crc.update(&buf[..n]);
        Ok(n)
    }
}

impl<R: BufRead> BufRead for CrcCountingReader<'_, R> {
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        self.inner.fill_buf()
    }

    fn consume(&mut self, amt: usize) {
        // Hash the bytes being consumed before advancing the inner reader;
        // `fill_buf` still returns the unconsumed buffer at this point.
        let buf = self.inner.fill_buf().unwrap_or(&[]);
        self.crc.update(&buf[..amt.min(buf.len())]);
        self.bytes_read += amt as u64;
        self.inner.consume(amt);
    }
}

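/// State shared by the decoding workers: the thread pool (so a worker can
/// schedule follow-up work), the waitlist of deltas whose base has not been
/// decoded yet, the object caches, the decoded-object memory counter, and the
/// per-entry user callback.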
struct SharedParams {
    pub pool: Arc<ThreadPool>,
    pub waitlist: Arc<Waitlist>,
    pub caches: Arc<Caches>,
    pub cache_objs_mem_size: Arc<AtomicUsize>,
    pub callback: Arc<dyn Fn(MetaAttached<Entry, EntryMeta>) + Sync + Send>,
}

impl Drop for Pack {
    fn drop(&mut self) {
        if self.clean_tmp {
            self.caches.remove_tmp_dir();
        }
    }
}

impl Pack {
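    /// Creates a `Pack` for decoding.
    ///
    /// - `thread_num`: worker threads in the pool; defaults to the CPU count.
    /// - `mem_limit`: soft memory cap in bytes; 4/5 of it is handed to the
    ///   object caches. `None` means no limit.
    /// - `temp_path`: where cache files are kept; a random 8-character
    ///   subdirectory name is chosen under it (defaults to `DEFAULT_TMP_DIR`).
    /// - `clean_tmp`: remove that directory when the `Pack` is dropped.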
    pub fn new(
        thread_num: Option<usize>,
        mem_limit: Option<usize>,
        temp_path: Option<PathBuf>,
        clean_tmp: bool,
    ) -> Self {
        let mut temp_path = temp_path.unwrap_or_else(|| PathBuf::from(DEFAULT_TMP_DIR));
        // Pick a random subdirectory name that does not exist yet.
        loop {
            let sub_dir = Uuid::new_v4().to_string()[..8].to_string();
            temp_path.push(sub_dir);
            if !temp_path.exists() {
                break;
            }
            temp_path.pop();
        }
        let thread_num = thread_num.unwrap_or_else(num_cpus::get);
        let cache_mem_size = mem_limit.map(|mem_limit| mem_limit * 4 / 5);
        Pack {
            number: 0,
            signature: ObjectHash::default(),
            objects: Vec::new(),
            pool: Arc::new(ThreadPool::new(thread_num)),
            waitlist: Arc::new(Waitlist::new()),
            caches: Arc::new(Caches::new(cache_mem_size, temp_path, thread_num)),
            mem_limit,
            cache_objs_mem: Arc::new(AtomicUsize::default()),
            clean_tmp,
        }
    }

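    /// Reads and validates the 12-byte pack header: the `PACK` magic, a
    /// 4-byte big-endian version (must be 2), and the 4-byte big-endian
    /// object count.
    ///
    /// Returns the object count along with the raw header bytes.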
    pub fn check_header(pack: &mut impl BufRead) -> Result<(u32, Vec<u8>), GitError> {
        // The header is 12 bytes: magic, version, object count.
        let mut header_data = Vec::with_capacity(12);

        let mut magic = [0; 4];
        pack.read_exact(&mut magic).map_err(|e| {
            GitError::InvalidPackFile(format!("Error reading magic identifier: {e}"))
        })?;
        header_data.extend_from_slice(&magic);
        if magic != *b"PACK" {
            return Err(GitError::InvalidPackHeader(format!(
                "{},{},{},{}",
                magic[0], magic[1], magic[2], magic[3]
            )));
        }

        let mut version_bytes = [0; 4];
        pack.read_exact(&mut version_bytes)
            .map_err(|e| GitError::InvalidPackFile(format!("Error reading version number: {e}")))?;
        header_data.extend_from_slice(&version_bytes);
        let version = u32::from_be_bytes(version_bytes);
        if version != 2 {
            return Err(GitError::InvalidPackFile(format!(
                "Version Number is {version}, not 2"
            )));
        }

        let mut object_num_bytes = [0; 4];
        pack.read_exact(&mut object_num_bytes)
            .map_err(|e| GitError::InvalidPackFile(format!("Error reading object number: {e}")))?;
        header_data.extend_from_slice(&object_num_bytes);
        let object_num = u32::from_be_bytes(object_num_bytes);

        Ok((object_num, header_data))
    }

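    /// Zlib-decompresses one object's data from the pack stream.
    ///
    /// Returns the decompressed bytes and the number of *compressed* input
    /// bytes consumed, which the caller uses to advance its pack offset.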
    pub fn decompress_data(
        pack: &mut (impl BufRead + Send),
        expected_size: usize,
    ) -> Result<(Vec<u8>, usize), GitError> {
        let mut buf = Vec::with_capacity(expected_size);

        let mut counting_reader = CountingReader::new(pack);
        let mut deflate = ZlibDecoder::new(&mut counting_reader);
        match deflate.read_to_end(&mut buf) {
            Ok(_) => {
                if buf.len() != expected_size {
                    Err(GitError::InvalidPackFile(format!(
                        "The object size {} does not match the expected size {}",
                        buf.len(),
                        expected_size
                    )))
                } else {
                    let actual_input_bytes = counting_reader.bytes_read as usize;
                    Ok((buf, actual_input_bytes))
                }
            }
            Err(e) => Err(GitError::InvalidPackFile(format!(
                "Decompression error: {e}"
            ))),
        }
    }

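    /// Decodes a single pack entry starting at `*offset`: a varint encoding
    /// the type and decompressed size, then a type-specific payload. Offset
    /// deltas store the backwards distance to their base, hash deltas the base
    /// object's hash. The CRC-32 covers the raw entry bytes, and `offset` is
    /// advanced past everything consumed.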
    pub fn decode_pack_object(
        pack: &mut (impl BufRead + Send),
        offset: &mut usize,
    ) -> Result<CacheObject, GitError> {
        let init_offset = *offset;
        // Every byte of the entry flows through this wrapper, so the CRC-32
        // covers the raw entry exactly as stored in the pack.
        let mut hasher = crc32fast::Hasher::new();
        let mut reader = CrcCountingReader {
            inner: pack,
            bytes_read: 0,
            crc: &mut hasher,
        };

        let (type_bits, size) = match utils::read_type_and_varint_size(&mut reader, offset) {
            Ok(result) => result,
            Err(e) => {
                return Err(GitError::InvalidPackFile(format!("Read error: {e}")));
            }
        };

        let t = ObjectType::from_u8(type_bits)?;

        match t {
            ObjectType::Commit | ObjectType::Tree | ObjectType::Blob | ObjectType::Tag => {
                let (data, raw_size) = Pack::decompress_data(&mut reader, size)?;
                *offset += raw_size;
                let crc32 = hasher.finalize();
                Ok(CacheObject::new_for_undeltified(t, data, init_offset, crc32))
            }
            ObjectType::OffsetDelta | ObjectType::OffsetZstdelta => {
                let (delta_offset, bytes) = utils::read_offset_encoding(&mut reader).unwrap();
                *offset += bytes;

                let (data, raw_size) = Pack::decompress_data(&mut reader, size)?;
                *offset += raw_size;

                // The base lives earlier in the pack, at `init_offset - delta_offset`.
                let base_offset = init_offset
                    .checked_sub(delta_offset as usize)
                    .ok_or_else(|| {
                        GitError::InvalidObjectInfo("Invalid OffsetDelta offset".to_string())
                    })
                    .unwrap();

                let mut reader = Cursor::new(&data);
                let (_, final_size) = utils::read_delta_object_size(&mut reader)?;

                let obj_info = match t {
                    ObjectType::OffsetDelta => {
                        CacheObjectInfo::OffsetDelta(base_offset, final_size)
                    }
                    ObjectType::OffsetZstdelta => {
                        CacheObjectInfo::OffsetZstdelta(base_offset, final_size)
                    }
                    _ => unreachable!(),
                };
                let crc32 = hasher.finalize();
                Ok(CacheObject {
                    info: obj_info,
                    offset: init_offset,
                    crc32,
                    data_decompressed: data,
                    mem_recorder: None,
                    is_delta_in_pack: true,
                })
            }
            ObjectType::HashDelta => {
                let ref_sha = ObjectHash::from_stream(&mut reader).unwrap();
                *offset += get_hash_kind().size();

                let (data, raw_size) = Pack::decompress_data(&mut reader, size)?;
                *offset += raw_size;

                let mut reader = Cursor::new(&data);
                let (_, final_size) = utils::read_delta_object_size(&mut reader)?;

                let crc32 = hasher.finalize();

                Ok(CacheObject {
                    info: CacheObjectInfo::HashDelta(ref_sha, final_size),
                    offset: init_offset,
                    crc32,
                    data_decompressed: data,
                    mem_recorder: None,
                    is_delta_in_pack: true,
                })
            }
        }
    }

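    /// Decodes an entire pack: validates the header, then decodes entries
    /// sequentially and hands each one to the thread pool. Base objects are
    /// cached and wake any deltas waiting on them; deltas whose base is not
    /// available yet are parked on the waitlist. The trailer checksum is
    /// verified at the end, and `callback` fires once per decoded object.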
    pub fn decode<F, C>(
        &mut self,
        pack: &mut (impl BufRead + Send),
        callback: F,
        pack_id_callback: Option<C>,
    ) -> Result<(), GitError>
    where
        F: Fn(MetaAttached<Entry, EntryMeta>) + Sync + Send + 'static,
        C: FnOnce(ObjectHash) + Send + 'static,
    {
        let time = Instant::now();
        let mut last_update_time = time.elapsed().as_millis();
        let log_info = |i: usize, pack: &Pack| {
            tracing::info!(
                "time {:.2} s \t decode: {:?} \t dec-num: {} \t cah-num: {} \t Objs: {} MB \t CacheUsed: {} MB",
                time.elapsed().as_millis() as f64 / 1000.0,
                i,
                pack.pool.queued_count(),
                pack.caches.queued_tasks(),
                pack.cache_objs_mem_used() / 1024 / 1024,
                pack.caches.memory_used() / 1024 / 1024
            );
        };
        let callback = Arc::new(callback);

        let caches = self.caches.clone();
        // `Wrapper` hashes every byte it reads, so the trailer checksum can be
        // verified once the whole pack body has been consumed.
        let mut reader = Wrapper::new(io::BufReader::new(pack));

        let (object_num, _) = Pack::check_header(&mut reader)?;
        self.number = object_num as usize;
        tracing::info!("The pack file has {} objects", self.number);

        let mut offset: usize = 12;
        let mut i = 0;
        while i < self.number {
            // Throttle progress logging to at most once per second.
            if i % 1000 == 0 {
                let time_now = time.elapsed().as_millis();
                if time_now - last_update_time > 1000 {
                    log_info(i, self);
                    last_update_time = time_now;
                }
            }
            // Backpressure: stall the producer while the pool backlog is deep
            // or the memory limit is exceeded.
            while self.pool.queued_count() > 2000
                || self
                    .mem_limit
                    .map(|limit| self.memory_used() > limit)
                    .unwrap_or(false)
            {
                thread::yield_now();
            }
            let r: Result<CacheObject, GitError> =
                Pack::decode_pack_object(&mut reader, &mut offset);
            match r {
                Ok(mut obj) => {
                    obj.set_mem_recorder(self.cache_objs_mem.clone());
                    obj.record_mem_size();

                    let params = Arc::new(SharedParams {
                        pool: self.pool.clone(),
                        waitlist: self.waitlist.clone(),
                        caches: self.caches.clone(),
                        cache_objs_mem_size: self.cache_objs_mem.clone(),
                        callback: callback.clone(),
                    });

                    let caches = caches.clone();
                    let waitlist = self.waitlist.clone();
                    let kind = get_hash_kind();
                    self.pool.execute(move || {
                        set_hash_kind(kind);
                        match obj.info {
                            CacheObjectInfo::BaseObject(_, _) => {
                                Self::cache_obj_and_process_waitlist(params, obj);
                            }
                            CacheObjectInfo::OffsetDelta(base_offset, _)
                            | CacheObjectInfo::OffsetZstdelta(base_offset, _) => {
                                if let Some(base_obj) = caches.get_by_offset(base_offset) {
                                    Self::process_delta(params, obj, base_obj);
                                } else {
                                    // The base is not cached yet: park the delta,
                                    // then re-check to close the race with a
                                    // concurrent insert of the base.
                                    waitlist.insert_offset(base_offset, obj);
                                    if let Some(base_obj) = caches.get_by_offset(base_offset) {
                                        Self::process_waitlist(params, base_obj);
                                    }
                                }
                            }
                            CacheObjectInfo::HashDelta(base_ref, _) => {
                                if let Some(base_obj) = caches.get_by_hash(base_ref) {
                                    Self::process_delta(params, obj, base_obj);
                                } else {
                                    waitlist.insert_ref(base_ref, obj);
                                    if let Some(base_obj) = caches.get_by_hash(base_ref) {
                                        Self::process_waitlist(params, base_obj);
                                    }
                                }
                            }
                        }
                    });
                }
                Err(e) => {
                    return Err(e);
                }
            }
            i += 1;
        }
        log_info(i, self);

        // The trailer is the hash of everything before it; compare it with the
        // hash computed while reading.
        let computed_hash = reader.final_hash();
        self.signature = ObjectHash::from_stream(&mut reader).unwrap();
        if computed_hash != self.signature {
            return Err(GitError::InvalidPackFile(format!(
                "The pack file hash {} does not match the trailer hash {}",
                computed_hash, self.signature
            )));
        }

        if !utils::is_eof(&mut reader) {
            return Err(GitError::InvalidPackFile(
                "The pack file is not at the end".to_string(),
            ));
        }

        self.pool.join(); // wait for all objects to be decoded
        if let Some(pack_callback) = pack_id_callback {
            pack_callback(self.signature);
        }
        assert_eq!(self.waitlist.map_offset.len(), 0);
        assert_eq!(self.waitlist.map_ref.len(), 0);
        assert_eq!(self.number, caches.total_inserted());
        tracing::info!(
            "The pack file has been decoded successfully, takes: [ {:?} ]",
            time.elapsed()
        );
        self.caches.clear();
        assert_eq!(self.cache_objs_mem_used(), 0);
        Ok(())
    }

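    /// Runs `decode` on a dedicated OS thread, forwarding each rebuilt entry
    /// through `sender`. The returned handle yields the `Pack` back once
    /// decoding finishes.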
    pub fn decode_async(
        mut self,
        mut pack: impl BufRead + Send + 'static,
        sender: UnboundedSender<Entry>,
    ) -> JoinHandle<Pack> {
        let kind = get_hash_kind();
        thread::spawn(move || {
            set_hash_kind(kind);
            self.decode(
                &mut pack,
                move |entry| {
                    // An unbounded channel only fails when the receiver is dropped.
                    if let Err(e) = sender.send(entry.inner) {
                        eprintln!("Channel closed, failed to send entry: {e:?}");
                    }
                },
                None::<fn(ObjectHash)>,
            )
            .unwrap();
            self
        })
    }

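    /// Decodes a pack that arrives as an async byte stream. One task copies
    /// stream chunks into a synchronous channel wrapped by `StreamBufReader`,
    /// while the blocking `decode` runs under `spawn_blocking`. Entries are
    /// sent through `sender`; the pack hash goes to `pack_hash_send`, if any.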
    pub async fn decode_stream(
        mut self,
        mut stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + Send + 'static,
        sender: UnboundedSender<MetaAttached<Entry, EntryMeta>>,
        pack_hash_send: Option<UnboundedSender<ObjectHash>>,
    ) -> Self {
        let kind = get_hash_kind();
        let (tx, rx) = std::sync::mpsc::channel();
        let mut reader = StreamBufReader::new(rx);
        // Forward the async byte stream into a blocking-friendly channel.
        tokio::spawn(async move {
            while let Some(chunk) = stream.next().await {
                let data = chunk.unwrap().to_vec();
                if let Err(e) = tx.send(data) {
                    eprintln!("Sending Error: {e:?}");
                    break;
                }
            }
        });
        // `decode` blocks, so run it off the async executor.
        tokio::task::spawn_blocking(move || {
            set_hash_kind(kind);
            self.decode(
                &mut reader,
                move |entry: MetaAttached<Entry, EntryMeta>| {
                    if let Err(e) = sender.send(entry) {
                        eprintln!("unbounded channel sending error: {e:?}");
                    }
                },
                Some(move |pack_id: ObjectHash| {
                    if let Some(pack_id_send) = pack_hash_send
                        && let Err(e) = pack_id_send.send(pack_id)
                    {
                        eprintln!("unbounded channel sending error: {e:?}");
                    }
                }),
            )
            .unwrap();
            self
        })
        .await
        .unwrap()
    }

    fn memory_used(&self) -> usize {
        self.cache_objs_mem_used() + self.caches.memory_used_index()
    }

    fn cache_objs_mem_used(&self) -> usize {
        self.cache_objs_mem.load(Ordering::Acquire)
    }

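    /// Schedules the reconstruction of a delta object against its already
    /// decoded base on the thread pool.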
    fn process_delta(
        shared_params: Arc<SharedParams>,
        delta_obj: CacheObject,
        base_obj: Arc<CacheObject>,
    ) {
        shared_params.pool.clone().execute(move || {
            let mut new_obj = match delta_obj.info {
                CacheObjectInfo::OffsetDelta(_, _) | CacheObjectInfo::HashDelta(_, _) => {
                    Pack::rebuild_delta(delta_obj, base_obj)
                }
                CacheObjectInfo::OffsetZstdelta(_, _) => {
                    Pack::rebuild_zstdelta(delta_obj, base_obj)
                }
                _ => unreachable!(),
            };

            new_obj.set_mem_recorder(shared_params.cache_objs_mem_size.clone());
            new_obj.record_mem_size();
            // The rebuilt object may itself be the base of further deltas.
            Self::cache_obj_and_process_waitlist(shared_params, new_obj);
        });
    }

    fn cache_obj_and_process_waitlist(shared_params: Arc<SharedParams>, new_obj: CacheObject) {
        (shared_params.callback)(new_obj.to_entry_metadata());
        let new_obj = shared_params.caches.insert(
            new_obj.offset,
            new_obj.base_object_hash().unwrap(),
            new_obj,
        );
        Self::process_waitlist(shared_params, new_obj);
    }

    fn process_waitlist(shared_params: Arc<SharedParams>, base_obj: Arc<CacheObject>) {
        let wait_objs = shared_params
            .waitlist
            .take(base_obj.offset, base_obj.base_object_hash().unwrap());
        for obj in wait_objs {
            Self::process_delta(shared_params.clone(), obj, base_obj.clone());
        }
    }

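    /// Applies a git-style delta to its base object. After a header encoding
    /// the base and result sizes, the delta is a sequence of instructions:
    /// copy (high bit set; copies an offset/length range out of the base) or
    /// insert (high bit clear; the low 7 bits count literal bytes that follow).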
    pub fn rebuild_delta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
        const COPY_INSTRUCTION_FLAG: u8 = 1 << 7;
        const COPY_OFFSET_BYTES: u8 = 4;
        const COPY_SIZE_BYTES: u8 = 3;
        const COPY_ZERO_SIZE: usize = 0x10000;

        let mut stream = Cursor::new(&delta_obj.data_decompressed);

        // The delta starts with the expected base and result sizes.
        let (base_size, result_size) = utils::read_delta_object_size(&mut stream).unwrap();

        let base_info = &base_obj.data_decompressed;
        assert_eq!(base_info.len(), base_size, "Base object size mismatch");

        let mut result = Vec::with_capacity(result_size);

        loop {
            let instruction = match utils::read_bytes(&mut stream) {
                Ok([instruction]) => instruction,
                Err(err) if err.kind() == ErrorKind::UnexpectedEof => break,
                Err(err) => {
                    panic!(
                        "{}",
                        GitError::DeltaObjectError(format!("Wrong instruction in delta: {err}"))
                    );
                }
            };

            if instruction & COPY_INSTRUCTION_FLAG == 0 {
                // Insert instruction: the low 7 bits give the number of literal
                // bytes that follow; zero is reserved and therefore invalid.
                if instruction == 0 {
                    panic!(
                        "{}",
                        GitError::DeltaObjectError(String::from("Invalid data instruction"))
                    );
                }

                let mut data = vec![0; instruction as usize];
                stream.read_exact(&mut data).unwrap();
                result.extend_from_slice(&data);
            } else {
                // Copy instruction: the low 7 bits flag which offset/size bytes
                // are present; a size of zero means COPY_ZERO_SIZE (0x10000).
                let mut nonzero_bytes = instruction;
                let offset =
                    utils::read_partial_int(&mut stream, COPY_OFFSET_BYTES, &mut nonzero_bytes)
                        .unwrap();
                let mut size =
                    utils::read_partial_int(&mut stream, COPY_SIZE_BYTES, &mut nonzero_bytes)
                        .unwrap();
                if size == 0 {
                    size = COPY_ZERO_SIZE;
                }
                let base_data = base_info.get(offset..(offset + size)).ok_or_else(|| {
                    GitError::DeltaObjectError("Invalid copy instruction".to_string())
                });

                match base_data {
                    Ok(data) => result.extend_from_slice(data),
                    Err(e) => panic!("{}", e),
                }
            }
        }
        assert_eq!(result_size, result.len(), "Result size mismatch");

        let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
        CacheObject {
            info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
            offset: delta_obj.offset,
            crc32: delta_obj.crc32,
            data_decompressed: result,
            mem_recorder: None,
            is_delta_in_pack: delta_obj.is_delta_in_pack,
        }
    }
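
    /// Applies a zstd-based delta (see `zstdelta::apply`) to its base to
    /// reconstruct the full object.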
    pub fn rebuild_zstdelta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
        let result = zstdelta::apply(&base_obj.data_decompressed, &delta_obj.data_decompressed)
            .expect("Failed to apply zstdelta");
        let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
        CacheObject {
            info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
            offset: delta_obj.offset,
            crc32: delta_obj.crc32,
            data_decompressed: result,
            mem_recorder: None,
            is_delta_in_pack: delta_obj.is_delta_in_pack,
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{
        env, fs,
        io::{BufReader, Cursor, prelude::*},
        path::PathBuf,
        sync::{
            Arc,
            atomic::{AtomicUsize, Ordering},
        },
    };

    use flate2::{Compression, write::ZlibEncoder};
    use futures_util::TryStreamExt;
    use tokio_util::io::ReaderStream;

    use crate::{
        hash::{HashKind, ObjectHash, set_hash_kind},
        internal::pack::{Pack, tests::init_logger},
    };

    #[tokio::test]
    async fn test_pack_check_header() {
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/medium-sha1.pack");

        let f = fs::File::open(source).unwrap();
        let mut buf_reader = BufReader::new(f);
        let (object_num, _) = Pack::check_header(&mut buf_reader).unwrap();

        assert_eq!(object_num, 35031);
    }

    #[test]
    fn test_decompress_data() {
        let data = b"Hello, world!";
        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(data).unwrap();
        let compressed_data = encoder.finish().unwrap();
        let compressed_size = compressed_data.len();

        let mut cursor: Cursor<Vec<u8>> = Cursor::new(compressed_data);
        let expected_size = data.len();

        let result = Pack::decompress_data(&mut cursor, expected_size);
        match result {
            Ok((decompressed_data, bytes_read)) => {
                assert_eq!(bytes_read, compressed_size);
                assert_eq!(decompressed_data, data);
            }
            Err(e) => panic!("Decompression failed: {e:?}"),
        }
    }

    fn run_decode_no_delta(rel_path: &str, kind: HashKind) {
        let _guard = set_hash_kind(kind);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push(rel_path);

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }

    #[test]
    fn test_pack_decode_without_delta() {
        run_decode_no_delta("tests/data/packs/small-sha1.pack", HashKind::Sha1);
        run_decode_no_delta("tests/data/packs/small-sha256.pack", HashKind::Sha256);
    }

    fn run_decode_with_ref_delta(rel_path: &str, kind: HashKind) {
        let _guard = set_hash_kind(kind);
        init_logger();

        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push(rel_path);

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }

    #[test]
    fn test_pack_decode_with_ref_delta() {
        run_decode_with_ref_delta("tests/data/packs/ref-delta-sha1.pack", HashKind::Sha1);
        run_decode_with_ref_delta("tests/data/packs/ref-delta-sha256.pack", HashKind::Sha256);
    }

    fn run_decode_no_mem_limit(rel_path: &str, kind: HashKind) {
        let _guard = set_hash_kind(kind);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push(rel_path);

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, None, Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }

    #[test]
    fn test_pack_decode_no_mem_limit() {
        run_decode_no_mem_limit("tests/data/packs/small-sha1.pack", HashKind::Sha1);
        run_decode_no_mem_limit("tests/data/packs/small-sha256.pack", HashKind::Sha256);
    }

    async fn run_decode_large_with_delta(rel_path: &str, kind: HashKind) {
        let _guard = set_hash_kind(kind);
        init_logger();
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push(rel_path);

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(Some(4), Some(1024 * 1024 * 100), Some(tmp.clone()), true);
        let rt = p.decode(&mut buffered, |_obj| {}, None::<fn(ObjectHash)>);
        if let Err(e) = rt {
            fs::remove_dir_all(tmp).unwrap();
            panic!("Error: {e:?}");
        }
    }

    #[tokio::test]
    async fn test_pack_decode_with_large_file_with_delta_without_ref() {
        run_decode_large_with_delta("tests/data/packs/medium-sha1.pack", HashKind::Sha1).await;
        run_decode_large_with_delta("tests/data/packs/medium-sha256.pack", HashKind::Sha256).await;
    }

    async fn run_decode_large_stream(rel_path: &str, kind: HashKind) {
        let _guard = set_hash_kind(kind);
        init_logger();
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push(rel_path);

        let tmp = PathBuf::from("/tmp/.cache_temp");
        let f = tokio::fs::File::open(source).await.unwrap();
        let stream = ReaderStream::new(f).map_err(axum::Error::new);
        let p = Pack::new(Some(4), Some(1024 * 1024 * 100), Some(tmp.clone()), true);

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = tokio::spawn(async move { p.decode_stream(stream, tx, None).await });
        let count = Arc::new(AtomicUsize::new(0));
        let count_c = count.clone();
        // Consume entries concurrently with decoding and count them.
        let consume = tokio::spawn(async move {
            let mut cnt = 0;
            while let Some(_entry) = rx.recv().await {
                cnt += 1;
            }
            tracing::info!("Received: {}", cnt);
            count_c.store(cnt, Ordering::Release);
        });
        let p = handle.await.unwrap();
        consume.await.unwrap();
        assert_eq!(count.load(Ordering::Acquire), p.number);
        assert_eq!(p.number, 35031);
    }

    #[tokio::test]
    async fn test_decode_large_file_stream() {
        run_decode_large_stream("tests/data/packs/medium-sha1.pack", HashKind::Sha1).await;
        run_decode_large_stream("tests/data/packs/medium-sha256.pack", HashKind::Sha256).await;
    }

    async fn run_decode_large_file_async(rel_path: &str, kind: HashKind) {
        let _guard = set_hash_kind(kind);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push(rel_path);

        let tmp = PathBuf::from("/tmp/.cache_temp");
        let f = fs::File::open(source).unwrap();
        let buffered = BufReader::new(f);
        let p = Pack::new(Some(4), Some(1024 * 1024 * 100), Some(tmp.clone()), true);

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = p.decode_async(buffered, tx);
        let mut cnt = 0;
        while let Some(_entry) = rx.recv().await {
            cnt += 1;
        }
        let p = handle.join().unwrap();
        assert_eq!(cnt, p.number);
    }

    #[tokio::test]
    async fn test_decode_large_file_async() {
        run_decode_large_file_async("tests/data/packs/medium-sha1.pack", HashKind::Sha1).await;
        run_decode_large_file_async("tests/data/packs/medium-sha256.pack", HashKind::Sha256).await;
    }

    fn run_decode_with_delta_no_ref(rel_path: &str, kind: HashKind) {
        let _guard = set_hash_kind(kind);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push(rel_path);

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }

    #[test]
    fn test_pack_decode_with_delta_without_ref() {
        run_decode_with_delta_no_ref("tests/data/packs/medium-sha1.pack", HashKind::Sha1);
        run_decode_with_delta_no_ref("tests/data/packs/medium-sha256.pack", HashKind::Sha256);
    }

    #[test]
    fn test_pack_decode_multi_task_with_large_file_with_delta_without_ref() {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async move {
            for (kind, path) in [
                (HashKind::Sha1, "tests/data/packs/medium-sha1.pack"),
                (HashKind::Sha256, "tests/data/packs/medium-sha256.pack"),
            ] {
                let f1 = run_decode_large_with_delta(path, kind);
                let f2 = run_decode_large_with_delta(path, kind);
                let _ = futures::future::join(f1, f2).await;
            }
        });
    }
}