1use std::io::{self, BufRead, Cursor, ErrorKind, Read};
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::thread::{self, JoinHandle};
6use std::time::Instant;
7
8use axum::Error;
9use bytes::Bytes;
10use flate2::bufread::ZlibDecoder;
11use futures_util::{Stream, StreamExt};
12use threadpool::ThreadPool;
13use tokio::sync::mpsc::UnboundedSender;
14use uuid::Uuid;
15
16use crate::errors::GitError;
17use crate::hash::SHA1;
18
19use crate::internal::metadata::{EntryMeta, MetaAttached};
20use crate::zstdelta;
21
22use crate::internal::object::types::ObjectType;
23
24use super::cache_object::CacheObjectInfo;
25use crate::internal::pack::cache::_Cache;
26use crate::internal::pack::cache::Caches;
27use crate::internal::pack::cache_object::{CacheObject, MemSizeRecorder};
28use crate::internal::pack::channel_reader::StreamBufReader;
29use crate::internal::pack::entry::Entry;
30use crate::internal::pack::waitlist::Waitlist;
31use crate::internal::pack::wrapper::Wrapper;
32use crate::internal::pack::{DEFAULT_TMP_DIR, Pack, utils};
33use crate::utils::CountingReader;
34
/// Parameters shared by worker-pool jobs during pack decoding.
///
/// Bundled into a single `Arc` so each queued job captures one handle
/// instead of cloning every field separately.
struct SharedParams {
    // Thread pool used to schedule delta-rebuild jobs.
    pub pool: Arc<ThreadPool>,
    // Delta objects waiting for a base object that has not been decoded yet.
    pub waitlist: Arc<Waitlist>,
    // Cache of fully decoded objects, looked up by offset or hash.
    pub caches: Arc<Caches>,
    // Running total of memory held by in-flight `CacheObject`s.
    pub cache_objs_mem_size: Arc<AtomicUsize>,
    // User callback invoked once for every fully rebuilt entry.
    pub callback: Arc<dyn Fn(MetaAttached<Entry, EntryMeta>) + Sync + Send>,
}
43
44impl Drop for Pack {
45 fn drop(&mut self) {
46 if self.clean_tmp {
47 self.caches.remove_tmp_dir();
48 }
49 }
50}
51
52impl Pack {
53 pub fn new(
63 thread_num: Option<usize>,
64 mem_limit: Option<usize>,
65 temp_path: Option<PathBuf>,
66 clean_tmp: bool,
67 ) -> Self {
68 let mut temp_path = temp_path.unwrap_or(PathBuf::from(DEFAULT_TMP_DIR));
69 loop {
71 let sub_dir = Uuid::new_v4().to_string()[..8].to_string();
72 temp_path.push(sub_dir);
73 if !temp_path.exists() {
74 break;
75 }
76 temp_path.pop();
77 }
78 let thread_num = thread_num.unwrap_or_else(num_cpus::get);
79 let cache_mem_size = mem_limit.map(|mem_limit| mem_limit * 4 / 5);
80 Pack {
81 number: 0,
82 signature: SHA1::default(),
83 objects: Vec::new(),
84 pool: Arc::new(ThreadPool::new(thread_num)),
85 waitlist: Arc::new(Waitlist::new()),
86 caches: Arc::new(Caches::new(cache_mem_size, temp_path, thread_num)),
87 mem_limit,
88 cache_objs_mem: Arc::new(AtomicUsize::default()),
89 clean_tmp,
90 }
91 }
92
93 pub fn check_header(pack: &mut impl BufRead) -> Result<(u32, Vec<u8>), GitError> {
117 let mut header_data = Vec::new();
119
120 let mut magic = [0; 4];
122 let result = pack.read_exact(&mut magic);
124 match result {
125 Ok(_) => {
126 header_data.extend_from_slice(&magic);
128
129 if magic != *b"PACK" {
131 return Err(GitError::InvalidPackHeader(format!(
133 "{},{},{},{}",
134 magic[0], magic[1], magic[2], magic[3]
135 )));
136 }
137 }
138 Err(e) => {
139 return Err(GitError::InvalidPackFile(format!(
141 "Error reading magic identifier: {e}"
142 )));
143 }
144 }
145
146 let mut version_bytes = [0; 4];
148 let result = pack.read_exact(&mut version_bytes); match result {
150 Ok(_) => {
151 header_data.extend_from_slice(&version_bytes);
153
154 let version = u32::from_be_bytes(version_bytes);
156 if version != 2 {
157 return Err(GitError::InvalidPackFile(format!(
159 "Version Number is {version}, not 2"
160 )));
161 }
162 }
163 Err(e) => {
164 return Err(GitError::InvalidPackFile(format!(
166 "Error reading version number: {e}"
167 )));
168 }
169 }
170
171 let mut object_num_bytes = [0; 4];
173 let result = pack.read_exact(&mut object_num_bytes);
175 match result {
176 Ok(_) => {
177 header_data.extend_from_slice(&object_num_bytes);
179 let object_num = u32::from_be_bytes(object_num_bytes);
181 Ok((object_num, header_data))
183 }
184 Err(e) => {
185 Err(GitError::InvalidPackFile(format!(
187 "Error reading object number: {e}"
188 )))
189 }
190 }
191 }
192
193 pub fn decompress_data(
205 pack: &mut (impl BufRead + Send),
206 expected_size: usize,
207 ) -> Result<(Vec<u8>, usize), GitError> {
208 let mut buf = Vec::with_capacity(expected_size);
210
211 let mut counting_reader = CountingReader::new(pack);
212 let mut deflate = ZlibDecoder::new(&mut counting_reader);
215 match deflate.read_to_end(&mut buf) {
217 Ok(_) => {
218 if buf.len() != expected_size {
220 Err(GitError::InvalidPackFile(format!(
221 "The object size {} does not match the expected size {}",
222 buf.len(),
223 expected_size
224 )))
225 } else {
226 let actual_input_bytes = counting_reader.bytes_read as usize;
228 Ok((buf, actual_input_bytes))
229 }
230 }
231 Err(e) => {
232 Err(GitError::InvalidPackFile(format!(
234 "Decompression error: {e}"
235 )))
236 }
237 }
238 }
239
240 pub fn decode_pack_object(
252 pack: &mut (impl BufRead + Send),
253 offset: &mut usize,
254 ) -> Result<CacheObject, GitError> {
255 let init_offset = *offset;
256
257 let (type_bits, size) = match utils::read_type_and_varint_size(pack, offset) {
259 Ok(result) => result,
260 Err(e) => {
261 return Err(GitError::InvalidPackFile(format!("Read error: {e}")));
264 }
265 };
266
267 let t = ObjectType::from_u8(type_bits)?;
269
270 match t {
271 ObjectType::Commit | ObjectType::Tree | ObjectType::Blob | ObjectType::Tag => {
272 let (data, raw_size) = Pack::decompress_data(pack, size)?;
273 *offset += raw_size;
274 Ok(CacheObject::new_for_undeltified(t, data, init_offset))
275 }
276 ObjectType::OffsetDelta | ObjectType::OffsetZstdelta => {
277 let (delta_offset, bytes) = utils::read_offset_encoding(pack).unwrap();
278 *offset += bytes;
279
280 let (data, raw_size) = Pack::decompress_data(pack, size)?;
281 *offset += raw_size;
282
283 let base_offset = init_offset
285 .checked_sub(delta_offset as usize)
286 .ok_or_else(|| {
287 GitError::InvalidObjectInfo("Invalid OffsetDelta offset".to_string())
288 })
289 .unwrap();
290
291 let mut reader = Cursor::new(&data);
292 let (_, final_size) = utils::read_delta_object_size(&mut reader)?;
293
294 let obj_info = match t {
295 ObjectType::OffsetDelta => {
296 CacheObjectInfo::OffsetDelta(base_offset, final_size)
297 }
298 ObjectType::OffsetZstdelta => {
299 CacheObjectInfo::OffsetZstdelta(base_offset, final_size)
300 }
301 _ => unreachable!(),
302 };
303 Ok(CacheObject {
304 info: obj_info,
305 offset: init_offset,
306 data_decompressed: data,
307 mem_recorder: None,
308 is_delta_in_pack: true,
309 })
310 }
311 ObjectType::HashDelta => {
312 let ref_sha1 = SHA1::from_stream(pack).unwrap();
314 *offset += SHA1::SIZE;
316
317 let (data, raw_size) = Pack::decompress_data(pack, size)?;
318 *offset += raw_size;
319
320 let mut reader = Cursor::new(&data);
321 let (_, final_size) = utils::read_delta_object_size(&mut reader)?;
322
323 Ok(CacheObject {
324 info: CacheObjectInfo::HashDelta(ref_sha1, final_size),
325 offset: init_offset,
326 data_decompressed: data,
327 mem_recorder: None,
328 is_delta_in_pack: true,
329 })
330 }
331 }
332 }
333
    /// Decode an entire pack stream.
    ///
    /// Reads the header, then every object record. Base objects are cached;
    /// delta objects are rebuilt on the thread pool, parking in the waitlist
    /// until their base arrives. `callback` fires once per fully-resolved
    /// entry (from worker threads, hence `Sync + Send`). `pack_id_callback`,
    /// when given, receives the pack trailer hash after all workers finish.
    ///
    /// # Errors
    /// Returns `GitError` on a malformed header or object, on a trailer-hash
    /// mismatch, or if bytes remain after the trailer.
    pub fn decode<F, C>(
        &mut self,
        pack: &mut (impl BufRead + Send),
        callback: F,
        pack_id_callback: Option<C>,
    ) -> Result<(), GitError>
    where
        F: Fn(MetaAttached<Entry, EntryMeta>) + Sync + Send + 'static,
        C: FnOnce(SHA1) + Send + 'static,
    {
        let time = Instant::now();
        let mut last_update_time = time.elapsed().as_millis();
        // Progress logging, throttled to roughly once per second below.
        let log_info = |_i: usize, pack: &Pack| {
            tracing::info!(
                "time {:.2} s \t decode: {:?} \t dec-num: {} \t cah-num: {} \t Objs: {} MB \t CacheUsed: {} MB",
                time.elapsed().as_millis() as f64 / 1000.0,
                _i,
                pack.pool.queued_count(),
                pack.caches.queued_tasks(),
                pack.cache_objs_mem_used() / 1024 / 1024,
                pack.caches.memory_used() / 1024 / 1024
            );
        };
        let callback = Arc::new(callback);

        let caches = self.caches.clone();
        // Wrapper keeps a running hash of everything read so the trailer
        // checksum can be verified at the end.
        let mut reader = Wrapper::new(io::BufReader::new(pack));

        let result = Pack::check_header(&mut reader);
        match result {
            Ok((object_num, _)) => {
                self.number = object_num as usize;
            }
            Err(e) => {
                return Err(e);
            }
        }
        tracing::info!("The pack file has {} objects", self.number);
        // The 12-byte header has already been consumed.
        let mut offset: usize = 12;
        let mut i = 0;
        while i < self.number {
            if i % 1000 == 0 {
                let time_now = time.elapsed().as_millis();
                if time_now - last_update_time > 1000 {
                    log_info(i, self);
                    last_update_time = time_now;
                }
            }
            // Back-pressure: stall the reader while the pool backlog or the
            // memory budget is exceeded.
            while self.pool.queued_count() > 2000
                || self
                    .mem_limit
                    .map(|limit| self.memory_used() > limit)
                    .unwrap_or(false)
            {
                thread::yield_now();
            }
            let r: Result<CacheObject, GitError> =
                Pack::decode_pack_object(&mut reader, &mut offset);
            match r {
                Ok(mut obj) => {
                    obj.set_mem_recorder(self.cache_objs_mem.clone());
                    obj.record_mem_size();

                    let params = Arc::new(SharedParams {
                        pool: self.pool.clone(),
                        waitlist: self.waitlist.clone(),
                        caches: self.caches.clone(),
                        cache_objs_mem_size: self.cache_objs_mem.clone(),
                        callback: callback.clone(),
                    });

                    let caches = caches.clone();
                    let waitlist = self.waitlist.clone();
                    self.pool.execute(move || {
                        match obj.info {
                            CacheObjectInfo::BaseObject(_, _) => {
                                Self::cache_obj_and_process_waitlist(params, obj);
                            }
                            CacheObjectInfo::OffsetDelta(base_offset, _)
                            | CacheObjectInfo::OffsetZstdelta(base_offset, _) => {
                                if let Some(base_obj) = caches.get_by_offset(base_offset) {
                                    Self::process_delta(params, obj, base_obj);
                                } else {
                                    // Base not decoded yet: park this delta,
                                    // then re-check to close the race where
                                    // the base landed between the first
                                    // lookup and the insert.
                                    waitlist.insert_offset(base_offset, obj);
                                    if let Some(base_obj) = caches.get_by_offset(base_offset) {
                                        Self::process_waitlist(params, base_obj);
                                    }
                                }
                            }
                            CacheObjectInfo::HashDelta(base_ref, _) => {
                                if let Some(base_obj) = caches.get_by_hash(base_ref) {
                                    Self::process_delta(params, obj, base_obj);
                                } else {
                                    // Same park-then-recheck dance as above,
                                    // keyed by hash instead of offset.
                                    waitlist.insert_ref(base_ref, obj);
                                    if let Some(base_obj) = caches.get_by_hash(base_ref) {
                                        Self::process_waitlist(params, base_obj);
                                    }
                                }
                            }
                        }
                    });
                }
                Err(e) => {
                    return Err(e);
                }
            }
            i += 1;
        }
        log_info(i, self);
        // Hash of all bytes read so far must match the 20-byte trailer.
        let render_hash = reader.final_hash();
        self.signature = SHA1::from_stream(&mut reader).unwrap();

        if render_hash != self.signature {
            return Err(GitError::InvalidPackFile(format!(
                "The pack file hash {} does not match the trailer hash {}",
                render_hash, self.signature
            )));
        }

        let end = utils::is_eof(&mut reader);
        if !end {
            return Err(GitError::InvalidPackFile(
                "The pack file is not at the end".to_string(),
            ));
        }

        // Wait for every rebuild job before reporting the pack id.
        self.pool.join(); if let Some(pack_callback) = pack_id_callback {
            pack_callback(self.signature);
        }
        // Every delta must have found its base by now.
        assert_eq!(self.waitlist.map_offset.len(), 0);
        assert_eq!(self.waitlist.map_ref.len(), 0);
        assert_eq!(self.number, caches.total_inserted());
        tracing::info!(
            "The pack file has been decoded successfully, takes: [ {:?} ]",
            time.elapsed()
        );
        self.caches.clear(); assert_eq!(self.cache_objs_mem_used(), 0); Ok(())
    }
498
499 pub fn decode_async(
502 mut self,
503 mut pack: impl BufRead + Send + 'static,
504 sender: UnboundedSender<Entry>,
505 ) -> JoinHandle<Pack> {
506 thread::spawn(move || {
507 self.decode(
508 &mut pack,
509 move |entry| {
510 if let Err(e) = sender.send(entry.inner) {
511 eprintln!("Channel full, failed to send entry: {e:?}");
512 }
513 },
514 None::<fn(SHA1)>,
515 )
516 .unwrap();
517 self
518 })
519 }
520
    /// Decode a pack arriving as an async byte stream.
    ///
    /// Chunks are bridged through a std mpsc channel into a blocking
    /// `StreamBufReader`, and `decode` runs inside `spawn_blocking` so the
    /// CPU-heavy work stays off the async runtime. Resolved entries go out
    /// through `sender`; the trailer hash is sent on `pack_hash_send` when
    /// provided. Returns `self` once decoding completes.
    pub async fn decode_stream(
        mut self,
        mut stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + Send + 'static,
        sender: UnboundedSender<MetaAttached<Entry, EntryMeta>>,
        pack_hash_send: Option<UnboundedSender<SHA1>>,
    ) -> Self {
        let (tx, rx) = std::sync::mpsc::channel();
        let mut reader = StreamBufReader::new(rx);
        // Pump the async stream into the sync channel feeding the reader.
        tokio::spawn(async move {
            while let Some(chunk) = stream.next().await {
                // NOTE(review): a stream error panics here via unwrap —
                // confirm upstream guarantees, or forward the error instead.
                let data = chunk.unwrap().to_vec();
                if let Err(e) = tx.send(data) {
                    eprintln!("Sending Error: {e:?}");
                    break;
                }
            }
        });
        tokio::task::spawn_blocking(move || {
            self.decode(
                &mut reader,
                move |entry: MetaAttached<Entry, EntryMeta>| {
                    if let Err(e) = sender.send(entry) {
                        eprintln!("unbound channel Sending Error: {e:?}");
                    }
                },
                Some(move |pack_id: SHA1| {
                    if let Some(pack_id_send) = pack_hash_send
                        && let Err(e) = pack_id_send.send(pack_id)
                    {
                        eprintln!("unbound channel Sending Error: {e:?}");
                    }
                }),
            )
            .unwrap();
            self
        })
        .await
        .unwrap()
    }
564
    /// Total tracked memory: in-flight decoded objects plus the cache index.
    fn memory_used(&self) -> usize {
        self.cache_objs_mem_used() + self.caches.memory_used_index()
    }
569
    /// Bytes currently held by `CacheObject`s registered with the recorder.
    fn cache_objs_mem_used(&self) -> usize {
        self.cache_objs_mem.load(Ordering::Acquire)
    }
574
    /// Schedule the rebuild of `delta_obj` against `base_obj` on the pool.
    ///
    /// The rebuilt object is registered with the memory recorder, cached,
    /// and any deltas that were waiting on it are processed in turn.
    fn process_delta(
        shared_params: Arc<SharedParams>,
        delta_obj: CacheObject,
        base_obj: Arc<CacheObject>,
    ) {
        // Clone the pool handle first so `shared_params` can move into the job.
        shared_params.pool.clone().execute(move || {
            let mut new_obj = match delta_obj.info {
                CacheObjectInfo::OffsetDelta(_, _) | CacheObjectInfo::HashDelta(_, _) => {
                    Pack::rebuild_delta(delta_obj, base_obj)
                }
                CacheObjectInfo::OffsetZstdelta(_, _) => {
                    Pack::rebuild_zstdelta(delta_obj, base_obj)
                }
                // Base objects never reach this path (see `decode`).
                _ => unreachable!(),
            };

            new_obj.set_mem_recorder(shared_params.cache_objs_mem_size.clone());
            new_obj.record_mem_size();
            Self::cache_obj_and_process_waitlist(shared_params, new_obj); });
    }
598
    /// Publish a fully-rebuilt object: fire the user callback, insert it
    /// into the cache, then resolve any deltas that were waiting on it.
    fn cache_obj_and_process_waitlist(shared_params: Arc<SharedParams>, new_obj: CacheObject) {
        (shared_params.callback)(new_obj.to_entry_metadata());
        // `insert` hands back an Arc to the cached copy for waitlist work.
        let new_obj = shared_params.caches.insert(
            new_obj.offset,
            new_obj.base_object_hash().unwrap(),
            new_obj,
        );
        Self::process_waitlist(shared_params, new_obj);
    }
609
    /// Drain every delta parked on `base_obj` (by offset or hash) and
    /// schedule its rebuild against that base.
    fn process_waitlist(shared_params: Arc<SharedParams>, base_obj: Arc<CacheObject>) {
        let wait_objs = shared_params
            .waitlist
            .take(base_obj.offset, base_obj.base_object_hash().unwrap());
        for obj in wait_objs {
            Self::process_delta(shared_params.clone(), obj, base_obj.clone());
        }
    }
619
    /// Apply a git-style delta (`delta_obj.data_decompressed`) on top of
    /// `base_obj`'s data and return the reconstructed object.
    ///
    /// The delta stream starts with two varint sizes (base, result) followed
    /// by instructions: high bit set = copy a range of the base; high bit
    /// clear = literal data whose length is the low 7 bits.
    ///
    /// # Panics
    /// Panics on a malformed delta stream or a base-size mismatch; this runs
    /// on worker threads where corruption is treated as fatal.
    pub fn rebuild_delta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
        const COPY_INSTRUCTION_FLAG: u8 = 1 << 7;
        const COPY_OFFSET_BYTES: u8 = 4;
        const COPY_SIZE_BYTES: u8 = 3;
        // A copy size of zero actually encodes 0x10000 bytes.
        const COPY_ZERO_SIZE: usize = 0x10000;

        let mut stream = Cursor::new(&delta_obj.data_decompressed);

        let (base_size, result_size) = utils::read_delta_object_size(&mut stream).unwrap();

        let base_info = &base_obj.data_decompressed;
        assert_eq!(base_info.len(), base_size, "Base object size mismatch");

        let mut result = Vec::with_capacity(result_size);

        loop {
            // EOF here is the normal end of the instruction stream.
            let instruction = match utils::read_bytes(&mut stream) {
                Ok([instruction]) => instruction,
                Err(err) if err.kind() == ErrorKind::UnexpectedEof => break,
                Err(err) => {
                    panic!(
                        "{}",
                        GitError::DeltaObjectError(format!("Wrong instruction in delta :{err}"))
                    );
                }
            };

            if instruction & COPY_INSTRUCTION_FLAG == 0 {
                // Data instruction: low 7 bits give the literal length;
                // zero is reserved and invalid.
                if instruction == 0 {
                    panic!(
                        "{}",
                        GitError::DeltaObjectError(String::from("Invalid data instruction"))
                    );
                }

                let mut data = vec![0; instruction as usize];
                stream.read_exact(&mut data).unwrap();
                result.extend_from_slice(&data);
            } else {
                // Copy instruction: the low bits flag which offset/size bytes
                // follow; absent bytes are treated as zero.
                let mut nonzero_bytes = instruction;
                let offset =
                    utils::read_partial_int(&mut stream, COPY_OFFSET_BYTES, &mut nonzero_bytes)
                        .unwrap();
                let mut size =
                    utils::read_partial_int(&mut stream, COPY_SIZE_BYTES, &mut nonzero_bytes)
                        .unwrap();
                if size == 0 {
                    // See COPY_ZERO_SIZE above.
                    size = COPY_ZERO_SIZE;
                }
                // Out-of-range copy means the delta is corrupt.
                let base_data = base_info.get(offset..(offset + size)).ok_or_else(|| {
                    GitError::DeltaObjectError("Invalid copy instruction".to_string())
                });

                match base_data {
                    Ok(data) => result.extend_from_slice(data),
                    Err(e) => panic!("{}", e),
                }
            }
        }
        assert_eq!(result_size, result.len(), "Result size mismatch");

        // The rebuilt object inherits the base's type and gets a fresh hash.
        let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
        CacheObject {
            info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
            offset: delta_obj.offset,
            data_decompressed: result,
            mem_recorder: None,
            is_delta_in_pack: delta_obj.is_delta_in_pack,
        } }
