1use std::io::{self, BufRead, Cursor, ErrorKind, Read};
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::thread::{self, JoinHandle};
6use std::time::Instant;
7
8use axum::Error;
9use bytes::Bytes;
10use flate2::bufread::ZlibDecoder;
11use futures_util::{Stream, StreamExt};
12use threadpool::ThreadPool;
13use tokio::sync::mpsc::UnboundedSender;
14use uuid::Uuid;
15
16use crate::errors::GitError;
17use crate::hash::{ObjectHash, get_hash_kind, set_hash_kind};
18
19use crate::internal::metadata::{EntryMeta, MetaAttached};
20use crate::zstdelta;
21
22use crate::internal::object::types::ObjectType;
23
24use super::cache_object::CacheObjectInfo;
25use crate::internal::pack::cache::_Cache;
26use crate::internal::pack::cache::Caches;
27use crate::internal::pack::cache_object::{CacheObject, MemSizeRecorder};
28use crate::internal::pack::channel_reader::StreamBufReader;
29use crate::internal::pack::entry::Entry;
30use crate::internal::pack::waitlist::Waitlist;
31use crate::internal::pack::wrapper::Wrapper;
32use crate::internal::pack::{DEFAULT_TMP_DIR, Pack, utils};
33use crate::utils::CountingReader;
34
/// Parameters shared by every worker job spawned during pack decoding.
///
/// Bundled into one `Arc` so each job clones a single handle instead of
/// five separate `Arc`s.
struct SharedParams {
    // Thread pool that delta-resolution jobs are re-submitted to.
    pub pool: Arc<ThreadPool>,
    // Delta objects parked until their base object becomes available.
    pub waitlist: Arc<Waitlist>,
    // Cache of decoded objects, looked up by offset or by hash.
    pub caches: Arc<Caches>,
    // Running total of memory held by in-flight cache objects.
    pub cache_objs_mem_size: Arc<AtomicUsize>,
    // User callback invoked once per fully reconstructed entry.
    pub callback: Arc<dyn Fn(MetaAttached<Entry, EntryMeta>) + Sync + Send>,
}
43
44impl Drop for Pack {
45 fn drop(&mut self) {
46 if self.clean_tmp {
47 self.caches.remove_tmp_dir();
48 }
49 }
50}
51
52impl Pack {
53 pub fn new(
63 thread_num: Option<usize>,
64 mem_limit: Option<usize>,
65 temp_path: Option<PathBuf>,
66 clean_tmp: bool,
67 ) -> Self {
68 let mut temp_path = temp_path.unwrap_or(PathBuf::from(DEFAULT_TMP_DIR));
69 loop {
71 let sub_dir = Uuid::new_v4().to_string()[..8].to_string();
72 temp_path.push(sub_dir);
73 if !temp_path.exists() {
74 break;
75 }
76 temp_path.pop();
77 }
78 let thread_num = thread_num.unwrap_or_else(num_cpus::get);
79 let cache_mem_size = mem_limit.map(|mem_limit| mem_limit * 4 / 5);
80 Pack {
81 number: 0,
82 signature: ObjectHash::default(),
83 objects: Vec::new(),
84 pool: Arc::new(ThreadPool::new(thread_num)),
85 waitlist: Arc::new(Waitlist::new()),
86 caches: Arc::new(Caches::new(cache_mem_size, temp_path, thread_num)),
87 mem_limit,
88 cache_objs_mem: Arc::new(AtomicUsize::default()),
89 clean_tmp,
90 }
91 }
92
93 pub fn check_header(pack: &mut impl BufRead) -> Result<(u32, Vec<u8>), GitError> {
117 let mut header_data = Vec::new();
119
120 let mut magic = [0; 4];
122 let result = pack.read_exact(&mut magic);
124 match result {
125 Ok(_) => {
126 header_data.extend_from_slice(&magic);
128
129 if magic != *b"PACK" {
131 return Err(GitError::InvalidPackHeader(format!(
133 "{},{},{},{}",
134 magic[0], magic[1], magic[2], magic[3]
135 )));
136 }
137 }
138 Err(e) => {
139 return Err(GitError::InvalidPackFile(format!(
141 "Error reading magic identifier: {e}"
142 )));
143 }
144 }
145
146 let mut version_bytes = [0; 4];
148 let result = pack.read_exact(&mut version_bytes); match result {
150 Ok(_) => {
151 header_data.extend_from_slice(&version_bytes);
153
154 let version = u32::from_be_bytes(version_bytes);
156 if version != 2 {
157 return Err(GitError::InvalidPackFile(format!(
159 "Version Number is {version}, not 2"
160 )));
161 }
162 }
163 Err(e) => {
164 return Err(GitError::InvalidPackFile(format!(
166 "Error reading version number: {e}"
167 )));
168 }
169 }
170
171 let mut object_num_bytes = [0; 4];
173 let result = pack.read_exact(&mut object_num_bytes);
175 match result {
176 Ok(_) => {
177 header_data.extend_from_slice(&object_num_bytes);
179 let object_num = u32::from_be_bytes(object_num_bytes);
181 Ok((object_num, header_data))
183 }
184 Err(e) => {
185 Err(GitError::InvalidPackFile(format!(
187 "Error reading object number: {e}"
188 )))
189 }
190 }
191 }
192
193 pub fn decompress_data(
205 pack: &mut (impl BufRead + Send),
206 expected_size: usize,
207 ) -> Result<(Vec<u8>, usize), GitError> {
208 let mut buf = Vec::with_capacity(expected_size);
210
211 let mut counting_reader = CountingReader::new(pack);
212 let mut deflate = ZlibDecoder::new(&mut counting_reader);
215 match deflate.read_to_end(&mut buf) {
217 Ok(_) => {
218 if buf.len() != expected_size {
220 Err(GitError::InvalidPackFile(format!(
221 "The object size {} does not match the expected size {}",
222 buf.len(),
223 expected_size
224 )))
225 } else {
226 let actual_input_bytes = counting_reader.bytes_read as usize;
228 Ok((buf, actual_input_bytes))
229 }
230 }
231 Err(e) => {
232 Err(GitError::InvalidPackFile(format!(
234 "Decompression error: {e}"
235 )))
236 }
237 }
238 }
239
240 pub fn decode_pack_object(
252 pack: &mut (impl BufRead + Send),
253 offset: &mut usize,
254 ) -> Result<CacheObject, GitError> {
255 let init_offset = *offset;
256
257 let (type_bits, size) = match utils::read_type_and_varint_size(pack, offset) {
259 Ok(result) => result,
260 Err(e) => {
261 return Err(GitError::InvalidPackFile(format!("Read error: {e}")));
264 }
265 };
266
267 let t = ObjectType::from_u8(type_bits)?;
269
270 match t {
271 ObjectType::Commit | ObjectType::Tree | ObjectType::Blob | ObjectType::Tag => {
272 let (data, raw_size) = Pack::decompress_data(pack, size)?;
273 *offset += raw_size;
274 Ok(CacheObject::new_for_undeltified(t, data, init_offset))
275 }
276 ObjectType::OffsetDelta | ObjectType::OffsetZstdelta => {
277 let (delta_offset, bytes) = utils::read_offset_encoding(pack).unwrap();
278 *offset += bytes;
279
280 let (data, raw_size) = Pack::decompress_data(pack, size)?;
281 *offset += raw_size;
282
283 let base_offset = init_offset
285 .checked_sub(delta_offset as usize)
286 .ok_or_else(|| {
287 GitError::InvalidObjectInfo("Invalid OffsetDelta offset".to_string())
288 })
289 .unwrap();
290
291 let mut reader = Cursor::new(&data);
292 let (_, final_size) = utils::read_delta_object_size(&mut reader)?;
293
294 let obj_info = match t {
295 ObjectType::OffsetDelta => {
296 CacheObjectInfo::OffsetDelta(base_offset, final_size)
297 }
298 ObjectType::OffsetZstdelta => {
299 CacheObjectInfo::OffsetZstdelta(base_offset, final_size)
300 }
301 _ => unreachable!(),
302 };
303 Ok(CacheObject {
304 info: obj_info,
305 offset: init_offset,
306 data_decompressed: data,
307 mem_recorder: None,
308 is_delta_in_pack: true,
309 })
310 }
311 ObjectType::HashDelta => {
312 let ref_sha = ObjectHash::from_stream(pack).unwrap();
314 *offset += get_hash_kind().size();
316
317 let (data, raw_size) = Pack::decompress_data(pack, size)?;
318 *offset += raw_size;
319
320 let mut reader = Cursor::new(&data);
321 let (_, final_size) = utils::read_delta_object_size(&mut reader)?;
322
323 Ok(CacheObject {
324 info: CacheObjectInfo::HashDelta(ref_sha, final_size),
325 offset: init_offset,
326 data_decompressed: data,
327 mem_recorder: None,
328 is_delta_in_pack: true,
329 })
330 }
331 }
332 }
333
    /// Decodes the whole pack from `pack`, invoking `callback` once per fully
    /// reconstructed object and, when supplied, `pack_id_callback` with the
    /// pack's trailer hash after all objects are decoded.
    ///
    /// Decoding is pipelined: records are parsed sequentially on this thread
    /// while delta resolution runs on the shared thread pool. Deltas whose
    /// base is not yet available are parked in the waitlist and revived when
    /// the base arrives.
    ///
    /// # Errors
    /// Returns `GitError` on a malformed header, a corrupt object, a
    /// trailer-hash mismatch, or trailing data after the pack.
    pub fn decode<F, C>(
        &mut self,
        pack: &mut (impl BufRead + Send),
        callback: F,
        pack_id_callback: Option<C>,
    ) -> Result<(), GitError>
    where
        F: Fn(MetaAttached<Entry, EntryMeta>) + Sync + Send + 'static,
        C: FnOnce(ObjectHash) + Send + 'static,
    {
        let time = Instant::now();
        let mut last_update_time = time.elapsed().as_millis();
        // Periodic progress log: decoded count, pool/cache backlogs, and
        // memory figures.
        let log_info = |_i: usize, pack: &Pack| {
            tracing::info!(
                "time {:.2} s \t decode: {:?} \t dec-num: {} \t cah-num: {} \t Objs: {} MB \t CacheUsed: {} MB",
                time.elapsed().as_millis() as f64 / 1000.0,
                _i,
                pack.pool.queued_count(),
                pack.caches.queued_tasks(),
                pack.cache_objs_mem_used() / 1024 / 1024,
                pack.caches.memory_used() / 1024 / 1024
            );
        };
        let callback = Arc::new(callback);

        let caches = self.caches.clone();
        // Wrapper hashes every byte it reads so the trailer checksum can be
        // verified at the end.
        let mut reader = Wrapper::new(io::BufReader::new(pack));

        let result = Pack::check_header(&mut reader);
        match result {
            Ok((object_num, _)) => {
                self.number = object_num as usize;
            }
            Err(e) => {
                return Err(e);
            }
        }
        tracing::info!("The pack file has {} objects", self.number);
        // The 12-byte header has already been consumed.
        let mut offset: usize = 12;
        let mut i = 0;
        while i < self.number {
            // Log progress at most once per second, checked every 1000 objects.
            if i % 1000 == 0 {
                let time_now = time.elapsed().as_millis();
                if time_now - last_update_time > 1000 {
                    log_info(i, self);
                    last_update_time = time_now;
                }
            }
            // Back-pressure: stall parsing while the pool backlog is deep or
            // the memory limit is exceeded.
            while self.pool.queued_count() > 2000
                || self
                    .mem_limit
                    .map(|limit| self.memory_used() > limit)
                    .unwrap_or(false)
            {
                thread::yield_now();
            }
            let r: Result<CacheObject, GitError> =
                Pack::decode_pack_object(&mut reader, &mut offset);
            match r {
                Ok(mut obj) => {
                    obj.set_mem_recorder(self.cache_objs_mem.clone());
                    obj.record_mem_size();

                    let params = Arc::new(SharedParams {
                        pool: self.pool.clone(),
                        waitlist: self.waitlist.clone(),
                        caches: self.caches.clone(),
                        cache_objs_mem_size: self.cache_objs_mem.clone(),
                        callback: callback.clone(),
                    });

                    let caches = caches.clone();
                    let waitlist = self.waitlist.clone();
                    // Capture the current hash kind so the worker uses the
                    // same one as this thread.
                    let kind = get_hash_kind();
                    self.pool.execute(move || {
                        set_hash_kind(kind);
                        match obj.info {
                            CacheObjectInfo::BaseObject(_, _) => {
                                Self::cache_obj_and_process_waitlist(params, obj);
                            }
                            CacheObjectInfo::OffsetDelta(base_offset, _)
                            | CacheObjectInfo::OffsetZstdelta(base_offset, _) => {
                                if let Some(base_obj) = caches.get_by_offset(base_offset) {
                                    Self::process_delta(params, obj, base_obj);
                                } else {
                                    waitlist.insert_offset(base_offset, obj);
                                    // Re-check after inserting: the base may
                                    // have landed between the first lookup and
                                    // the insert; this closes that race.
                                    if let Some(base_obj) = caches.get_by_offset(base_offset) {
                                        Self::process_waitlist(params, base_obj);
                                    }
                                }
                            }
                            CacheObjectInfo::HashDelta(base_ref, _) => {
                                if let Some(base_obj) = caches.get_by_hash(base_ref) {
                                    Self::process_delta(params, obj, base_obj);
                                } else {
                                    waitlist.insert_ref(base_ref, obj);
                                    // Same insert-then-recheck pattern as above.
                                    if let Some(base_obj) = caches.get_by_hash(base_ref) {
                                        Self::process_waitlist(params, base_obj);
                                    }
                                }
                            }
                        }
                    });
                }
                Err(e) => {
                    return Err(e);
                }
            }
            i += 1;
        }
        log_info(i, self);
        // Hash of everything read so far must match the trailer hash.
        let render_hash = reader.final_hash();
        self.signature = ObjectHash::from_stream(&mut reader).unwrap();

        if render_hash != self.signature {
            return Err(GitError::InvalidPackFile(format!(
                "The pack file hash {} does not match the trailer hash {}",
                render_hash, self.signature
            )));
        }

        let end = utils::is_eof(&mut reader);
        if !end {
            return Err(GitError::InvalidPackFile(
                "The pack file is not at the end".to_string(),
            ));
        }

        // Wait for all pool jobs to finish before reporting completion.
        self.pool.join(); if let Some(pack_callback) = pack_id_callback {
            pack_callback(self.signature);
        }
        // Every delta must have found its base by now.
        assert_eq!(self.waitlist.map_offset.len(), 0);
        assert_eq!(self.waitlist.map_ref.len(), 0);
        assert_eq!(self.number, caches.total_inserted());
        tracing::info!(
            "The pack file has been decoded successfully, takes: [ {:?} ]",
            time.elapsed()
        );
        self.caches.clear(); assert_eq!(self.cache_objs_mem_used(), 0); Ok(())
    }
500
501 pub fn decode_async(
504 mut self,
505 mut pack: impl BufRead + Send + 'static,
506 sender: UnboundedSender<Entry>,
507 ) -> JoinHandle<Pack> {
508 let kind = get_hash_kind();
509 thread::spawn(move || {
510 set_hash_kind(kind);
511 self.decode(
512 &mut pack,
513 move |entry| {
514 if let Err(e) = sender.send(entry.inner) {
515 eprintln!("Channel full, failed to send entry: {e:?}");
516 }
517 },
518 None::<fn(ObjectHash)>,
519 )
520 .unwrap();
521 self
522 })
523 }
524
    /// Decodes a pack that arrives as an async byte stream.
    ///
    /// Chunks are pumped through a std `mpsc` channel into a blocking
    /// `StreamBufReader`, and `decode` runs on a `spawn_blocking` thread so
    /// it does not stall the async runtime. Entries are emitted on `sender`;
    /// the trailer hash is sent on `pack_hash_send` when provided.
    pub async fn decode_stream(
        mut self,
        mut stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + Send + 'static,
        sender: UnboundedSender<MetaAttached<Entry, EntryMeta>>,
        pack_hash_send: Option<UnboundedSender<ObjectHash>>,
    ) -> Self {
        let kind = get_hash_kind();
        let (tx, rx) = std::sync::mpsc::channel();
        let mut reader = StreamBufReader::new(rx);
        // Pump task: forward each stream chunk into the sync channel that
        // feeds the blocking reader.
        tokio::spawn(async move {
            while let Some(chunk) = stream.next().await {
                // NOTE(review): a stream error panics this pump task rather
                // than surfacing to the decoder — confirm this is intended.
                let data = chunk.unwrap().to_vec();
                if let Err(e) = tx.send(data) {
                    // Receiver (the decoder) is gone; stop pumping.
                    eprintln!("Sending Error: {e:?}");
                    break;
                }
            }
        });
        // Decoding is CPU-bound/blocking work; run it off the async runtime,
        // re-applying the captured hash kind on the blocking thread.
        tokio::task::spawn_blocking(move || {
            set_hash_kind(kind);
            self.decode(
                &mut reader,
                move |entry: MetaAttached<Entry, EntryMeta>| {
                    if let Err(e) = sender.send(entry) {
                        eprintln!("unbound channel Sending Error: {e:?}");
                    }
                },
                Some(move |pack_id: ObjectHash| {
                    if let Some(pack_id_send) = pack_hash_send
                        && let Err(e) = pack_id_send.send(pack_id)
                    {
                        eprintln!("unbound channel Sending Error: {e:?}");
                    }
                }),
            )
            .unwrap();
            self
        })
        .await
        .unwrap()
    }
570
571 fn memory_used(&self) -> usize {
573 self.cache_objs_mem_used() + self.caches.memory_used_index()
574 }
575
    /// Bytes currently attributed to in-flight cache objects
    /// (Acquire load of the shared counter).
    fn cache_objs_mem_used(&self) -> usize {
        self.cache_objs_mem.load(Ordering::Acquire)
    }
580
581 fn process_delta(
584 shared_params: Arc<SharedParams>,
585 delta_obj: CacheObject,
586 base_obj: Arc<CacheObject>,
587 ) {
588 shared_params.pool.clone().execute(move || {
589 let mut new_obj = match delta_obj.info {
590 CacheObjectInfo::OffsetDelta(_, _) | CacheObjectInfo::HashDelta(_, _) => {
591 Pack::rebuild_delta(delta_obj, base_obj)
592 }
593 CacheObjectInfo::OffsetZstdelta(_, _) => {
594 Pack::rebuild_zstdelta(delta_obj, base_obj)
595 }
596 _ => unreachable!(),
597 };
598
599 new_obj.set_mem_recorder(shared_params.cache_objs_mem_size.clone());
600 new_obj.record_mem_size();
601 Self::cache_obj_and_process_waitlist(shared_params, new_obj); });
603 }
604
605 fn cache_obj_and_process_waitlist(shared_params: Arc<SharedParams>, new_obj: CacheObject) {
607 (shared_params.callback)(new_obj.to_entry_metadata());
608 let new_obj = shared_params.caches.insert(
609 new_obj.offset,
610 new_obj.base_object_hash().unwrap(),
611 new_obj,
612 );
613 Self::process_waitlist(shared_params, new_obj);
614 }
615
616 fn process_waitlist(shared_params: Arc<SharedParams>, base_obj: Arc<CacheObject>) {
617 let wait_objs = shared_params
618 .waitlist
619 .take(base_obj.offset, base_obj.base_object_hash().unwrap());
620 for obj in wait_objs {
621 Self::process_delta(shared_params.clone(), obj, base_obj.clone());
623 }
624 }
625
    /// Applies a git-style binary delta (`delta_obj.data_decompressed`) to
    /// `base_obj`, returning the reconstructed object as a base object.
    ///
    /// The delta format is a header (base size, result size) followed by a
    /// sequence of instructions: copy-from-base (high bit set) or literal
    /// insert (high bit clear, low 7 bits = byte count).
    ///
    /// # Panics
    /// Panics on a malformed delta (bad instruction, out-of-range copy) or
    /// on size mismatches against the recorded base/result sizes.
    pub fn rebuild_delta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
        // High bit distinguishes copy instructions from literal inserts.
        const COPY_INSTRUCTION_FLAG: u8 = 1 << 7;
        // Up to 4 offset bytes and 3 size bytes may follow a copy instruction.
        const COPY_OFFSET_BYTES: u8 = 4;
        const COPY_SIZE_BYTES: u8 = 3;
        // A copy size of zero encodes 0x10000 bytes.
        const COPY_ZERO_SIZE: usize = 0x10000;

        let mut stream = Cursor::new(&delta_obj.data_decompressed);

        // Delta header: expected base size and final (result) size.
        let (base_size, result_size) = utils::read_delta_object_size(&mut stream).unwrap();

        let base_info = &base_obj.data_decompressed;
        assert_eq!(base_info.len(), base_size, "Base object size mismatch");

        let mut result = Vec::with_capacity(result_size);

        // Interpret instructions until the delta stream is exhausted.
        loop {
            let instruction = match utils::read_bytes(&mut stream) {
                Ok([instruction]) => instruction,
                // Clean EOF terminates the instruction stream.
                Err(err) if err.kind() == ErrorKind::UnexpectedEof => break,
                Err(err) => {
                    panic!(
                        "{}",
                        GitError::DeltaObjectError(format!("Wrong instruction in delta :{err}"))
                    );
                }
            };

            if instruction & COPY_INSTRUCTION_FLAG == 0 {
                // Literal insert: instruction byte itself is the length;
                // zero is reserved and invalid.
                if instruction == 0 {
                    panic!(
                        "{}",
                        GitError::DeltaObjectError(String::from("Invalid data instruction"))
                    );
                }

                let mut data = vec![0; instruction as usize];
                stream.read_exact(&mut data).unwrap();
                result.extend_from_slice(&data);
            } else {
                // Copy from base: the instruction's low bits flag which
                // offset/size bytes are present in the stream.
                let mut nonzero_bytes = instruction;
                let offset =
                    utils::read_partial_int(&mut stream, COPY_OFFSET_BYTES, &mut nonzero_bytes)
                        .unwrap();
                let mut size =
                    utils::read_partial_int(&mut stream, COPY_SIZE_BYTES, &mut nonzero_bytes)
                        .unwrap();
                if size == 0 {
                    // Encoded zero means the maximum copy span.
                    size = COPY_ZERO_SIZE;
                }
                // Checked slice: an out-of-range copy means a corrupt delta.
                let base_data = base_info.get(offset..(offset + size)).ok_or_else(|| {
                    GitError::DeltaObjectError("Invalid copy instruction".to_string())
                });

                match base_data {
                    Ok(data) => result.extend_from_slice(data),
                    Err(e) => panic!("{}", e),
                }
            }
        }
        assert_eq!(result_size, result.len(), "Result size mismatch");

        // The rebuilt object inherits the base object's type.
        let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
        CacheObject {
            info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
            offset: delta_obj.offset,
            data_decompressed: result,
            mem_recorder: None,
            is_delta_in_pack: delta_obj.is_delta_in_pack,
        } }
