use std::{
    io::{self, BufRead, Cursor, ErrorKind, Read},
    path::PathBuf,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    thread::{self, JoinHandle},
    time::Instant,
};

use axum::Error;
use bytes::Bytes;
use flate2::bufread::ZlibDecoder;
use futures_util::{Stream, StreamExt};
use threadpool::ThreadPool;
use tokio::sync::mpsc::UnboundedSender;
use uuid::Uuid;

use crate::{
    errors::GitError,
    hash::{ObjectHash, get_hash_kind, set_hash_kind},
    internal::{
        metadata::{EntryMeta, MetaAttached},
        object::types::ObjectType,
        pack::{
            DEFAULT_TMP_DIR, Pack,
            cache::{_Cache, Caches},
            cache_object::{CacheObject, CacheObjectInfo, MemSizeRecorder},
            channel_reader::StreamBufReader,
            entry::Entry,
            utils,
            waitlist::Waitlist,
            wrapper::Wrapper,
        },
    },
    utils::CountingReader,
    zstdelta,
};

/// A reader adapter that counts the bytes it yields and feeds them into a
/// CRC-32 hasher, so each pack entry's compressed length and CRC can be
/// recorded while the entry is parsed.
struct CrcCountingReader<'a, R> {
    inner: R,
    bytes_read: u64,
    crc: &'a mut crc32fast::Hasher,
}

impl<R: Read> Read for CrcCountingReader<'_, R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        self.bytes_read += n as u64;
        self.crc.update(&buf[..n]);
        Ok(n)
    }
}

impl<R: BufRead> BufRead for CrcCountingReader<'_, R> {
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        // The CRC is updated in `consume`, not here, so bytes that are
        // peeked but never consumed are not hashed.
        self.inner.fill_buf()
    }
    fn consume(&mut self, amt: usize) {
        // `fill_buf` is idempotent while the buffer is non-empty, so this
        // re-borrows exactly the bytes that are about to be consumed.
        let buf = self.inner.fill_buf().unwrap_or(&[]);
        self.crc.update(&buf[..amt.min(buf.len())]);
        self.bytes_read += amt as u64;
        self.inner.consume(amt);
    }
}

/// Shared state handed to every worker task spawned during `decode`.
struct SharedParams {
    pub pool: Arc<ThreadPool>,
    pub waitlist: Arc<Waitlist>,
    pub caches: Arc<Caches>,
    pub cache_objs_mem_size: Arc<AtomicUsize>,
    pub callback: Arc<dyn Fn(MetaAttached<Entry, EntryMeta>) + Sync + Send>,
}

impl Drop for Pack {
    fn drop(&mut self) {
        if self.clean_tmp {
            self.caches.remove_tmp_dir();
        }
    }
}

impl Pack {
    /// Creates a `Pack` for decoding.
    ///
    /// - `thread_num`: worker threads; defaults to the number of CPUs.
    /// - `mem_limit`: soft memory cap in bytes for decoded objects; `None` means unlimited.
    /// - `temp_path`: directory for cache spill files; defaults to `DEFAULT_TMP_DIR`.
    /// - `clean_tmp`: remove the temp directory when the `Pack` is dropped.
    pub fn new(
        thread_num: Option<usize>,
        mem_limit: Option<usize>,
        temp_path: Option<PathBuf>,
        clean_tmp: bool,
    ) -> Self {
        let mut temp_path = temp_path.unwrap_or(PathBuf::from(DEFAULT_TMP_DIR));
        loop {
            // A random sub-directory keeps concurrent `Pack` instances from
            // sharing (and later deleting) each other's temp files.
            let sub_dir = Uuid::new_v4().to_string()[..8].to_string();
            temp_path.push(sub_dir);
            if !temp_path.exists() {
                break;
            }
            temp_path.pop();
        }
        let thread_num = thread_num.unwrap_or_else(num_cpus::get);
        let cache_mem_size = mem_limit.map(|mem_limit| {
            // Widen to u128 so the multiplication cannot overflow `usize`
            // on 32-bit targets; the cache gets 4/5 of the limit.
            ((mem_limit as u128) * 4 / 5) as usize
        });
        Pack {
            number: 0,
            signature: ObjectHash::default(),
            objects: Vec::new(),
            pool: Arc::new(ThreadPool::new(thread_num)),
            waitlist: Arc::new(Waitlist::new()),
            caches: Arc::new(Caches::new(cache_mem_size, temp_path, thread_num)),
            mem_limit,
            cache_objs_mem: Arc::new(AtomicUsize::default()),
            clean_tmp,
        }
    }

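    /// Reads and validates the 12-byte pack header: the magic `b"PACK"`,
    /// a big-endian version (must be 2), and a big-endian object count.
    /// Returns the object count together with the raw header bytes.
    ///
    /// Illustrative sketch (hand-built header, not a real pack):
    ///
    /// ```ignore
    /// let mut header = Vec::new();
    /// header.extend_from_slice(b"PACK");              // magic
    /// header.extend_from_slice(&2u32.to_be_bytes());  // version 2
    /// header.extend_from_slice(&3u32.to_be_bytes());  // 3 objects
    /// let mut reader = std::io::Cursor::new(header);
    /// let (object_num, raw) = Pack::check_header(&mut reader).unwrap();
    /// assert_eq!(object_num, 3);
    /// assert_eq!(raw.len(), 12);
    /// ```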
    pub fn check_header(pack: &mut impl BufRead) -> Result<(u32, Vec<u8>), GitError> {
        let mut header_data = Vec::new();

        let mut magic = [0; 4];
        let result = pack.read_exact(&mut magic);
        match result {
            Ok(_) => {
                header_data.extend_from_slice(&magic);

                if magic != *b"PACK" {
                    return Err(GitError::InvalidPackHeader(format!(
                        "{},{},{},{}",
                        magic[0], magic[1], magic[2], magic[3]
                    )));
                }
            }
            Err(e) => {
                return Err(GitError::InvalidPackFile(format!(
                    "Error reading magic identifier: {e}"
                )));
            }
        }

        let mut version_bytes = [0; 4];
        let result = pack.read_exact(&mut version_bytes);
        match result {
            Ok(_) => {
                header_data.extend_from_slice(&version_bytes);

                let version = u32::from_be_bytes(version_bytes);
                if version != 2 {
                    return Err(GitError::InvalidPackFile(format!(
                        "Version Number is {version}, not 2"
                    )));
                }
            }
            Err(e) => {
                return Err(GitError::InvalidPackFile(format!(
                    "Error reading version number: {e}"
                )));
            }
        }

        let mut object_num_bytes = [0; 4];
        let result = pack.read_exact(&mut object_num_bytes);
        match result {
            Ok(_) => {
                header_data.extend_from_slice(&object_num_bytes);
                let object_num = u32::from_be_bytes(object_num_bytes);
                Ok((object_num, header_data))
            }
            Err(e) => Err(GitError::InvalidPackFile(format!(
                "Error reading object number: {e}"
            ))),
        }
    }

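    /// Inflates one zlib stream from `pack` and returns the decompressed
    /// bytes together with the number of *compressed* input bytes consumed,
    /// which the caller uses to advance its offset into the pack.
    ///
    /// Illustrative sketch (mirrors `test_decompress_data` below):
    ///
    /// ```ignore
    /// use std::io::Write;
    /// use flate2::{Compression, write::ZlibEncoder};
    ///
    /// let mut enc = ZlibEncoder::new(Vec::new(), Compression::default());
    /// enc.write_all(b"hello").unwrap();
    /// let compressed = enc.finish().unwrap();
    /// let total = compressed.len();
    /// let mut cursor = std::io::Cursor::new(compressed);
    /// let (data, consumed) = Pack::decompress_data(&mut cursor, 5).unwrap();
    /// assert_eq!(data, b"hello");
    /// assert_eq!(consumed, total);
    /// ```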
    pub fn decompress_data(
        pack: &mut (impl BufRead + Send),
        expected_size: usize,
    ) -> Result<(Vec<u8>, usize), GitError> {
        let mut buf = Vec::with_capacity(expected_size);

        let mut counting_reader = CountingReader::new(pack);
        let mut deflate = ZlibDecoder::new(&mut counting_reader);
        match deflate.read_to_end(&mut buf) {
            Ok(_) => {
                if buf.len() != expected_size {
                    Err(GitError::InvalidPackFile(format!(
                        "The object size {} does not match the expected size {}",
                        buf.len(),
                        expected_size
                    )))
                } else {
                    let actual_input_bytes = counting_reader.bytes_read as usize;
                    Ok((buf, actual_input_bytes))
                }
            }
            Err(e) => Err(GitError::InvalidPackFile(format!(
                "Decompression error: {e}"
            ))),
        }
    }

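    /// Decodes a single object entry at the current position of `pack`,
    /// advancing `offset` past everything that was read. Base objects are
    /// returned decompressed; delta objects keep their raw delta payload
    /// plus a reference to their base (by offset or by hash). Returns
    /// `Ok(None)` for entry types that do not produce a cache object.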
    pub fn decode_pack_object(
        pack: &mut (impl BufRead + Send),
        offset: &mut usize,
    ) -> Result<Option<CacheObject>, GitError> {
        let init_offset = *offset;
        let mut hasher = crc32fast::Hasher::new();
        let mut reader = CrcCountingReader {
            inner: pack,
            bytes_read: 0,
            crc: &mut hasher,
        };

        let (type_bits, size) = match utils::read_type_and_varint_size(&mut reader, offset) {
            Ok(result) => result,
            Err(e) => {
                return Err(GitError::InvalidPackFile(format!("Read error: {e}")));
            }
        };

        let t = ObjectType::from_pack_type_u8(type_bits)?;

        match t {
            ObjectType::Commit | ObjectType::Tree | ObjectType::Blob | ObjectType::Tag => {
                let (data, raw_size) = Pack::decompress_data(&mut reader, size)?;
                *offset += raw_size;
                let crc32 = hasher.finalize();
                Ok(Some(CacheObject::new_for_undeltified(
                    t,
                    data,
                    init_offset,
                    crc32,
                )))
            }
            ObjectType::ContextSnapshot
            | ObjectType::Decision
            | ObjectType::Evidence
            | ObjectType::PatchSet
            | ObjectType::Plan
            | ObjectType::Provenance
            | ObjectType::Run
            | ObjectType::Task
            | ObjectType::ToolInvocation => {
                // No cache object is produced for these types; the decode
                // loop simply skips the entry.
                Ok(None)
            }
            ObjectType::OffsetDelta | ObjectType::OffsetZstdelta => {
                let (delta_offset, bytes) = utils::read_offset_encoding(&mut reader).unwrap();
                *offset += bytes;

                let (data, raw_size) = Pack::decompress_data(&mut reader, size)?;
                *offset += raw_size;

                let base_offset = init_offset
                    .checked_sub(delta_offset as usize)
                    .ok_or_else(|| {
                        GitError::InvalidObjectInfo("Invalid OffsetDelta offset".to_string())
                    })?;

                let mut reader = Cursor::new(&data);
                let (_, final_size) = utils::read_delta_object_size(&mut reader)?;

                let obj_info = match t {
                    ObjectType::OffsetDelta => {
                        CacheObjectInfo::OffsetDelta(base_offset, final_size)
                    }
                    ObjectType::OffsetZstdelta => {
                        CacheObjectInfo::OffsetZstdelta(base_offset, final_size)
                    }
                    _ => unreachable!(),
                };
                let crc32 = hasher.finalize();
                Ok(Some(CacheObject {
                    info: obj_info,
                    offset: init_offset,
                    crc32,
                    data_decompressed: data,
                    mem_recorder: None,
                    is_delta_in_pack: true,
                }))
            }
            ObjectType::HashDelta => {
                let ref_sha = ObjectHash::from_stream(&mut reader).unwrap();
                *offset += get_hash_kind().size();

                let (data, raw_size) = Pack::decompress_data(&mut reader, size)?;
                *offset += raw_size;

                let mut reader = Cursor::new(&data);
                let (_, final_size) = utils::read_delta_object_size(&mut reader)?;

                let crc32 = hasher.finalize();

                Ok(Some(CacheObject {
                    info: CacheObjectInfo::HashDelta(ref_sha, final_size),
                    offset: init_offset,
                    crc32,
                    data_decompressed: data,
                    mem_recorder: None,
                    is_delta_in_pack: true,
                }))
            }
        }
    }

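    /// Decodes an entire pack stream: reads the header, then parses each
    /// object and dispatches it to the thread pool, where deltas are
    /// resolved against cached bases (or parked on a waitlist until the
    /// base arrives). `callback` is invoked once per fully resolved object;
    /// `pack_id_callback`, if provided, receives the trailer hash once the
    /// pack has been verified.
    ///
    /// Illustrative sketch (mirrors the tests below):
    ///
    /// ```ignore
    /// let mut p = Pack::new(None, Some(1024 * 1024 * 20), None, true);
    /// p.decode(&mut buffered, |entry| { /* consume entry */ }, None::<fn(ObjectHash)>)?;
    /// ```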
    pub fn decode<F, C>(
        &mut self,
        pack: &mut (impl BufRead + Send),
        callback: F,
        pack_id_callback: Option<C>,
    ) -> Result<(), GitError>
    where
        F: Fn(MetaAttached<Entry, EntryMeta>) + Sync + Send + 'static,
        C: FnOnce(ObjectHash) + Send + 'static,
    {
        let time = Instant::now();
        let mut last_update_time = time.elapsed().as_millis();
        let log_info = |_i: usize, pack: &Pack| {
            tracing::info!(
                "time {:.2} s \t decoded: {} \t pool-queued: {} \t cache-queued: {} \t Objs: {} MB \t CacheUsed: {} MB",
                time.elapsed().as_millis() as f64 / 1000.0,
                _i,
                pack.pool.queued_count(),
                pack.caches.queued_tasks(),
                pack.cache_objs_mem_used() / 1024 / 1024,
                pack.caches.memory_used() / 1024 / 1024
            );
        };
        let callback = Arc::new(callback);

        let caches = self.caches.clone();
        let mut reader = Wrapper::new(io::BufReader::new(pack));

        let result = Pack::check_header(&mut reader);
        match result {
            Ok((object_num, _)) => {
                self.number = object_num as usize;
            }
            Err(e) => {
                return Err(e);
            }
        }
        tracing::info!("The pack file has {} objects", self.number);
        let mut offset: usize = 12; // the 12-byte header has been consumed
        let mut i = 0;
        while i < self.number {
            // Log progress at most once per second.
            if i % 1000 == 0 {
                let time_now = time.elapsed().as_millis();
                if time_now - last_update_time > 1000 {
                    log_info(i, self);
                    last_update_time = time_now;
                }
            }
            // Back-pressure: stall the reader while too many tasks are
            // queued or the memory limit is exceeded.
            while self.pool.queued_count() > 2000
                || self
                    .mem_limit
                    .map(|limit| self.memory_used() > limit)
                    .unwrap_or(false)
            {
                thread::yield_now();
            }
            let r: Result<Option<CacheObject>, GitError> =
                Pack::decode_pack_object(&mut reader, &mut offset);
            match r {
                Ok(Some(mut obj)) => {
                    obj.set_mem_recorder(self.cache_objs_mem.clone());
                    obj.record_mem_size();

                    let params = Arc::new(SharedParams {
                        pool: self.pool.clone(),
                        waitlist: self.waitlist.clone(),
                        caches: self.caches.clone(),
                        cache_objs_mem_size: self.cache_objs_mem.clone(),
                        callback: callback.clone(),
                    });

                    let caches = caches.clone();
                    let waitlist = self.waitlist.clone();
                    let kind = get_hash_kind();
                    self.pool.execute(move || {
                        set_hash_kind(kind);
                        match obj.info {
                            CacheObjectInfo::BaseObject(_, _) => {
                                Self::cache_obj_and_process_waitlist(params, obj);
                            }
                            CacheObjectInfo::OffsetDelta(base_offset, _)
                            | CacheObjectInfo::OffsetZstdelta(base_offset, _) => {
                                if let Some(base_obj) = caches.get_by_offset(base_offset) {
                                    Self::process_delta(params, obj, base_obj);
                                } else {
                                    waitlist.insert_offset(base_offset, obj);
                                    // Re-check after inserting to close the race
                                    // where the base arrived between the lookup
                                    // and the waitlist insert.
                                    if let Some(base_obj) = caches.get_by_offset(base_offset) {
                                        Self::process_waitlist(params, base_obj);
                                    }
                                }
                            }
                            CacheObjectInfo::HashDelta(base_ref, _) => {
                                if let Some(base_obj) = caches.get_by_hash(base_ref) {
                                    Self::process_delta(params, obj, base_obj);
                                } else {
                                    waitlist.insert_ref(base_ref, obj);
                                    if let Some(base_obj) = caches.get_by_hash(base_ref) {
                                        Self::process_waitlist(params, base_obj);
                                    }
                                }
                            }
                        }
                    });
                }
                Ok(None) => {}
                Err(e) => {
                    return Err(e);
                }
            }
            i += 1;
        }
        log_info(i, self);
        let computed_hash = reader.final_hash();
        self.signature = ObjectHash::from_stream(&mut reader).unwrap();

        if computed_hash != self.signature {
            return Err(GitError::InvalidPackFile(format!(
                "The pack file hash {} does not match the trailer hash {}",
                computed_hash, self.signature
            )));
        }

        let end = utils::is_eof(&mut reader);
        if !end {
            return Err(GitError::InvalidPackFile(
                "The pack file is not at the end".to_string(),
            ));
        }

        self.pool.join();
        if let Some(pack_callback) = pack_id_callback {
            pack_callback(self.signature);
        }
        // Every delta must have found its base by now.
        assert_eq!(self.waitlist.map_offset.len(), 0);
        assert_eq!(self.waitlist.map_ref.len(), 0);
        assert!(self.number >= caches.total_inserted());
        tracing::info!(
            "The pack file has been decoded successfully, takes: [ {:?} ]",
            time.elapsed()
        );
        self.caches.clear();
        assert_eq!(self.cache_objs_mem_used(), 0);
        Ok(())
    }

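    /// Spawns a thread that runs `decode`, forwarding each resolved entry
    /// over `sender`. Join the returned handle to get the `Pack` back.
    ///
    /// Illustrative sketch (mirrors `test_decode_large_file_async` below):
    ///
    /// ```ignore
    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    /// let handle = pack.decode_async(buffered, tx);
    /// while let Some(entry) = rx.recv().await { /* consume entry */ }
    /// let pack = handle.join().unwrap();
    /// ```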
    pub fn decode_async(
        mut self,
        mut pack: impl BufRead + Send + 'static,
        sender: UnboundedSender<Entry>,
    ) -> JoinHandle<Pack> {
        let kind = get_hash_kind();
        thread::spawn(move || {
            set_hash_kind(kind);
            self.decode(
                &mut pack,
                move |entry| {
                    // Sending on an unbounded channel only fails if the
                    // receiver has been dropped.
                    if let Err(e) = sender.send(entry.inner) {
                        eprintln!("Channel closed, failed to send entry: {e:?}");
                    }
                },
                None::<fn(ObjectHash)>,
            )
            .unwrap();
            self
        })
    }

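    /// Decodes a pack arriving as an async byte stream: chunks are bridged
    /// through a std channel into a blocking decode task, resolved entries
    /// go out on `sender`, and the trailer hash is optionally reported on
    /// `pack_hash_send`.
    ///
    /// Illustrative sketch (mirrors `test_decode_large_file_stream` below):
    ///
    /// ```ignore
    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    /// let pack = pack.decode_stream(stream, tx, None).await;
    /// ```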
    pub async fn decode_stream(
        mut self,
        mut stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + Send + 'static,
        sender: UnboundedSender<MetaAttached<Entry, EntryMeta>>,
        pack_hash_send: Option<UnboundedSender<ObjectHash>>,
    ) -> Self {
        let kind = get_hash_kind();
        let (tx, rx) = std::sync::mpsc::channel();
        let mut reader = StreamBufReader::new(rx);
        tokio::spawn(async move {
            while let Some(chunk) = stream.next().await {
                let data = chunk.unwrap().to_vec();
                if let Err(e) = tx.send(data) {
                    eprintln!("Sending Error: {e:?}");
                    break;
                }
            }
        });
        tokio::task::spawn_blocking(move || {
            set_hash_kind(kind);
            self.decode(
                &mut reader,
                move |entry: MetaAttached<Entry, EntryMeta>| {
                    if let Err(e) = sender.send(entry) {
                        eprintln!("unbound channel Sending Error: {e:?}");
                    }
                },
                Some(move |pack_id: ObjectHash| {
                    if let Some(pack_id_send) = pack_hash_send
                        && let Err(e) = pack_id_send.send(pack_id)
                    {
                        eprintln!("unbound channel Sending Error: {e:?}");
                    }
                }),
            )
            .unwrap();
            self
        })
        .await
        .unwrap()
    }

    /// Total tracked memory: decoded objects plus the cache index.
    fn memory_used(&self) -> usize {
        self.cache_objs_mem_used() + self.caches.memory_used_index()
    }

    /// Bytes currently held by decoded objects in memory.
    fn cache_objs_mem_used(&self) -> usize {
        self.cache_objs_mem.load(Ordering::Acquire)
    }

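    /// Rebuilds `delta_obj` against `base_obj` on the thread pool, picking
    /// the git copy/insert codec or the zstd delta codec based on the
    /// object's info, then caches the result and drains its waiters.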
    fn process_delta(
        shared_params: Arc<SharedParams>,
        delta_obj: CacheObject,
        base_obj: Arc<CacheObject>,
    ) {
        shared_params.pool.clone().execute(move || {
            let mut new_obj = match delta_obj.info {
                CacheObjectInfo::OffsetDelta(_, _) | CacheObjectInfo::HashDelta(_, _) => {
                    Pack::rebuild_delta(delta_obj, base_obj)
                }
                CacheObjectInfo::OffsetZstdelta(_, _) => {
                    Pack::rebuild_zstdelta(delta_obj, base_obj)
                }
                _ => unreachable!(),
            };

            new_obj.set_mem_recorder(shared_params.cache_objs_mem_size.clone());
            new_obj.record_mem_size();
            Self::cache_obj_and_process_waitlist(shared_params, new_obj);
        });
    }

    /// Delivers a resolved object to the callback, inserts it into the
    /// cache, and then resolves any deltas that were waiting for it.
    fn cache_obj_and_process_waitlist(shared_params: Arc<SharedParams>, new_obj: CacheObject) {
        (shared_params.callback)(new_obj.to_entry_metadata());
        let new_obj = shared_params.caches.insert(
            new_obj.offset,
            new_obj.base_object_hash().unwrap(),
            new_obj,
        );
        Self::process_waitlist(shared_params, new_obj);
    }

    fn process_waitlist(shared_params: Arc<SharedParams>, base_obj: Arc<CacheObject>) {
        let wait_objs = shared_params
            .waitlist
            .take(base_obj.offset, base_obj.base_object_hash().unwrap());
        for obj in wait_objs {
            Self::process_delta(shared_params.clone(), obj, base_obj.clone());
        }
    }

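    /// Applies a git-style delta to `base_obj`, reconstructing the full object.
    ///
    /// The delta payload starts with two varint sizes (base and result),
    /// followed by a stream of instructions:
    /// - bit 7 set: copy from the base; the low bits select which offset and
    ///   size bytes follow, and a decoded size of 0 means 0x10000 bytes;
    /// - bit 7 clear: insert the next `instruction` bytes literally (an
    ///   instruction byte of 0 is invalid).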
    pub fn rebuild_delta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
        const COPY_INSTRUCTION_FLAG: u8 = 1 << 7;
        const COPY_OFFSET_BYTES: u8 = 4;
        const COPY_SIZE_BYTES: u8 = 3;
        const COPY_ZERO_SIZE: usize = 0x10000;

        let mut stream = Cursor::new(&delta_obj.data_decompressed);

        // The delta payload begins with the expected base and result sizes.
        let (base_size, result_size) = utils::read_delta_object_size(&mut stream).unwrap();

        let base_info = &base_obj.data_decompressed;
        assert_eq!(base_info.len(), base_size, "Base object size mismatch");

        let mut result = Vec::with_capacity(result_size);

        loop {
            // EOF here means the new object is complete.
            let instruction = match utils::read_bytes(&mut stream) {
                Ok([instruction]) => instruction,
                Err(err) if err.kind() == ErrorKind::UnexpectedEof => break,
                Err(err) => {
                    panic!(
                        "{}",
                        GitError::DeltaObjectError(format!("Wrong instruction in delta :{err}"))
                    );
                }
            };

            if instruction & COPY_INSTRUCTION_FLAG == 0 {
                // Insert instruction: the byte itself is the literal length.
                if instruction == 0 {
                    panic!(
                        "{}",
                        GitError::DeltaObjectError(String::from("Invalid data instruction"))
                    );
                }

                let mut data = vec![0; instruction as usize];
                stream.read_exact(&mut data).unwrap();
                result.extend_from_slice(&data);
            } else {
                // Copy instruction: the low bits of `instruction` flag which
                // offset and size bytes are present in the stream.
                let mut nonzero_bytes = instruction;
                let offset =
                    utils::read_partial_int(&mut stream, COPY_OFFSET_BYTES, &mut nonzero_bytes)
                        .unwrap();
                let mut size =
                    utils::read_partial_int(&mut stream, COPY_SIZE_BYTES, &mut nonzero_bytes)
                        .unwrap();
                if size == 0 {
                    // A copy size of zero encodes 0x10000 bytes.
                    size = COPY_ZERO_SIZE;
                }
                let base_data = base_info.get(offset..(offset + size)).ok_or_else(|| {
                    GitError::DeltaObjectError("Invalid copy instruction".to_string())
                });

                match base_data {
                    Ok(data) => result.extend_from_slice(data),
                    Err(e) => panic!("{}", e),
                }
            }
        }
        assert_eq!(result_size, result.len(), "Result size mismatch");

        let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
        CacheObject {
            info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
            offset: delta_obj.offset,
            crc32: delta_obj.crc32,
            data_decompressed: result,
            mem_recorder: None,
            is_delta_in_pack: delta_obj.is_delta_in_pack,
        }
    }

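    /// Like `rebuild_delta`, but the payload is a zstd-based delta applied
    /// with `zstdelta::apply` rather than git copy/insert instructions.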
    pub fn rebuild_zstdelta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
        let result = zstdelta::apply(&base_obj.data_decompressed, &delta_obj.data_decompressed)
            .expect("Failed to apply zstdelta");
        let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
        CacheObject {
            info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
            offset: delta_obj.offset,
            crc32: delta_obj.crc32,
            data_decompressed: result,
            mem_recorder: None,
            is_delta_in_pack: delta_obj.is_delta_in_pack,
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{
        env, fs,
        io::{BufReader, Cursor, prelude::*},
        path::PathBuf,
        sync::{
            Arc,
            atomic::{AtomicUsize, Ordering},
        },
    };

    use flate2::{Compression, write::ZlibEncoder};
    use futures_util::TryStreamExt;
    use tokio_util::io::ReaderStream;

    use crate::{
        hash::{HashKind, ObjectHash, set_hash_kind_for_test},
        internal::pack::{Pack, tests::init_logger},
    };

    #[tokio::test]
    async fn test_pack_check_header() {
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/medium-sha1.pack");

        let f = fs::File::open(source).unwrap();
        let mut buf_reader = BufReader::new(f);
        let (object_num, _) = Pack::check_header(&mut buf_reader).unwrap();

        assert_eq!(object_num, 35031);
    }

    #[test]
    fn test_decompress_data() {
        let data = b"Hello, world!";
        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(data).unwrap();
        let compressed_data = encoder.finish().unwrap();
        let compressed_size = compressed_data.len();

        let mut cursor: Cursor<Vec<u8>> = Cursor::new(compressed_data);
        let expected_size = data.len();

        let result = Pack::decompress_data(&mut cursor, expected_size);
        match result {
            Ok((decompressed_data, bytes_read)) => {
                assert_eq!(bytes_read, compressed_size);
                assert_eq!(decompressed_data, data);
            }
            Err(e) => panic!("Decompression failed: {e:?}"),
        }
    }

    #[test]
    #[cfg(target_pointer_width = "32")]
    fn test_pack_new_mem_limit_no_overflow_32bit() {
        // Without the u128 widening in `Pack::new`, `mem_limit * 4`
        // would overflow a 32-bit `usize`.
        let mem_limit = 1_200_000_000usize;
        let tmp = PathBuf::from("/tmp/.cache_temp");
        let result = std::panic::catch_unwind(|| {
            let _p = Pack::new(Some(1), Some(mem_limit), Some(tmp), true);
        });
        assert!(result.is_ok(), "Pack::new should not panic on 32-bit");
    }

    fn run_decode_no_delta(rel_path: &str, kind: HashKind) {
        let _guard = set_hash_kind_for_test(kind);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push(rel_path);

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }

    #[test]
    fn test_pack_decode_without_delta() {
        run_decode_no_delta("tests/data/packs/small-sha1.pack", HashKind::Sha1);
        run_decode_no_delta("tests/data/packs/small-sha256.pack", HashKind::Sha256);
    }

    fn run_decode_with_ref_delta(rel_path: &str, kind: HashKind) {
        let _guard = set_hash_kind_for_test(kind);
        init_logger();

        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push(rel_path);

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }

    #[test]
    fn test_pack_decode_with_ref_delta() {
        run_decode_with_ref_delta("tests/data/packs/ref-delta-sha1.pack", HashKind::Sha1);
        run_decode_with_ref_delta("tests/data/packs/ref-delta-sha256.pack", HashKind::Sha256);
    }

    fn run_decode_no_mem_limit(rel_path: &str, kind: HashKind) {
        let _guard = set_hash_kind_for_test(kind);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push(rel_path);

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, None, Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }

    #[test]
    fn test_pack_decode_no_mem_limit() {
        run_decode_no_mem_limit("tests/data/packs/small-sha1.pack", HashKind::Sha1);
        run_decode_no_mem_limit("tests/data/packs/small-sha256.pack", HashKind::Sha256);
    }

    async fn run_decode_large_with_delta(rel_path: &str, kind: HashKind) {
        let _guard = set_hash_kind_for_test(kind);
        init_logger();
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push(rel_path);

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(Some(4), Some(1024 * 1024 * 100), Some(tmp.clone()), true);
        let rt = p.decode(&mut buffered, |_obj| {}, None::<fn(ObjectHash)>);
        if let Err(e) = rt {
            fs::remove_dir_all(tmp).unwrap();
            panic!("Error: {e:?}");
        }
    }

    #[tokio::test]
    async fn test_pack_decode_with_large_file_with_delta_without_ref() {
        run_decode_large_with_delta("tests/data/packs/medium-sha1.pack", HashKind::Sha1).await;
        run_decode_large_with_delta("tests/data/packs/medium-sha256.pack", HashKind::Sha256).await;
    }

    async fn run_decode_large_stream(rel_path: &str, kind: HashKind) {
        let _guard = set_hash_kind_for_test(kind);
        init_logger();
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push(rel_path);

        let tmp = PathBuf::from("/tmp/.cache_temp");
        let f = tokio::fs::File::open(source).await.unwrap();
        let stream = ReaderStream::new(f).map_err(axum::Error::new);
        let p = Pack::new(Some(4), Some(1024 * 1024 * 100), Some(tmp.clone()), true);

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = tokio::spawn(async move { p.decode_stream(stream, tx, None).await });
        let count = Arc::new(AtomicUsize::new(0));
        let count_c = count.clone();
        // Count entries on a separate task while the decoder runs.
        let consume = tokio::spawn(async move {
            let mut cnt = 0;
            while let Some(_entry) = rx.recv().await {
                cnt += 1;
            }
            tracing::info!("Received: {}", cnt);
            count_c.store(cnt, Ordering::Release);
        });
        let p = handle.await.unwrap();
        consume.await.unwrap();
        assert_eq!(count.load(Ordering::Acquire), p.number);
        assert_eq!(p.number, 35031);
    }

    #[tokio::test]
    async fn test_decode_large_file_stream() {
        run_decode_large_stream("tests/data/packs/medium-sha1.pack", HashKind::Sha1).await;
        run_decode_large_stream("tests/data/packs/medium-sha256.pack", HashKind::Sha256).await;
    }

    async fn run_decode_large_file_async(rel_path: &str, kind: HashKind) {
        let _guard = set_hash_kind_for_test(kind);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push(rel_path);

        let tmp = PathBuf::from("/tmp/.cache_temp");
        let f = fs::File::open(source).unwrap();
        let buffered = BufReader::new(f);
        let p = Pack::new(Some(4), Some(1024 * 1024 * 100), Some(tmp.clone()), true);

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = p.decode_async(buffered, tx);
        let mut cnt = 0;
        while let Some(_entry) = rx.recv().await {
            cnt += 1;
        }
        let p = handle.join().unwrap();
        assert_eq!(cnt, p.number);
    }

    #[tokio::test]
    async fn test_decode_large_file_async() {
        run_decode_large_file_async("tests/data/packs/medium-sha1.pack", HashKind::Sha1).await;
        run_decode_large_file_async("tests/data/packs/medium-sha256.pack", HashKind::Sha256).await;
    }

    fn run_decode_with_delta_no_ref(rel_path: &str, kind: HashKind) {
        let _guard = set_hash_kind_for_test(kind);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push(rel_path);

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }

    #[test]
    fn test_pack_decode_with_delta_without_ref() {
        run_decode_with_delta_no_ref("tests/data/packs/medium-sha1.pack", HashKind::Sha1);
        run_decode_with_delta_no_ref("tests/data/packs/medium-sha256.pack", HashKind::Sha256);
    }

    #[test]
    fn test_pack_decode_multi_task_with_large_file_with_delta_without_ref() {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async move {
            // Decode the same pack twice concurrently for each hash kind.
            for (kind, path) in [
                (HashKind::Sha1, "tests/data/packs/medium-sha1.pack"),
                (HashKind::Sha256, "tests/data/packs/medium-sha256.pack"),
            ] {
                let f1 = run_decode_large_with_delta(path, kind);
                let f2 = run_decode_large_with_delta(path, kind);
                let _ = futures::future::join(f1, f2).await;
            }
        });
    }
}