707 pub fn rebuild_zstdelta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
708 let result = zstdelta::apply(&base_obj.data_decompressed, &delta_obj.data_decompressed)
709 .expect("Failed to apply zstdelta");
710 let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
711 CacheObject {
712 info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
713 offset: delta_obj.offset,
714 data_decompressed: result,
715 mem_recorder: None,
716 is_delta_in_pack: delta_obj.is_delta_in_pack,
717 } }
720}
721
#[cfg(test)]
mod tests {
    use std::fs;
    use std::io::BufReader;
    use std::io::Cursor;
    use std::io::prelude::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::{env, path::PathBuf};

    use flate2::Compression;
    use flate2::write::ZlibEncoder;
    use tokio_util::io::ReaderStream;

    use crate::hash::SHA1;
    use crate::internal::pack::Pack;
    use crate::internal::pack::tests::init_logger;
    use futures_util::TryStreamExt;

    /// Absolute path of a pack fixture shipped under `tests/data/packs`.
    fn fixture(name: &str) -> PathBuf {
        let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        path.push("tests/data/packs");
        path.push(name);
        path
    }

    #[tokio::test]
    async fn test_pack_check_header() {
        let source = fixture("git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");
        let file = fs::File::open(source).unwrap();
        let mut reader = BufReader::new(file);
        let (object_num, _) = Pack::check_header(&mut reader).unwrap();

        assert_eq!(object_num, 358109);
    }