713 pub fn rebuild_zstdelta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
714 let result = zstdelta::apply(&base_obj.data_decompressed, &delta_obj.data_decompressed)
715 .expect("Failed to apply zstdelta");
716 let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
717 CacheObject {
718 info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
719 offset: delta_obj.offset,
720 data_decompressed: result,
721 mem_recorder: None,
722 is_delta_in_pack: delta_obj.is_delta_in_pack,
723 } }
726}
727
#[cfg(test)]
mod tests {
    // Integration-style tests that decode real pack fixtures from
    // `tests/data/packs`, in both SHA-1 and SHA-256 flavours. The
    // `set_hash_kind` guard must be taken before any decoding starts.
    use std::fs;
    use std::io::BufReader;
    use std::io::Cursor;
    use std::io::prelude::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::{env, path::PathBuf};

    use flate2::Compression;
    use flate2::write::ZlibEncoder;
    use tokio_util::io::ReaderStream;

    use crate::hash::{HashKind, ObjectHash, set_hash_kind};
    use crate::internal::pack::Pack;
    use crate::internal::pack::tests::init_logger;
    use futures_util::TryStreamExt;

    // Header of a known fixture reports the expected object count.
    #[tokio::test]
    async fn test_pack_check_header() {
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");

        let f = fs::File::open(source).unwrap();
        let mut buf_reader = BufReader::new(f);
        let (object_num, _) = Pack::check_header(&mut buf_reader).unwrap();

        assert_eq!(object_num, 358109);
    }

