use std::io::{self, BufRead, Cursor, ErrorKind, Read};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread::{self, JoinHandle};
use std::time::Instant;

use axum::Error;
use bytes::Bytes;
use flate2::bufread::ZlibDecoder;
use futures_util::{Stream, StreamExt};
use threadpool::ThreadPool;
use tokio::sync::mpsc::UnboundedSender;
use uuid::Uuid;

use crate::errors::GitError;
use crate::hash::SHA1;
use crate::zstdelta;
use crate::internal::object::types::ObjectType;

use super::cache_object::CacheObjectInfo;
use crate::internal::pack::cache::_Cache;
use crate::internal::pack::cache::Caches;
use crate::internal::pack::cache_object::{CacheObject, MemSizeRecorder};
use crate::internal::pack::channel_reader::StreamBufReader;
use crate::internal::pack::entry::Entry;
use crate::internal::pack::waitlist::Waitlist;
use crate::internal::pack::wrapper::Wrapper;
use crate::internal::pack::{DEFAULT_TMP_DIR, Pack, utils};
use crate::utils::CountingReader;

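/// State shared with the worker threads that resolve delta objects: the thread
/// pool, the waitlist of deltas whose base has not been decoded yet, the object
/// caches, a counter of memory held by in-flight `CacheObject`s, and the user
/// callback invoked for every fully decoded entry.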
struct SharedParams {
    pub pool: Arc<ThreadPool>,
    pub waitlist: Arc<Waitlist>,
    pub caches: Arc<Caches>,
    pub cache_objs_mem_size: Arc<AtomicUsize>,
    pub callback: Arc<dyn Fn(Entry, usize) + Sync + Send>,
}

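// Dropping a `Pack` removes its temporary cache directory when `clean_tmp` is set.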
impl Drop for Pack {
    fn drop(&mut self) {
        if self.clean_tmp {
            self.caches.remove_tmp_dir();
        }
    }
}

impl Pack {
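    /// Creates a new `Pack` for decoding.
    ///
    /// - `thread_num`: worker threads for the decode pool; defaults to the number of CPUs.
    /// - `mem_limit`: overall memory budget; 4/5 of it is handed to the object caches,
    ///   `None` means unlimited.
    /// - `temp_path`: directory for cache temp files; a random 8-character subdirectory
    ///   is created under it (defaults to `DEFAULT_TMP_DIR`).
    /// - `clean_tmp`: remove the temp directory when the `Pack` is dropped.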
    pub fn new(
        thread_num: Option<usize>,
        mem_limit: Option<usize>,
        temp_path: Option<PathBuf>,
        clean_tmp: bool,
    ) -> Self {
        let mut temp_path = temp_path.unwrap_or(PathBuf::from(DEFAULT_TMP_DIR));
        loop {
            let sub_dir = Uuid::new_v4().to_string()[..8].to_string();
            temp_path.push(sub_dir);
            if !temp_path.exists() {
                break;
            }
            temp_path.pop();
        }
        let thread_num = thread_num.unwrap_or_else(num_cpus::get);
        let cache_mem_size = mem_limit.map(|mem_limit| mem_limit * 4 / 5);
        Pack {
            number: 0,
            signature: SHA1::default(),
            objects: Vec::new(),
            pool: Arc::new(ThreadPool::new(thread_num)),
            waitlist: Arc::new(Waitlist::new()),
            caches: Arc::new(Caches::new(cache_mem_size, temp_path, thread_num)),
            mem_limit,
            cache_objs_mem: Arc::new(AtomicUsize::default()),
            clean_tmp,
        }
    }

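    /// Reads and validates the 12-byte pack header from `pack`.
    ///
    /// Checks the `PACK` magic and that the version is 2, then returns the
    /// declared object count together with the raw header bytes that were consumed.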
    pub fn check_header(pack: &mut impl BufRead) -> Result<(u32, Vec<u8>), GitError> {
        let mut header_data = Vec::new();

        let mut magic = [0; 4];
        let result = pack.read_exact(&mut magic);
        match result {
            Ok(_) => {
                header_data.extend_from_slice(&magic);

                if magic != *b"PACK" {
                    return Err(GitError::InvalidPackHeader(format!(
                        "{},{},{},{}",
                        magic[0], magic[1], magic[2], magic[3]
                    )));
                }
            }
            Err(e) => {
                return Err(GitError::InvalidPackFile(format!(
                    "Error reading magic identifier: {e}"
                )));
            }
        }

        let mut version_bytes = [0; 4];
        let result = pack.read_exact(&mut version_bytes);
        match result {
            Ok(_) => {
                header_data.extend_from_slice(&version_bytes);

                let version = u32::from_be_bytes(version_bytes);
                if version != 2 {
                    return Err(GitError::InvalidPackFile(format!(
                        "Version Number is {version}, not 2"
                    )));
                }
            }
            Err(e) => {
                return Err(GitError::InvalidPackFile(format!(
                    "Error reading version number: {e}"
                )));
            }
        }

        let mut object_num_bytes = [0; 4];
        let result = pack.read_exact(&mut object_num_bytes);
        match result {
            Ok(_) => {
                header_data.extend_from_slice(&object_num_bytes);
                let object_num = u32::from_be_bytes(object_num_bytes);
                Ok((object_num, header_data))
            }
            Err(e) => Err(GitError::InvalidPackFile(format!(
                "Error reading object number: {e}"
            ))),
        }
    }

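    /// Decompresses one zlib stream from `pack`.
    ///
    /// `expected_size` is the object size recorded in the entry header; the call
    /// fails if the inflated data does not match it. Returns the decompressed
    /// bytes and the number of compressed bytes consumed from the reader.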
    pub fn decompress_data(
        pack: &mut (impl BufRead + Send),
        expected_size: usize,
    ) -> Result<(Vec<u8>, usize), GitError> {
        let mut buf = Vec::with_capacity(expected_size);

        let mut counting_reader = CountingReader::new(pack);
        let mut deflate = ZlibDecoder::new(&mut counting_reader);
        match deflate.read_to_end(&mut buf) {
            Ok(_) => {
                if buf.len() != expected_size {
                    Err(GitError::InvalidPackFile(format!(
                        "The object size {} does not match the expected size {}",
                        buf.len(),
                        expected_size
                    )))
                } else {
                    let actual_input_bytes = counting_reader.bytes_read as usize;
                    Ok((buf, actual_input_bytes))
                }
            }
            Err(e) => Err(GitError::InvalidPackFile(format!(
                "Decompression error: {e}"
            ))),
        }
    }

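    /// Decodes the next object entry starting at `*offset` in the pack stream.
    ///
    /// Reads the type-and-size varint, then inflates the payload. Base objects
    /// (commit, tree, blob, tag) are returned as undeltified `CacheObject`s;
    /// offset deltas, zstd offset deltas, and hash deltas keep their raw delta
    /// data for later reconstruction. `offset` is advanced past the whole entry.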
    pub fn decode_pack_object(
        pack: &mut (impl BufRead + Send),
        offset: &mut usize,
    ) -> Result<CacheObject, GitError> {
        let init_offset = *offset;

        let (type_bits, size) = match utils::read_type_and_varint_size(pack, offset) {
            Ok(result) => result,
            Err(e) => {
                return Err(GitError::InvalidPackFile(format!("Read error: {e}")));
            }
        };

        let t = ObjectType::from_u8(type_bits)?;

        match t {
            ObjectType::Commit | ObjectType::Tree | ObjectType::Blob | ObjectType::Tag => {
                let (data, raw_size) = Pack::decompress_data(pack, size)?;
                *offset += raw_size;
                Ok(CacheObject::new_for_undeltified(t, data, init_offset))
            }
            ObjectType::OffsetDelta | ObjectType::OffsetZstdelta => {
                let (delta_offset, bytes) = utils::read_offset_encoding(pack).unwrap();
                *offset += bytes;

                let (data, raw_size) = Pack::decompress_data(pack, size)?;
                *offset += raw_size;

                let base_offset = init_offset
                    .checked_sub(delta_offset as usize)
                    .ok_or_else(|| {
                        GitError::InvalidObjectInfo("Invalid OffsetDelta offset".to_string())
                    })
                    .unwrap();

                let mut reader = Cursor::new(&data);
                let (_, final_size) = utils::read_delta_object_size(&mut reader)?;

                let obj_info = match t {
                    ObjectType::OffsetDelta => {
                        CacheObjectInfo::OffsetDelta(base_offset, final_size)
                    }
                    ObjectType::OffsetZstdelta => {
                        CacheObjectInfo::OffsetZstdelta(base_offset, final_size)
                    }
                    _ => unreachable!(),
                };
                Ok(CacheObject {
                    info: obj_info,
                    offset: init_offset,
                    data_decompressed: data,
                    mem_recorder: None,
                })
            }
            ObjectType::HashDelta => {
                let ref_sha1 = SHA1::from_stream(pack).unwrap();
                *offset += SHA1::SIZE;

                let (data, raw_size) = Pack::decompress_data(pack, size)?;
                *offset += raw_size;

                let mut reader = Cursor::new(&data);
                let (_, final_size) = utils::read_delta_object_size(&mut reader)?;

                Ok(CacheObject {
                    info: CacheObjectInfo::HashDelta(ref_sha1, final_size),
                    offset: init_offset,
                    data_decompressed: data,
                    mem_recorder: None,
                })
            }
        }
    }

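    /// Decodes a whole pack file from `pack`, calling `callback(entry, offset)`
    /// for every reconstructed object.
    ///
    /// Entries are read sequentially on the current thread; delta resolution and
    /// caching are fanned out to the thread pool. Deltas whose base has not been
    /// decoded yet are parked in the waitlist and resolved once the base arrives.
    /// Finally the trailer checksum is verified against the hash of everything read.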
    pub fn decode<F>(
        &mut self,
        pack: &mut (impl BufRead + Send),
        callback: F,
    ) -> Result<(), GitError>
    where
        F: Fn(Entry, usize) + Sync + Send + 'static,
    {
        let time = Instant::now();
        let mut last_update_time = time.elapsed().as_millis();
        let log_info = |_i: usize, pack: &Pack| {
            tracing::info!(
                "time {:.2} s \t decode: {:?} \t dec-num: {} \t cah-num: {} \t Objs: {} MB \t CacheUsed: {} MB",
                time.elapsed().as_millis() as f64 / 1000.0,
                _i,
                pack.pool.queued_count(),
                pack.caches.queued_tasks(),
                pack.cache_objs_mem_used() / 1024 / 1024,
                pack.caches.memory_used() / 1024 / 1024
            );
        };
        let callback = Arc::new(callback);

        let caches = self.caches.clone();
        let mut reader = Wrapper::new(io::BufReader::new(pack));

        let result = Pack::check_header(&mut reader);
        match result {
            Ok((object_num, _)) => {
                self.number = object_num as usize;
            }
            Err(e) => {
                return Err(e);
            }
        }
        tracing::info!("The pack file has {} objects", self.number);
        let mut offset: usize = 12;
        let mut i = 0;
        while i < self.number {
            // Log progress at most once per second, checked every 1000 objects.
            if i % 1000 == 0 {
                let time_now = time.elapsed().as_millis();
                if time_now - last_update_time > 1000 {
                    log_info(i, self);
                    last_update_time = time_now;
                }
            }
            // Back pressure: wait while the pool backlog is too long or the
            // memory limit is exceeded.
            while self.pool.queued_count() > 2000
                || self
                    .mem_limit
                    .map(|limit| self.memory_used() > limit)
                    .unwrap_or(false)
            {
                thread::yield_now();
            }
            let r: Result<CacheObject, GitError> =
                Pack::decode_pack_object(&mut reader, &mut offset);
            match r {
                Ok(mut obj) => {
                    obj.set_mem_recorder(self.cache_objs_mem.clone());
                    obj.record_mem_size();

                    let params = Arc::new(SharedParams {
                        pool: self.pool.clone(),
                        waitlist: self.waitlist.clone(),
                        caches: self.caches.clone(),
                        cache_objs_mem_size: self.cache_objs_mem.clone(),
                        callback: callback.clone(),
                    });

                    let caches = caches.clone();
                    let waitlist = self.waitlist.clone();
                    self.pool.execute(move || {
                        match obj.info {
                            CacheObjectInfo::BaseObject(_, _) => {
                                Self::cache_obj_and_process_waitlist(params, obj);
                            }
                            CacheObjectInfo::OffsetDelta(base_offset, _)
                            | CacheObjectInfo::OffsetZstdelta(base_offset, _) => {
                                if let Some(base_obj) = caches.get_by_offset(base_offset) {
                                    Self::process_delta(params, obj, base_obj);
                                } else {
                                    // The base has not been decoded yet: park the delta,
                                    // then re-check the cache to close the race where the
                                    // base arrived in the meantime.
                                    waitlist.insert_offset(base_offset, obj);
                                    if let Some(base_obj) = caches.get_by_offset(base_offset) {
                                        Self::process_waitlist(params, base_obj);
                                    }
                                }
                            }
                            CacheObjectInfo::HashDelta(base_ref, _) => {
                                if let Some(base_obj) = caches.get_by_hash(base_ref) {
                                    Self::process_delta(params, obj, base_obj);
                                } else {
                                    waitlist.insert_ref(base_ref, obj);
                                    if let Some(base_obj) = caches.get_by_hash(base_ref) {
                                        Self::process_waitlist(params, base_obj);
                                    }
                                }
                            }
                        }
                    });
                }
                Err(e) => {
                    return Err(e);
                }
            }
            i += 1;
        }
        log_info(i, self);
        let render_hash = reader.final_hash();
        self.signature = SHA1::from_stream(&mut reader).unwrap();

        if render_hash != self.signature {
            return Err(GitError::InvalidPackFile(format!(
                "The pack file hash {} does not match the trailer hash {}",
                render_hash, self.signature
            )));
        }

        let end = utils::is_eof(&mut reader);
        if !end {
            return Err(GitError::InvalidPackFile(
                "The pack file is not at the end".to_string(),
            ));
        }

        // Wait for all worker tasks; by then every delta must have found its base.
        self.pool.join();
        assert_eq!(self.waitlist.map_offset.len(), 0);
        assert_eq!(self.waitlist.map_ref.len(), 0);
        assert_eq!(self.number, caches.total_inserted());
        tracing::info!(
            "The pack file has been decoded successfully, takes: [ {:?} ]",
            time.elapsed()
        );
        self.caches.clear();
        assert_eq!(self.cache_objs_mem_used(), 0);
        Ok(())
    }

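    /// Decodes the pack on a background OS thread, sending each decoded `Entry`
    /// through `sender`. Returns a `JoinHandle` that yields the consumed `Pack`.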
    pub fn decode_async(
        mut self,
        mut pack: impl BufRead + Send + 'static,
        sender: UnboundedSender<Entry>,
    ) -> JoinHandle<Pack> {
        thread::spawn(move || {
            self.decode(&mut pack, move |entry, _| {
                if let Err(e) = sender.send(entry) {
                    eprintln!("Failed to send entry (receiver dropped): {e:?}");
                }
            })
            .unwrap();
            self
        })
    }

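    /// Decodes a pack arriving as an async byte stream.
    ///
    /// Chunks from `stream` are forwarded through a std channel into a blocking
    /// `StreamBufReader`, and the actual decoding runs on a `spawn_blocking` task
    /// so it never blocks the async runtime. Decoded entries are emitted via
    /// `sender`; the consumed `Pack` is returned when decoding finishes.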
    pub async fn decode_stream(
        mut self,
        mut stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + Send + 'static,
        sender: UnboundedSender<Entry>,
    ) -> Self {
        let (tx, rx) = std::sync::mpsc::channel();
        let mut reader = StreamBufReader::new(rx);
        tokio::spawn(async move {
            while let Some(chunk) = stream.next().await {
                let data = chunk.unwrap().to_vec();
                if let Err(e) = tx.send(data) {
                    eprintln!("Sending Error: {e:?}");
                    break;
                }
            }
        });
        tokio::task::spawn_blocking(move || {
            self.decode(&mut reader, move |entry: Entry, _| {
                if let Err(e) = sender.send(entry) {
                    eprintln!("Unbounded channel sending error: {e:?}");
                }
            })
            .unwrap();
            self
        })
        .await
        .unwrap()
    }

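    /// Memory currently attributed to decoded objects plus the cache index.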
    fn memory_used(&self) -> usize {
        self.cache_objs_mem_used() + self.caches.memory_used_index()
    }

    fn cache_objs_mem_used(&self) -> usize {
        self.cache_objs_mem.load(Ordering::Acquire)
    }

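    /// Schedules reconstruction of `delta_obj` against `base_obj` on the thread pool,
    /// then caches the rebuilt object and resolves any deltas that were waiting for it.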
    fn process_delta(
        shared_params: Arc<SharedParams>,
        delta_obj: CacheObject,
        base_obj: Arc<CacheObject>,
    ) {
        shared_params.pool.clone().execute(move || {
            let mut new_obj = match delta_obj.info {
                CacheObjectInfo::OffsetDelta(_, _) | CacheObjectInfo::HashDelta(_, _) => {
                    Pack::rebuild_delta(delta_obj, base_obj)
                }
                CacheObjectInfo::OffsetZstdelta(_, _) => {
                    Pack::rebuild_zstdelta(delta_obj, base_obj)
                }
                _ => unreachable!(),
            };

            new_obj.set_mem_recorder(shared_params.cache_objs_mem_size.clone());
            new_obj.record_mem_size();
            Self::cache_obj_and_process_waitlist(shared_params, new_obj);
        });
    }

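    /// Delivers `new_obj` to the user callback, inserts it into the caches, and
    /// resolves any deltas that were waiting for it as their base.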
    fn cache_obj_and_process_waitlist(shared_params: Arc<SharedParams>, new_obj: CacheObject) {
        (shared_params.callback)(new_obj.to_entry(), new_obj.offset);
        let new_obj = shared_params.caches.insert(
            new_obj.offset,
            new_obj.base_object_hash().unwrap(),
            new_obj,
        );
        Self::process_waitlist(shared_params, new_obj);
    }

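    /// Takes every delta parked in the waitlist that targets `base_obj` (by offset
    /// or by hash) and schedules it for reconstruction.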
    fn process_waitlist(shared_params: Arc<SharedParams>, base_obj: Arc<CacheObject>) {
        let wait_objs = shared_params
            .waitlist
            .take(base_obj.offset, base_obj.base_object_hash().unwrap());
        for obj in wait_objs {
            Self::process_delta(shared_params.clone(), obj, base_obj.clone());
        }
    }

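    /// Reconstructs a full object from a git-style delta.
    ///
    /// The delta payload starts with the varint-encoded base and result sizes,
    /// followed by a sequence of instructions: copy instructions (high bit set)
    /// that copy an offset/size slice out of the base object, and data
    /// instructions (high bit clear) that append up to 127 literal bytes.
    /// A copy size of zero means 0x10000 bytes.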
    pub fn rebuild_delta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
        const COPY_INSTRUCTION_FLAG: u8 = 1 << 7;
        const COPY_OFFSET_BYTES: u8 = 4;
        const COPY_SIZE_BYTES: u8 = 3;
        const COPY_ZERO_SIZE: usize = 0x10000;

        let mut stream = Cursor::new(&delta_obj.data_decompressed);

        let (base_size, result_size) = utils::read_delta_object_size(&mut stream).unwrap();

        let base_info = &base_obj.data_decompressed;
        assert_eq!(base_info.len(), base_size, "Base object size mismatch");

        let mut result = Vec::with_capacity(result_size);

        loop {
            let instruction = match utils::read_bytes(&mut stream) {
                Ok([instruction]) => instruction,
                Err(err) if err.kind() == ErrorKind::UnexpectedEof => break,
                Err(err) => {
                    panic!(
                        "{}",
                        GitError::DeltaObjectError(format!("Wrong instruction in delta :{err}"))
                    );
                }
            };

            if instruction & COPY_INSTRUCTION_FLAG == 0 {
                // Data instruction: the instruction byte itself is the number of
                // literal bytes (1..=127) to append to the result.
                if instruction == 0 {
                    panic!(
                        "{}",
                        GitError::DeltaObjectError(String::from("Invalid data instruction"))
                    );
                }

                let mut data = vec![0; instruction as usize];
                stream.read_exact(&mut data).unwrap();
                result.extend_from_slice(&data);
            } else {
                // Copy instruction: the low bits select which offset and size bytes
                // follow; a decoded size of zero means COPY_ZERO_SIZE.
                let mut nonzero_bytes = instruction;
                let offset =
                    utils::read_partial_int(&mut stream, COPY_OFFSET_BYTES, &mut nonzero_bytes)
                        .unwrap();
                let mut size =
                    utils::read_partial_int(&mut stream, COPY_SIZE_BYTES, &mut nonzero_bytes)
                        .unwrap();
                if size == 0 {
                    size = COPY_ZERO_SIZE;
                }
                let base_data = base_info.get(offset..(offset + size)).ok_or_else(|| {
                    GitError::DeltaObjectError("Invalid copy instruction".to_string())
                });

                match base_data {
                    Ok(data) => result.extend_from_slice(data),
                    Err(e) => panic!("{}", e),
                }
            }
        }
        assert_eq!(result_size, result.len(), "Result size mismatch");

        let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
        CacheObject {
            info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
            offset: delta_obj.offset,
            data_decompressed: result,
            mem_recorder: None,
        }
    }

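    /// Reconstructs a full object by applying a zstd-based delta
    /// (`zstdelta::apply`) to the base object's decompressed data.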
    pub fn rebuild_zstdelta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
        let result = zstdelta::apply(&base_obj.data_decompressed, &delta_obj.data_decompressed)
            .expect("Failed to apply zstdelta");
        let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
        CacheObject {
            info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
            offset: delta_obj.offset,
            data_decompressed: result,
            mem_recorder: None,
        }
    }
}

#[cfg(test)]
mod tests {
    use std::fs;
    use std::io::BufReader;
    use std::io::Cursor;
    use std::io::prelude::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::{env, path::PathBuf};

    use flate2::Compression;
    use flate2::write::ZlibEncoder;
    use tokio_util::io::ReaderStream;

    use crate::internal::pack::Pack;
    use crate::internal::pack::tests::init_logger;
    use futures_util::TryStreamExt;

    #[tokio::test]
    async fn test_pack_check_header() {
        let res = crate::test_utils::setup_lfs_file().await;
        println!("{res:?}");
        let source = res
            .get("git-2d187177923cd618a75da6c6db45bb89d92bd504.pack")
            .unwrap();

        let f = fs::File::open(source).unwrap();
        let mut buf_reader = BufReader::new(f);
        let (object_num, _) = Pack::check_header(&mut buf_reader).unwrap();

        assert_eq!(object_num, 358109);
    }

    #[test]
    fn test_decompress_data() {
        let data = b"Hello, world!";
        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(data).unwrap();
        let compressed_data = encoder.finish().unwrap();
        let compressed_size = compressed_data.len();

        let mut cursor: Cursor<Vec<u8>> = Cursor::new(compressed_data);
        let expected_size = data.len();

        let result = Pack::decompress_data(&mut cursor, expected_size);
        match result {
            Ok((decompressed_data, bytes_read)) => {
                assert_eq!(bytes_read, compressed_size);
                assert_eq!(decompressed_data, data);
            }
            Err(e) => panic!("Decompression failed: {e:?}"),
        }
    }

    #[test]
    fn test_pack_decode_without_delta() {
        let mut source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
        source.push("tests/data/packs/pack-1d0e6c14760c956c173ede71cb28f33d921e232f.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        p.decode(&mut buffered, |_, _| {}).unwrap();
    }

    #[test]
    fn test_pack_decode_with_ref_delta() {
        init_logger();

        let mut source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
        source.push("tests/data/packs/ref-delta-65d47638aa7cb7c39f1bd1d5011a415439b887a8.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        p.decode(&mut buffered, |_, _| {}).unwrap();
    }

    #[test]
    fn test_pack_decode_no_mem_limit() {
        let mut source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
        source.push("tests/data/packs/pack-1d0e6c14760c956c173ede71cb28f33d921e232f.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, None, Some(tmp), true);
        p.decode(&mut buffered, |_, _| {}).unwrap();
    }

    #[tokio::test]
    #[ignore]
    async fn test_pack_decode_with_large_file_with_delta_without_ref() {
        init_logger();
        let file_map = crate::test_utils::setup_lfs_file().await;
        let source = file_map
            .get("git-2d187177923cd618a75da6c6db45bb89d92bd504.pack")
            .unwrap();

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(
            Some(20),
            Some(1024 * 1024 * 1024 * 2),
            Some(tmp.clone()),
            true,
        );
        let rt = p.decode(&mut buffered, |_obj, _offset| {});
        if let Err(e) = rt {
            fs::remove_dir_all(tmp).unwrap();
            panic!("Error: {e:?}");
        }
    }

    #[tokio::test]
    async fn test_decode_large_file_stream() {
        init_logger();
        let file_map = crate::test_utils::setup_lfs_file().await;
        let source = file_map
            .get("git-2d187177923cd618a75da6c6db45bb89d92bd504.pack")
            .unwrap();

        let tmp = PathBuf::from("/tmp/.cache_temp");
        let f = tokio::fs::File::open(source).await.unwrap();
        let stream = ReaderStream::new(f).map_err(axum::Error::new);
        let p = Pack::new(
            Some(20),
            Some(1024 * 1024 * 1024 * 4),
            Some(tmp.clone()),
            true,
        );

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = tokio::spawn(async move { p.decode_stream(stream, tx).await });
        let count = Arc::new(AtomicUsize::new(0));
        let count_c = count.clone();
        let consume = tokio::spawn(async move {
            let mut cnt = 0;
            while let Some(_entry) = rx.recv().await {
                cnt += 1;
            }
            tracing::info!("Received: {}", cnt);
            count_c.store(cnt, Ordering::Release);
        });
        let p = handle.await.unwrap();
        consume.await.unwrap();
        assert_eq!(count.load(Ordering::Acquire), p.number);
        assert_eq!(p.number, 358109);
    }

    #[tokio::test]
    #[ignore]
    async fn test_decode_large_file_async() {
        let file_map = crate::test_utils::setup_lfs_file().await;
        let source = file_map
            .get("git-2d187177923cd618a75da6c6db45bb89d92bd504.pack")
            .unwrap();

        let tmp = PathBuf::from("/tmp/.cache_temp");
        let f = fs::File::open(source).unwrap();
        let buffered = BufReader::new(f);
        let p = Pack::new(
            Some(20),
            Some(1024 * 1024 * 1024 * 2),
            Some(tmp.clone()),
            true,
        );

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = p.decode_async(buffered, tx);
        let p = handle.join().unwrap();
        // Decoding has finished and the sender has been dropped, so the channel
        // now holds every entry; drain and count them.
        let mut cnt = 0;
        while let Ok(_entry) = rx.try_recv() {
            cnt += 1;
        }
        assert_eq!(cnt, p.number);
    }

    #[test]
    fn test_pack_decode_with_delta_without_ref() {
        let mut source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
        source.push("tests/data/packs/pack-d50df695086eea6253a237cb5ac44af1629e7ced.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        p.decode(&mut buffered, |_, _| {}).unwrap();
    }

    #[test]
    #[ignore]
    fn test_pack_decode_multi_task_with_large_file_with_delta_without_ref() {
        let task1 = std::thread::spawn(|| {
            test_pack_decode_with_large_file_with_delta_without_ref();
        });
        let task2 = std::thread::spawn(|| {
            test_pack_decode_with_large_file_with_delta_without_ref();
        });

        task1.join().unwrap();
        task2.join().unwrap();
    }
}