    #[test]
    fn test_decompress_data() {
        // Round-trip: compress a known payload, then inflate it back.
        let data = b"Hello, world!";
        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(data).unwrap();
        let compressed = encoder.finish().unwrap();
        let compressed_size = compressed.len();

        let mut cursor: Cursor<Vec<u8>> = Cursor::new(compressed);
        let expected_size = data.len();

        match Pack::decompress_data(&mut cursor, expected_size) {
            Ok((decompressed, bytes_read)) => {
                assert_eq!(bytes_read, compressed_size);
                assert_eq!(decompressed, data);
            }
            Err(e) => panic!("Decompression failed: {e:?}"),
        }
    }

    #[test]
    fn test_pack_decode_without_delta() {
        let source = fixture("pack-1d0e6c14760c956c173ede71cb28f33d921e232f.pack");
        let tmp = PathBuf::from("/tmp/.cache_temp");

        let mut reader = BufReader::new(fs::File::open(source).unwrap());
        let mut pack = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        pack.decode(&mut reader, |_| {}, None::<fn(SHA1)>).unwrap();
    }

    #[test]
    fn test_pack_decode_with_ref_delta() {
        init_logger();

        let source = fixture("ref-delta-65d47638aa7cb7c39f1bd1d5011a415439b887a8.pack");
        let tmp = PathBuf::from("/tmp/.cache_temp");

        let mut reader = BufReader::new(fs::File::open(source).unwrap());
        let mut pack = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        pack.decode(&mut reader, |_| {}, None::<fn(SHA1)>).unwrap();
    }