    // Round-trip: zlib-compress a buffer, then decompress_data must return
    // the original bytes and report exactly the compressed length consumed.
    #[test]
    fn test_decompress_data() {
        let data = b"Hello, world!"; let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(data).unwrap();
        let compressed_data = encoder.finish().unwrap();
        let compressed_size = compressed_data.len();

        let mut cursor: Cursor<Vec<u8>> = Cursor::new(compressed_data);
        let expected_size = data.len();

        let result = Pack::decompress_data(&mut cursor, expected_size);
        match result {
            Ok((decompressed_data, bytes_read)) => {
                assert_eq!(bytes_read, compressed_size);
                assert_eq!(decompressed_data, data);
            }
            Err(e) => panic!("Decompression failed: {e:?}"),
        }
    }

    // Full decode of a delta-free pack (SHA-1).
    #[test]
    fn test_pack_decode_without_delta() {
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/pack-1d0e6c14760c956c173ede71cb28f33d921e232f.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }
    // Full decode of a delta-free pack (SHA-256).
    #[test]
    fn test_pack_decode_without_delta_sha256() {
        let _guard = set_hash_kind(HashKind::Sha256);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/pack-78047853c60a1a3bb587f59598bdeb773fefc821f6f60f4f4797644ad43dad3d.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }

    // Pack containing ref-deltas (HashDelta objects), SHA-1.
    #[test]
    fn test_pack_decode_with_ref_delta() {
        init_logger();

        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/ref-delta-65d47638aa7cb7c39f1bd1d5011a415439b887a8.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }
    // Pack containing ref-deltas, SHA-256.
    #[test]
    fn test_pack_decode_with_ref_delta_sha256() {
        let _guard = set_hash_kind(HashKind::Sha256);
        init_logger();
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/ref-delta-0e26651d43b149c9baef6035c19cca140f82bb0d0cc5b12fda0ae89ff6a25195.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }

    // Decode with mem_limit == None (no back-pressure on memory), SHA-1.
    #[test]
    fn test_pack_decode_no_mem_limit() {
        let _guard = set_hash_kind(HashKind::Sha1);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/pack-1d0e6c14760c956c173ede71cb28f33d921e232f.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, None, Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }
    // Decode with no memory limit, SHA-256.
    #[test]
    fn test_pack_decode_no_mem_limit_sha256() {
        let _guard = set_hash_kind(HashKind::Sha256);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/pack-78047853c60a1a3bb587f59598bdeb773fefc821f6f60f4f4797644ad43dad3d.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, None, Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }

    // Large pack with offset-deltas, bounded threads and memory (SHA-1).
    #[tokio::test]
    async fn test_pack_decode_with_large_file_with_delta_without_ref() {
        let _guard = set_hash_kind(HashKind::Sha1);
        init_logger();
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(
            Some(20),
            Some(1024 * 1024 * 1024 * 1), Some(tmp.clone()),
            true,
        );
        let rt = p.decode(
            &mut buffered,
            |_obj| {
            },
            None::<fn(ObjectHash)>,
        );
        // Clean up the temp dir before surfacing a failure.
        if let Err(e) = rt {
            fs::remove_dir_all(tmp).unwrap();
            panic!("Error: {e:?}");
        }
    } #[tokio::test]
    async fn test_pack_decode_with_large_file_with_delta_without_ref_sha256() {
        let _guard = set_hash_kind(HashKind::Sha256);
        init_logger();
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/git-large-sha256-f6455f09d816f54d115724975da6e7edfb100d746ad145bfd0d2ddc0e0261f5d.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(Some(20), Some(1024 * 1024), Some(tmp.clone()), true);
        let rt = p.decode(
            &mut buffered,
            |_obj| {
            },
            None::<fn(ObjectHash)>,
        );
        if let Err(e) = rt {
            fs::remove_dir_all(tmp).unwrap();
            panic!("Error: {e:?}");
        }
    } #[tokio::test]
    async fn test_decode_large_file_stream() {
        // Streamed decode: entry count received on the channel must match
        // the pack's object count.
        init_logger();
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");
        let f = tokio::fs::File::open(source).await.unwrap();
        let stream = ReaderStream::new(f).map_err(axum::Error::new);
        let p = Pack::new(
            Some(20),
            Some(1024 * 1024 * 1024 * 1),
            Some(tmp.clone()),
            true,
        );

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = tokio::spawn(async move { p.decode_stream(stream, tx, None).await });
        let count = Arc::new(AtomicUsize::new(0));
        let count_c = count.clone();
        let consume = tokio::spawn(async move {
            let mut cnt = 0;
            while let Some(_entry) = rx.recv().await {
                cnt += 1;
            }
            tracing::info!("Received: {}", cnt);
            count_c.store(cnt, Ordering::Release);
        });
        let p = handle.await.unwrap();
        consume.await.unwrap();
        assert_eq!(count.load(Ordering::Acquire), p.number);
        assert_eq!(p.number, 358109);
    }
    // Streamed decode, SHA-256 fixture.
    #[tokio::test]
    async fn test_decode_large_file_stream_sha256() {
        let _guard = set_hash_kind(HashKind::Sha256);
        init_logger();
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/git-large-sha256-f6455f09d816f54d115724975da6e7edfb100d746ad145bfd0d2ddc0e0261f5d.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");
        let f = tokio::fs::File::open(source).await.unwrap();
        let stream = ReaderStream::new(f).map_err(axum::Error::new);
        let p = Pack::new(Some(20), Some(1024 * 1024), Some(tmp.clone()), true);

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = tokio::spawn(async move { p.decode_stream(stream, tx, None).await });
        let count = Arc::new(AtomicUsize::new(0));
        let count_c = count.clone();
        let consume = tokio::spawn(async move {
            let mut cnt = 0;
            while let Some(_entry) = rx.recv().await {
                cnt += 1;
            }
            tracing::info!("Received: {}", cnt);
            count_c.store(cnt, Ordering::Release);
        });
        let p = handle.await.unwrap();
        consume.await.unwrap();
        assert_eq!(count.load(Ordering::Acquire), p.number);
        assert_eq!(p.number, 26);
    }