    #[test]
    fn test_pack_decode_no_mem_limit() {
        let source = fixture("pack-1d0e6c14760c956c173ede71cb28f33d921e232f.pack");
        let tmp = PathBuf::from("/tmp/.cache_temp");

        let mut reader = BufReader::new(fs::File::open(source).unwrap());
        let mut pack = Pack::new(None, None, Some(tmp), true);
        pack.decode(&mut reader, |_| {}, None::<fn(SHA1)>).unwrap();
    }

    #[tokio::test]
    async fn test_pack_decode_with_large_file_with_delta_without_ref() {
        init_logger();
        let source = fixture("git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");
        let tmp = PathBuf::from("/tmp/.cache_temp");

        let mut reader = BufReader::new(fs::File::open(source).unwrap());
        let mut pack = Pack::new(
            Some(20),
            Some(1024 * 1024 * 1024 * 2),
            Some(tmp.clone()),
            true,
        );
        // Clean up the temp dir before surfacing a failure.
        if let Err(e) = pack.decode(&mut reader, |_obj| {}, None::<fn(SHA1)>) {
            fs::remove_dir_all(tmp).unwrap();
            panic!("Error: {e:?}");
        }
    }

    #[tokio::test]
    async fn test_decode_large_file_stream() {
        init_logger();
        let source = fixture("git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");
        let tmp = PathBuf::from("/tmp/.cache_temp");

        let file = tokio::fs::File::open(source).await.unwrap();
        let stream = ReaderStream::new(file).map_err(axum::Error::new);
        let pack = Pack::new(
            Some(20),
            Some(1024 * 1024 * 1024 * 4),
            Some(tmp.clone()),
            true,
        );

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let decode_task = tokio::spawn(async move { pack.decode_stream(stream, tx, None).await });
        let received = Arc::new(AtomicUsize::new(0));
        let received_in_task = received.clone();
        // Count every entry coming out of the channel.
        let consumer = tokio::spawn(async move {
            let mut total = 0;
            while rx.recv().await.is_some() {
                total += 1;
            }
            tracing::info!("Received: {}", total);
            received_in_task.store(total, Ordering::Release);
        });
        let pack = decode_task.await.unwrap();
        consumer.await.unwrap();
        assert_eq!(received.load(Ordering::Acquire), pack.number);
        assert_eq!(pack.number, 358109);
    }