    // Thread-based decode_async: count forwarded entries, then join.
    #[tokio::test]
    async fn test_decode_large_file_async() {
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");
        let f = fs::File::open(source).unwrap();
        let buffered = BufReader::new(f);
        let p = Pack::new(
            Some(20),
            Some(1024 * 1024 * 1024 * 1),
            Some(tmp.clone()),
            true,
        );

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = p.decode_async(buffered, tx); let mut cnt = 0;
        while let Some(_entry) = rx.recv().await {
            cnt += 1; }
        let p = handle.join().unwrap();
        assert_eq!(cnt, p.number);
    }
    // decode_async, SHA-256 fixture.
    #[tokio::test]
    async fn test_decode_large_file_async_sha256() {
        let _guard = set_hash_kind(HashKind::Sha256);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/git-large-sha256-f6455f09d816f54d115724975da6e7edfb100d746ad145bfd0d2ddc0e0261f5d.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");
        let f = fs::File::open(source).unwrap();
        let buffered = BufReader::new(f);
        let p = Pack::new(Some(20), Some(1024 * 1024), Some(tmp.clone()), true);

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = p.decode_async(buffered, tx); let mut cnt = 0;
        while let Some(_entry) = rx.recv().await {
            cnt += 1; }
        let p = handle.join().unwrap();
        assert_eq!(cnt, p.number);
    }

    // Offset-delta pack, SHA-1; prints the signature before and after.
    #[test]
    fn test_pack_decode_with_delta_without_ref() {
        let _guard = set_hash_kind(HashKind::Sha1);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/pack-d50df695086eea6253a237cb5ac44af1629e7ced.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        print!("pack_id: {:?}", p.signature);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
        print!("pack_id: {:?}", p.signature.to_string());
    }
    // Offset-delta pack, SHA-256.
    #[test]
    fn test_pack_decode_with_delta_without_ref_sha256() {
        let _guard = set_hash_kind(HashKind::Sha256);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/pack-delta-sha256-3662654057d1adedf50f2a80bfdec752e00ac72798859d850ab1dcfd801beedd.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        print!("pack_id: {:?}", p.signature); p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
        print!("pack_id: {:?}", p.signature.to_string());
    }

    // Two decoders of the same large pack running concurrently must not
    // interfere (separate temp dirs, shared globals stay consistent).
    #[test] fn test_pack_decode_multi_task_with_large_file_with_delta_without_ref() {
        let _guard = set_hash_kind(HashKind::Sha1);
        let task1 = std::thread::spawn(|| {
            test_pack_decode_with_large_file_with_delta_without_ref();
        });
        let task2 = std::thread::spawn(|| {
            test_pack_decode_with_large_file_with_delta_without_ref();
        });

        task1.join().unwrap();
        task2.join().unwrap();
    }
    // Same concurrency check with the SHA-256 fixture.
    #[test]
    fn test_pack_decode_multi_task_with_large_file_with_delta_without_ref_sha256() {
        let _guard = set_hash_kind(HashKind::Sha256);
        let task1 = std::thread::spawn(|| {
            test_pack_decode_with_large_file_with_delta_without_ref_sha256();
        });
        let task2 = std::thread::spawn(|| {
            test_pack_decode_with_large_file_with_delta_without_ref_sha256();
        });

        task1.join().unwrap();
        task2.join().unwrap();
    }
}