    #[tokio::test]
    async fn test_decode_large_file_async() {
        let source = fixture("git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");
        let tmp = PathBuf::from("/tmp/.cache_temp");

        let reader = BufReader::new(fs::File::open(source).unwrap());
        let pack = Pack::new(
            Some(20),
            Some(1024 * 1024 * 1024 * 2),
            Some(tmp.clone()),
            true,
        );

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = pack.decode_async(reader, tx);
        let mut received = 0;
        while rx.recv().await.is_some() {
            received += 1;
        }
        let pack = handle.join().unwrap();
        assert_eq!(received, pack.number);
    }

    #[test]
    fn test_pack_decode_with_delta_without_ref() {
        let source = fixture("pack-d50df695086eea6253a237cb5ac44af1629e7ced.pack");
        let tmp = PathBuf::from("/tmp/.cache_temp");

        let mut reader = BufReader::new(fs::File::open(source).unwrap());
        let mut pack = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        print!("pack_id: {:?}", pack.signature);
        pack.decode(&mut reader, |_| {}, None::<fn(SHA1)>).unwrap();
        print!("pack_id: {:?}", pack.signature.to_string());
    }

    #[test]
    fn test_pack_decode_multi_task_with_large_file_with_delta_without_ref() {
        // Two concurrent decodes must not collide on the shared temp root.
        let first = std::thread::spawn(|| {
            test_pack_decode_with_large_file_with_delta_without_ref();
        });
        let second = std::thread::spawn(|| {
            test_pack_decode_with_large_file_with_delta_without_ref();
        });

        first.join().unwrap();
        second.join().unwrap();
    }
}