1use std::{
5 cmp::Ordering,
6 collections::VecDeque,
7 hash::{Hash, Hasher},
8 io::Write,
9 path::{Path, PathBuf},
10};
11
12use ahash::AHasher;
13use chrono::Utc;
15use flate2::write::ZlibEncoder;
16use natord::compare;
17use rayon::prelude::*;
18use tokio::io::AsyncWriteExt as TokioAsyncWriteExt;
20use tokio::{fs::File, sync::mpsc, task::JoinHandle};
21
22use crate::delta;
24use crate::{
25 errors::GitError,
26 hash::ObjectHash,
27 internal::{
28 metadata::{EntryMeta, MetaAttached},
29 object::types::ObjectType,
30 pack::{entry::Entry, index_entry::IndexEntry, pack_index::IdxBuilder},
31 },
32 time_it,
33 utils::HashAlgorithm,
34 zstdelta,
35};
36
/// Maximum delta-chain depth: an entry whose base chain is already this long
/// is never used as a delta base again (bounds decode cost).
const MAX_CHAIN_LEN: usize = 50;
/// Minimum space-saving rate a candidate delta must reach before the entry
/// is stored as a delta instead of a full object.
const MIN_DELTA_RATE: f64 = 0.5;

/// Streaming encoder that turns a channel of entries into git pack bytes
/// (and, optionally, the matching idx-file bytes) pushed through mpsc channels.
pub struct PackEncoder {
    // Total number of objects to encode; written into the pack header.
    object_number: usize,
    // How many entries have been consumed so far (used by the parallel path
    // to verify all objects were encoded).
    process_index: usize,
    // Delta search window size; 0 selects the non-delta parallel path.
    window_size: usize,
    // Outgoing pack bytes; taken (dropped) to signal end-of-stream.
    pack_sender: Option<mpsc::Sender<Vec<u8>>>,
    // Outgoing idx-file bytes, present only when built via `new_with_idx`.
    idx_sender: Option<mpsc::Sender<Vec<u8>>>,
    // Index entries collected while encoding; consumed to build the idx file.
    idx_entries: Option<Vec<IndexEntry>>,
    // Byte offset of the next object in the pack (starts after the 12-byte header).
    inner_offset: usize,
    // Running hash over every byte written; finalized into `final_hash`.
    inner_hash: HashAlgorithm,
    // Hash of the finished pack; `None` until encoding completes.
    final_hash: Option<ObjectHash>,
    // NOTE(review): read as a re-entrancy guard in the encode paths but never
    // set to `true` anywhere in this file — confirm whether it is still needed.
    start_encoding: bool,
}
57
58pub async fn encode_and_output_to_files(
69 raw_entries_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
70 object_number: usize,
71 output_dir: PathBuf,
72 window_size: usize,
73) -> Result<(), GitError> {
74 let (pack_tx, mut pack_rx) = mpsc::channel(1024);
75 let (idx_tx, mut idx_rx) = mpsc::channel(1024);
76 let mut pack_encoder = PackEncoder::new_with_idx(object_number, window_size, pack_tx, idx_tx);
77
78 let now = Utc::now();
80 let timestamp = now.format("%Y%m%d%H%M%S%.3f").to_string(); let tmp_path = output_dir.join(format!("{}objects.pack.tmp", timestamp));
82 let mut pack_file = File::create(&tmp_path).await?;
83
84 let pack_writer = tokio::spawn(async move {
85 while let Some(chunk) = pack_rx.recv().await {
86 TokioAsyncWriteExt::write_all(&mut pack_file, &chunk).await?;
87 }
88 TokioAsyncWriteExt::flush(&mut pack_file).await?;
90 Ok::<(), GitError>(())
91 });
92
93 pack_encoder.encode(raw_entries_rx).await?;
94
95 let pack_write_result = pack_writer
97 .await
98 .map_err(|e| GitError::PackEncodeError(format!("pack writer task join error: {e}")))?;
99 pack_write_result?;
100
101 let final_pack_name =
102 output_dir.join(format!("pack-{}.pack", pack_encoder.final_hash.unwrap()));
103 let final_idx_name = output_dir.join(format!("pack-{}.idx", pack_encoder.final_hash.unwrap()));
104 tokio::fs::rename(tmp_path, &final_pack_name).await?;
105
106 let mut idx_file = File::create(&final_idx_name).await?;
107 let idx_writer = tokio::spawn(async move {
108 while let Some(chunk) = idx_rx.recv().await {
109 TokioAsyncWriteExt::write_all(&mut idx_file, &chunk).await?;
111 }
112 TokioAsyncWriteExt::flush(&mut idx_file).await?;
114 Ok::<(), GitError>(())
115 });
116
117 pack_encoder.encode_idx_file().await?;
119
120 let idx_write_result = idx_writer
121 .await
122 .map_err(|e| GitError::PackEncodeError(format!("idx writer task join error: {e}")))?;
123 idx_write_result?;
124
125 Ok(())
126}
127
/// Build the 12-byte pack header: the "PACK" magic, format version 2, and the
/// object count as a big-endian u32.
///
/// # Panics
/// Panics when `object_number` is 0 or exceeds `u32::MAX`.
fn encode_header(object_number: usize) -> Vec<u8> {
    assert_ne!(object_number, 0);
    assert!(object_number <= u32::MAX as usize);

    let mut header = Vec::with_capacity(12);
    header.extend_from_slice(b"PACK");
    header.extend_from_slice(&2u32.to_be_bytes());
    header.extend_from_slice(&(object_number as u32).to_be_bytes());
    header
}
141
/// Encode a relative offset in git's variable-length big-endian form, as used
/// by OFS_DELTA entries: 7 payload bits per byte, the MSB set on every byte
/// except the last, and a +1 bias on all groups above the lowest so that
/// multi-byte encodings are unambiguous.
///
/// # Panics
/// Panics when `value` is 0 (an object cannot reference itself).
fn encode_offset(mut value: usize) -> Vec<u8> {
    assert_ne!(value, 0, "offset can't be zero");

    // Emit the least-significant group first; the final byte carries no
    // continuation bit.
    let mut out = vec![(value & 0x7F) as u8];
    value >>= 7;
    while value != 0 {
        // Bias by one so the decoder's `(n + 1) << 7` step round-trips.
        value -= 1;
        out.push(((value & 0x7F) as u8) | 0x80);
        value >>= 7;
    }
    // Bytes were produced little-end first; the wire format is big-end first.
    out.reverse();
    out
}
158
159fn encode_one_object(entry: &Entry, offset: Option<usize>) -> Result<Vec<u8>, GitError> {
162 let obj_data = &entry.data;
164 let obj_data_len = obj_data.len();
165 let obj_type_number = entry.obj_type.to_pack_type_u8()?;
166
167 let mut encoded_data = Vec::new();
168
169 let mut header_data = vec![(0x80 | (obj_type_number << 4)) + (obj_data_len & 0x0f) as u8];
171 let mut size = obj_data_len >> 4; if size > 0 {
173 while size > 0 {
174 if size >> 7 > 0 {
175 header_data.push((0x80 | size) as u8);
176 size >>= 7;
177 } else {
178 header_data.push(size as u8);
179 break;
180 }
181 }
182 } else {
183 header_data.push(0);
184 }
185 encoded_data.extend(header_data);
186
187 if entry.obj_type == ObjectType::OffsetDelta || entry.obj_type == ObjectType::OffsetZstdelta {
189 let offset_data = encode_offset(offset.unwrap());
190 encoded_data.extend(offset_data);
191 } else if entry.obj_type == ObjectType::HashDelta {
192 unreachable!("unsupported type")
193 }
194
195 let mut inflate = ZlibEncoder::new(Vec::new(), flate2::Compression::default());
197 inflate
198 .write_all(obj_data)
199 .expect("zlib compress should never failed");
200 inflate.flush().expect("zlib flush should never failed");
201 let compressed_data = inflate.finish().expect("zlib compress should never failed");
202 encoded_data.extend(compressed_data);
204 Ok(encoded_data)
205}
206
207fn magic_sort(a: &MetaAttached<Entry, EntryMeta>, b: &MetaAttached<Entry, EntryMeta>) -> Ordering {
209 let path_a = a.meta.file_path.as_ref();
210 let path_b = b.meta.file_path.as_ref();
211
212 match (path_a, path_b) {
214 (Some(pa), Some(pb)) => {
215 let pa = Path::new(pa);
216 let pb = Path::new(pb);
217
218 let dir_ord = pa.parent().cmp(&pb.parent());
220 if dir_ord != Ordering::Equal {
221 return dir_ord;
222 }
223
224 let name_a = pa.file_name().unwrap_or_default().to_string_lossy();
226 let name_b = pb.file_name().unwrap_or_default().to_string_lossy();
227 let name_ord = compare(&name_a, &name_b);
228 if name_ord != Ordering::Equal {
229 return name_ord;
230 }
231 }
232 (Some(_), None) => return Ordering::Less, (None, Some(_)) => return Ordering::Greater, (None, None) => {}
235 }
236
237 let ord = b.inner.data.len().cmp(&a.inner.data.len());
238 if ord != Ordering::Equal {
239 return ord;
240 }
241
242 (a as *const MetaAttached<Entry, EntryMeta>).cmp(&(b as *const MetaAttached<Entry, EntryMeta>))
244}
245
246fn calc_hash(data: &[u8]) -> u64 {
248 let mut hasher = AHasher::default();
249 data.hash(&mut hasher);
250 hasher.finish()
251}
252
253fn cheap_similar(a: &[u8], b: &[u8]) -> bool {
255 let k = a.len().min(b.len()).min(128);
256 if k == 0 {
257 return false;
258 }
259 calc_hash(&a[..k]) == calc_hash(&b[..k])
260}
261
impl PackEncoder {
    /// Create an encoder that emits only pack bytes (no idx output).
    pub fn new(object_number: usize, window_size: usize, sender: mpsc::Sender<Vec<u8>>) -> Self {
        PackEncoder {
            object_number,
            window_size,
            process_index: 0,
            pack_sender: Some(sender),
            idx_sender: None,
            idx_entries: None,
            // Object offsets start right after the fixed 12-byte pack header.
            inner_offset: 12,
            inner_hash: HashAlgorithm::new(),
            final_hash: None,
            start_encoding: false,
        }
    }

    /// Create an encoder that emits pack bytes on `pack_sender` and, after
    /// `encode_idx_file` is called, idx-file bytes on `idx_sender`.
    pub fn new_with_idx(
        object_number: usize,
        window_size: usize,
        pack_sender: mpsc::Sender<Vec<u8>>,
        idx_sender: mpsc::Sender<Vec<u8>>,
    ) -> Self {
        PackEncoder {
            object_number,
            window_size,
            process_index: 0,
            pack_sender: Some(pack_sender),
            idx_sender: Some(idx_sender),
            idx_entries: None,
            // Object offsets start right after the fixed 12-byte pack header.
            inner_offset: 12,
            inner_hash: HashAlgorithm::new(),
            final_hash: None,
            start_encoding: false,
        }
    }

    /// Drop the pack-byte sender, closing the channel so receivers see EOF.
    pub fn drop_sender(&mut self) {
        self.pack_sender.take();
    }

    /// Send a chunk of pack bytes to the receiver, if the sender is still
    /// alive. Panics if the receiving side has been dropped mid-encode.
    pub async fn send_data(&mut self, data: Vec<u8>) {
        if let Some(sender) = &self.pack_sender {
            sender.send(data).await.unwrap();
        }
    }

    /// Hash of the finished pack; `None` until encoding has completed.
    pub fn get_hash(&self) -> Option<ObjectHash> {
        self.final_hash
    }

    /// Encode all entries from `entry_rx`. Dispatches on `window_size`:
    /// 0 selects the parallel non-delta path, otherwise the delta-window path.
    pub async fn encode(
        &mut self,
        entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<(), GitError> {
        if self.window_size == 0 {
            self.parallel_encode(entry_rx).await
        } else {
            self.inner_encode(entry_rx, false).await
        }
    }

    /// Delta-window encoding with zstd-based deltas instead of the classic
    /// git delta format.
    pub async fn encode_with_zstdelta(
        &mut self,
        entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<(), GitError> {
        self.inner_encode(entry_rx, true).await
    }

    /// Delta-window encoding: buffer all entries, bucket them by object type,
    /// sort each bucket with `magic_sort` to cluster delta candidates, run
    /// delta search per bucket on blocking threads, then write everything out
    /// in bucket order and finish with the pack hash trailer.
    async fn inner_encode(
        &mut self,
        mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
        enable_zstdelta: bool,
    ) -> Result<(), GitError> {
        // Header is sent and hashed first; everything written afterwards is
        // folded into the same running hash.
        let head = encode_header(self.object_number);
        self.send_data(head.clone()).await;
        self.inner_hash.update(&head);

        // NOTE(review): `start_encoding` is never set to `true` in this file,
        // so this guard appears inert (and runs after the header was already
        // sent) — confirm intent.
        if self.start_encoding {
            return Err(GitError::PackEncodeError(
                "encoding operation is already in progress".to_string(),
            ));
        }

        // Bucket entries by object type; deltas are only searched within a
        // bucket, never across types.
        let mut commits: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
        let mut trees: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
        let mut blobs: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
        let mut tags: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
        while let Some(entry) = entry_rx.recv().await {
            match entry.inner.obj_type {
                ObjectType::Commit => {
                    commits.push(entry);
                }
                ObjectType::Tree => {
                    trees.push(entry);
                }
                ObjectType::Blob => {
                    blobs.push(entry);
                }
                ObjectType::Tag => {
                    tags.push(entry);
                }
                _ => {
                    return Err(GitError::PackEncodeError(format!(
                        "object type `{}` is not supported by delta-window pack encoding",
                        entry.inner.obj_type
                    )));
                }
            }
        }

        // Cluster likely-similar objects so the sliding window finds bases.
        commits.sort_by(magic_sort);
        trees.sort_by(magic_sort);
        blobs.sort_by(magic_sort);
        tags.sort_by(magic_sort);
        tracing::info!(
            "numbers : commits: {:?} trees: {:?} blobs:{:?} tag :{:?}",
            commits.len(),
            trees.len(),
            blobs.len(),
            tags.len()
        );

        // Delta search is CPU-bound, so each bucket runs on a blocking
        // thread; all four buckets are processed concurrently.
        // NOTE(review): the window size passed here is the literal 10, not
        // `self.window_size` — confirm whether that is intentional.
        let (commit_results, tree_results, blob_results, tag_results) = tokio::try_join!(
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(
                    commits
                        .into_iter()
                        .map(|entry_with_meta| entry_with_meta.inner)
                        .collect(),
                    10,
                    enable_zstdelta,
                )
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(
                    trees
                        .into_iter()
                        .map(|entry_with_meta| entry_with_meta.inner)
                        .collect(),
                    10,
                    enable_zstdelta,
                )
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(
                    blobs
                        .into_iter()
                        .map(|entry_with_meta| entry_with_meta.inner)
                        .collect(),
                    10,
                    enable_zstdelta,
                )
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(
                    tags.into_iter()
                        .map(|entry_with_meta| entry_with_meta.inner)
                        .collect(),
                    10,
                    enable_zstdelta,
                )
            }),
        )
        .map_err(|e| GitError::PackEncodeError(format!("Task join error: {e}")))?;

        let commit_res = commit_results?;
        let tree_res = tree_results?;
        let blob_res = blob_results?;
        let tag_res = tag_results?;

        let mut all_res = vec![commit_res, tree_res, blob_res, tag_res];

        // Write buckets back-to-back, fixing up each index entry's absolute
        // offset as bytes are emitted. Delta base references are relative
        // distances within a bucket, so concatenation order is safe.
        let mut idx_entries = Vec::new();
        for res in &mut all_res {
            for data in res {
                data.1.offset = self.inner_offset as u64;
                self.write_all_and_update(&data.0).await;
                idx_entries.push(data.1.clone());
            }
        }

        self.idx_entries = Some(idx_entries);

        // Trailer: hash of everything written so far, also kept as the pack's
        // final hash.
        let hash_result = self.inner_hash.clone().finalize();
        self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
        self.send_data(hash_result.to_vec()).await;

        self.drop_sender();
        Ok(())
    }

    /// Encode one pre-sorted bucket, trying each entry as an offset delta
    /// against the objects in a sliding window of the last `window_size`
    /// already-encoded entries. Returns `(encoded bytes, index entry)` pairs;
    /// index offsets are left 0 and patched by the caller.
    fn try_as_offset_delta(
        mut bucket: Vec<Entry>,
        window_size: usize,
        enable_zstdelta: bool,
    ) -> Result<Vec<(Vec<u8>, IndexEntry)>, GitError> {
        // Offsets here are relative to the start of this bucket; only the
        // *difference* between two offsets is ever encoded.
        let mut current_offset = 0usize;
        // Window holds (undeltified entry, its bucket-relative offset).
        let mut window: VecDeque<(Entry, usize)> = VecDeque::with_capacity(window_size);
        let mut res: Vec<(Vec<u8>, IndexEntry)> = Vec::new();
        for entry in bucket.iter_mut() {
            let mut best_base: Option<&(Entry, usize)> = None;
            let mut best_rate: f64 = 0.0;
            let tie_epsilon: f64 = 0.15;

            // Score all window candidates in parallel; each filter below is
            // cheaper than the delta-rate estimate it guards.
            let candidates: Vec<_> = window
                .par_iter()
                .with_min_len(3)
                .filter_map(|try_base| {
                    if try_base.0.obj_type != entry.obj_type {
                        return None;
                    }

                    // Don't grow chains past the cap.
                    if try_base.0.chain_len >= MAX_CHAIN_LEN {
                        return None;
                    }

                    // Identical objects: a delta would be pointless.
                    if try_base.0.hash == entry.hash {
                        return None;
                    }

                    // Very different sizes rarely delta well.
                    let sym_ratio = (try_base.0.data.len().min(entry.data.len()) as f64)
                        / (try_base.0.data.len().max(entry.data.len()) as f64);
                    if sym_ratio < 0.5 {
                        return None;
                    }

                    if !cheap_similar(&try_base.0.data, &entry.data) {
                        return None;
                    }

                    // Larger payloads use the (parallel) heuristic estimate.
                    let rate = if (try_base.0.data.len() + entry.data.len()) / 2 > 64 {
                        delta::heuristic_encode_rate_parallel(&try_base.0.data, &entry.data)
                    } else {
                        delta::encode_rate(&try_base.0.data, &entry.data)
                    };

                    if rate > MIN_DELTA_RATE {
                        Some((rate, try_base))
                    } else {
                        None
                    }
                })
                .collect();

            // Pick the best candidate: clearly better rate wins; near-ties
            // prefer the longer chain (keeps chains consolidated).
            for (rate, try_base) in candidates {
                match best_base {
                    None => {
                        best_rate = rate;
                        best_base = Some(try_base);
                    }
                    Some(best_base_ref) => {
                        let is_better = if rate > best_rate + tie_epsilon {
                            true
                        } else if (rate - best_rate).abs() <= tie_epsilon {
                            try_base.0.chain_len > best_base_ref.0.chain_len
                        } else {
                            false
                        };

                        if is_better {
                            best_rate = rate;
                            best_base = Some(try_base);
                        }
                    }
                }
            }

            // The window keeps the *undeltified* entry so later entries can
            // delta against the full content.
            let mut entry_for_window = entry.clone();

            // If a base was chosen, replace the entry's payload with the
            // delta and compute the backwards distance to the base.
            let offset = best_base.map(|best_base| {
                let delta = if enable_zstdelta {
                    entry.obj_type = ObjectType::OffsetZstdelta;
                    zstdelta::diff(&best_base.0.data, &entry.data)
                        .map_err(|e| {
                            GitError::DeltaObjectError(format!("zstdelta diff failed: {e}"))
                        })
                        .unwrap()
                } else {
                    entry.obj_type = ObjectType::OffsetDelta;
                    delta::encode(&best_base.0.data, &entry.data)
                };
                entry.data = delta;
                entry.chain_len = best_base.0.chain_len + 1;
                current_offset - best_base.1
            });

            entry_for_window.chain_len = entry.chain_len;
            let obj_data = encode_one_object(entry, offset)?;
            window.push_back((entry_for_window, current_offset));
            if window.len() > window_size {
                window.pop_front();
            }
            res.push((obj_data.clone(), IndexEntry::new(entry, 0)));
            current_offset += obj_data.len();
        }
        Ok(res)
    }

    /// Non-delta path: receive entries in batches, compress each batch in
    /// parallel with rayon, and stream the results out in order. Requires
    /// `window_size == 0`.
    pub async fn parallel_encode(
        &mut self,
        mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<(), GitError> {
        if self.window_size != 0 {
            return Err(GitError::PackEncodeError(
                "parallel encode only works when window_size == 0".to_string(),
            ));
        }

        let head = encode_header(self.object_number);
        self.send_data(head.clone()).await;
        self.inner_hash.update(&head);

        // NOTE(review): as in `inner_encode`, `start_encoding` is never set
        // to `true`, so this guard appears inert — confirm intent.
        if self.start_encoding {
            return Err(GitError::PackEncodeError(
                "encoding operation is already in progress".to_string(),
            ));
        }

        let mut idx_entries = Vec::new();
        // Batch size scales with the channel capacity but never drops below
        // 1000, so rayon always has enough work per batch.
        let batch_size = usize::max(1000, entry_rx.max_capacity() / 10);
        tracing::info!("encode with batch size: {}", batch_size);
        loop {
            let mut batch_entries = Vec::with_capacity(batch_size);
            time_it!("parallel encode: receive batch", {
                for _ in 0..batch_size {
                    match entry_rx.recv().await {
                        Some(entry) => {
                            batch_entries.push(entry.inner);
                            self.process_index += 1;
                        }
                        None => break,
                    }
                }
            });

            if batch_entries.is_empty() {
                break;
            }

            // Compress the whole batch in parallel; order is preserved by the
            // indexed parallel iterator.
            let batch_result: Vec<Result<(Vec<u8>, IndexEntry), GitError>> =
                time_it!("parallel encode: encode batch", {
                    batch_entries
                        .par_iter()
                        .map(|entry| {
                            encode_one_object(entry, None)
                                .map(|encoded| (encoded, IndexEntry::new(entry, 0)))
                        })
                        .collect()
                });

            // Sequential write-out: offsets and the running hash depend on
            // emission order.
            time_it!("parallel encode: write batch", {
                for obj_data in batch_result {
                    let mut obj_data = obj_data?;
                    obj_data.1.offset = self.inner_offset as u64;
                    self.write_all_and_update(&obj_data.0).await;
                    idx_entries.push(obj_data.1);
                }
            });
        }

        tracing::debug!("parallel encode idx entries: {:?}", idx_entries.len());
        // The header already promised `object_number` objects, so a mismatch
        // here means a corrupt pack: fail loudly.
        if self.process_index != self.object_number {
            panic!(
                "not all objects are encoded, process:{}, total:{}",
                self.process_index, self.object_number
            );
        }

        let hash_result = self.inner_hash.clone().finalize();
        self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
        self.send_data(hash_result.to_vec()).await;
        self.drop_sender();

        self.idx_entries = Some(idx_entries);
        Ok(())
    }

    /// Fold `data` into the running pack hash, advance the offset cursor, and
    /// forward the bytes to the pack receiver.
    async fn write_all_and_update(&mut self, data: &[u8]) {
        self.inner_hash.update(data);
        self.inner_offset += data.len();
        self.send_data(data.to_vec()).await;
    }

    /// Build the idx file from the entries collected during encoding and
    /// stream it through `idx_sender`. Requires that encoding has finished
    /// (both `final_hash` and `idx_entries` must be populated).
    async fn generate_idx_file(&mut self) -> Result<(), GitError> {
        let final_hash = self.final_hash
            .ok_or(GitError::PackEncodeError("final_hash is missing,The pack file must be generated before the index file is produced.".into()))?;
        let idx_entries = self.idx_entries.clone().ok_or(GitError::PackEncodeError(
            "The pack file must be generated before the index file is produced.".into(),
        ))?;
        let mut idx_builder = IdxBuilder::new(
            self.object_number,
            self.idx_sender.clone().unwrap(),
            final_hash,
        );
        idx_builder.write_idx(idx_entries).await?;
        Ok(())
    }

    /// Spawn encoding on a background task, consuming the encoder. The task
    /// panics (via `unwrap`) on encode errors; join the handle to observe it.
    pub async fn encode_async(
        mut self,
        rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<JoinHandle<()>, GitError> {
        Ok(tokio::spawn(async move {
            if self.window_size == 0 {
                self.parallel_encode(rx).await.unwrap()
            } else {
                self.encode(rx).await.unwrap()
            }
        }))
    }

    /// Spawn zstd-delta encoding on a background task, consuming the encoder.
    pub async fn encode_async_with_zstdelta(
        mut self,
        rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<JoinHandle<()>, GitError> {
        Ok(tokio::spawn(async move {
            self.encode_with_zstdelta(rx).await.unwrap()
        }))
    }

    /// Produce the idx file and close the idx channel. Errors if this encoder
    /// was constructed without an idx sender.
    pub async fn encode_idx_file(&mut self) -> Result<(), GitError> {
        if self.idx_sender.is_none() {
            return Err(GitError::PackEncodeError(String::from(
                "idx sender is none",
            )));
        }
        self.generate_idx_file().await?;
        // Dropping the sender signals EOF to the idx receiver.
        self.idx_sender.take();
        Ok(())
    }
}
740
741#[cfg(test)]
742mod tests {
743 use std::{env, io::Cursor, path::PathBuf, sync::Arc, time::Instant};
744
745 use tempfile::tempdir;
746 use tokio::sync::Mutex;
747
748 use super::*;
749 use crate::{
750 hash::{HashKind, ObjectHash, set_hash_kind_for_test},
751 internal::{
752 object::{blob::Blob, types::ObjectType},
753 pack::{Pack, tests::init_logger, utils::read_offset_encoding},
754 },
755 time_it,
756 };
757
758 fn check_format(data: &Vec<u8>) {
760 let max_pack_size_u64 = if cfg!(target_pointer_width = "64") {
762 6u64 * 1024 * 1024 * 1024
763 } else {
764 2u64 * 1024 * 1024 * 1024
765 };
766 let max_pack_size = usize::try_from(max_pack_size_u64).unwrap_or_else(|_| {
767 panic!(
768 "internal assertion failed: pack size cap {} does not fit in usize on this \
769 target; this should be unreachable given the target_pointer_width configuration",
770 max_pack_size_u64
771 )
772 });
773 let mut p = Pack::new(
774 None,
775 Some(max_pack_size), Some(PathBuf::from("/tmp/.cache_temp")),
777 true,
778 );
779 let mut reader = Cursor::new(data);
780 tracing::debug!("start check format");
781 p.decode(&mut reader, |_| {}, None::<fn(ObjectHash)>)
782 .expect("pack file format error");
783 }
784
785 #[tokio::test]
786 async fn test_pack_encoder() {
787 let _guard = set_hash_kind_for_test(HashKind::Sha1);
788 async fn encode_once(window_size: usize) -> Vec<u8> {
789 let (tx, mut rx) = mpsc::channel(100);
790 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);
791
792 let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
794 let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
795 encoder.encode_async(entry_rx).await.unwrap();
796
797 for str in str_vec {
798 let blob = Blob::from_content(str);
799 let entry: Entry = blob.into();
800 entry_tx
801 .send(MetaAttached {
802 inner: entry,
803 meta: EntryMeta::new(),
804 })
805 .await
806 .unwrap();
807 }
808 drop(entry_tx);
809 let mut result = Vec::new();
811 while let Some(chunk) = rx.recv().await {
812 result.extend(chunk);
813 }
814 result
815 }
816
817 let pack_without_delta = encode_once(0).await;
819 let pack_without_delta_size = pack_without_delta.len();
820 check_format(&pack_without_delta);
821
822 let pack_with_delta = encode_once(4).await;
824 assert!(pack_with_delta.len() <= pack_without_delta_size);
825 check_format(&pack_with_delta);
826 }
827 #[tokio::test]
828 async fn test_pack_encoder_sha256() {
829 let _guard = set_hash_kind_for_test(HashKind::Sha256);
830
831 async fn encode_once(window_size: usize) -> Vec<u8> {
832 let (tx, mut rx) = mpsc::channel(100);
833 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);
834
835 let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
836 let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
837 encoder.encode_async(entry_rx).await.unwrap();
838
839 for s in str_vec {
840 let blob = Blob::from_content(s);
841 let entry: Entry = blob.into();
842 entry_tx
843 .send(MetaAttached {
844 inner: entry,
845 meta: EntryMeta::new(),
846 })
847 .await
848 .unwrap();
849 }
850 drop(entry_tx);
851
852 let mut result = Vec::new();
853 while let Some(chunk) = rx.recv().await {
854 result.extend(chunk);
855 }
856 result
857 }
858
859 let pack_without_delta = encode_once(0).await;
861 let pack_without_delta_size = pack_without_delta.len();
862 check_format(&pack_without_delta);
863
864 let pack_with_delta = encode_once(4).await;
866 assert!(pack_with_delta.len() <= pack_without_delta_size);
867 check_format(&pack_with_delta);
868 }
869
870 #[tokio::test]
871 async fn test_pack_encoder_rejects_unencodable_ai_type_parallel() {
872 let (tx, _rx) = mpsc::channel(8);
873 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);
874 let mut encoder = PackEncoder::new(1, 0, tx);
875
876 let mut entry: Entry = Blob::from_content("ai").into();
877 entry.obj_type = ObjectType::Task;
878 entry_tx
879 .send(MetaAttached {
880 inner: entry,
881 meta: EntryMeta::new(),
882 })
883 .await
884 .expect("send entry");
885 drop(entry_tx);
886
887 let err = encoder
888 .encode(entry_rx)
889 .await
890 .expect_err("must reject AI pack type");
891 assert!(matches!(err, GitError::PackEncodeError(_)));
892 }
893
894 #[tokio::test]
895 async fn test_pack_encoder_rejects_unencodable_ai_type_delta_window() {
896 let (tx, _rx) = mpsc::channel(8);
897 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);
898 let mut encoder = PackEncoder::new(1, 10, tx);
899
900 let mut entry: Entry = Blob::from_content("ai").into();
901 entry.obj_type = ObjectType::Task;
902 entry_tx
903 .send(MetaAttached {
904 inner: entry,
905 meta: EntryMeta::new(),
906 })
907 .await
908 .expect("send entry");
909 drop(entry_tx);
910
911 let err = encoder
912 .encode(entry_rx)
913 .await
914 .expect_err("must reject AI pack type");
915 assert!(matches!(err, GitError::PackEncodeError(_)));
916 }
917
918 async fn get_entries_for_test() -> Arc<Mutex<Vec<Entry>>> {
919 let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
920 .join("tests/data/packs/encode-test-sha1.pack");
921
922 let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);
923
924 let f = std::fs::File::open(&source).unwrap();
925 tracing::info!("pack file size: {}", f.metadata().unwrap().len());
926 let mut reader = std::io::BufReader::new(f);
927 let entries = Arc::new(Mutex::new(Vec::new()));
928 let entries_clone = entries.clone();
929 p.decode(
930 &mut reader,
931 move |entry| {
932 let mut entries = entries_clone.blocking_lock();
933 entries.push(entry.inner);
934 },
935 None::<fn(ObjectHash)>,
936 )
937 .unwrap();
938 assert_eq!(p.number, entries.lock().await.len());
939 tracing::info!("total entries: {}", p.number);
940 drop(p);
941
942 entries
943 }
944 async fn get_entries_for_test_sha256() -> Arc<Mutex<Vec<Entry>>> {
945 let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
946 .join("tests/data/packs/encode-test-sha256.pack");
947
948 let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);
949
950 let f = std::fs::File::open(&source).unwrap();
951 tracing::info!("pack file size: {}", f.metadata().unwrap().len());
952 let mut reader = std::io::BufReader::new(f);
953 let entries = Arc::new(Mutex::new(Vec::new()));
954 let entries_clone = entries.clone();
955 p.decode(
956 &mut reader,
957 move |entry| {
958 let mut entries = entries_clone.blocking_lock();
959 entries.push(entry.inner);
960 },
961 None::<fn(ObjectHash)>,
962 )
963 .unwrap();
964 assert_eq!(p.number, entries.lock().await.len());
965 tracing::info!("total entries: {}", p.number);
966 drop(p);
967
968 entries
969 }
970
971 #[tokio::test]
972 async fn test_pack_encoder_parallel_large_file() {
973 let _guard = set_hash_kind_for_test(HashKind::Sha1);
974 init_logger();
975
976 let start = Instant::now();
977 let entries = get_entries_for_test().await;
978 let entries_number = entries.lock().await.len();
979
980 let total_original_size: usize = entries
981 .lock()
982 .await
983 .iter()
984 .map(|entry| entry.data.len())
985 .sum();
986
987 let (tx, mut rx) = mpsc::channel(1_000_000);
989 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);
990
991 let mut encoder = PackEncoder::new(entries_number, 0, tx);
992 tokio::spawn(async move {
993 time_it!("test parallel encode", {
994 encoder.parallel_encode(entry_rx).await.unwrap();
995 });
996 });
997
998 tokio::spawn(async move {
1000 let entries = entries.lock().await;
1001 for entry in entries.iter() {
1002 entry_tx
1003 .send(MetaAttached {
1004 inner: entry.clone(),
1005 meta: EntryMeta::new(),
1006 })
1007 .await
1008 .unwrap();
1009 }
1010 drop(entry_tx);
1011 tracing::info!("all entries sent");
1012 });
1013
1014 let mut result = Vec::new();
1015 while let Some(chunk) = rx.recv().await {
1016 result.extend(chunk);
1017 }
1018
1019 let pack_size = result.len();
1020 let compression_rate = if total_original_size > 0 {
1021 1.0 - (pack_size as f64 / total_original_size as f64)
1022 } else {
1023 0.0
1024 };
1025
1026 let duration = start.elapsed();
1027 tracing::info!("test executed in: {:.2?}", duration);
1028 tracing::info!("new pack file size: {}", result.len());
1029 tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1030 check_format(&result);
1032 }
1033 #[tokio::test]
1034 async fn test_pack_encoder_parallel_large_file_sha256() {
1035 let _guard = set_hash_kind_for_test(HashKind::Sha256);
1036 init_logger();
1037
1038 let start = Instant::now();
1039 let entries = get_entries_for_test_sha256().await;
1041 let entries_number = entries.lock().await.len();
1042
1043 let total_original_size: usize = entries
1044 .lock()
1045 .await
1046 .iter()
1047 .map(|entry| entry.data.len())
1048 .sum();
1049
1050 let (tx, mut rx) = mpsc::channel(1_000_000);
1051 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);
1052
1053 let mut encoder = PackEncoder::new(entries_number, 0, tx);
1054 tokio::spawn(async move {
1055 time_it!("test parallel encode sha256", {
1056 encoder.parallel_encode(entry_rx).await.unwrap();
1057 });
1058 });
1059
1060 tokio::spawn(async move {
1061 let entries = entries.lock().await;
1062 for entry in entries.iter() {
1063 entry_tx
1064 .send(MetaAttached {
1065 inner: entry.clone(),
1066 meta: EntryMeta::new(),
1067 })
1068 .await
1069 .unwrap();
1070 }
1071 drop(entry_tx);
1072 tracing::info!("all entries sent");
1073 });
1074
1075 let mut result = Vec::new();
1076 while let Some(chunk) = rx.recv().await {
1077 result.extend(chunk);
1078 }
1079
1080 let pack_size = result.len();
1081 let compression_rate = if total_original_size > 0 {
1082 1.0 - (pack_size as f64 / total_original_size as f64)
1083 } else {
1084 0.0
1085 };
1086
1087 let duration = start.elapsed();
1088 tracing::info!("sha256 test executed in: {:.2?}", duration);
1089 tracing::info!("new pack file size: {}", result.len());
1090 tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1091 check_format(&result);
1092 }
1093
1094 #[tokio::test]
1095 async fn test_pack_encoder_large_file() {
1096 let _guard = set_hash_kind_for_test(HashKind::Sha1);
1097 init_logger();
1098 let entries = get_entries_for_test().await;
1099 let entries_number = entries.lock().await.len();
1100
1101 let total_original_size: usize = entries
1102 .lock()
1103 .await
1104 .iter()
1105 .map(|entry| entry.data.len())
1106 .sum();
1107
1108 let start = Instant::now();
1109 let (tx, mut rx) = mpsc::channel(100_000);
1111 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1112
1113 let mut encoder = PackEncoder::new(entries_number, 0, tx);
1114 tokio::spawn(async move {
1115 time_it!("test encode no parallel", {
1116 encoder.encode(entry_rx).await.unwrap();
1117 });
1118 });
1119
1120 tokio::spawn(async move {
1122 let entries = entries.lock().await;
1123 for entry in entries.iter() {
1124 entry_tx
1125 .send(MetaAttached {
1126 inner: entry.clone(),
1127 meta: EntryMeta::new(),
1128 })
1129 .await
1130 .unwrap();
1131 }
1132 drop(entry_tx);
1133 tracing::info!("all entries sent");
1134 });
1135
1136 let mut result = Vec::new();
1142 while let Some(chunk) = rx.recv().await {
1143 result.extend(chunk);
1144 }
1145
1146 let pack_size = result.len();
1147 let compression_rate = if total_original_size > 0 {
1148 1.0 - (pack_size as f64 / total_original_size as f64)
1149 } else {
1150 0.0
1151 };
1152
1153 let duration = start.elapsed();
1154 tracing::info!("test executed in: {:.2?}", duration);
1155 tracing::info!("new pack file size: {}", pack_size);
1156 tracing::info!("original total size: {}", total_original_size);
1157 tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1158 tracing::info!(
1159 "space saved: {} bytes",
1160 total_original_size.saturating_sub(pack_size)
1161 );
1162 }
1163 #[tokio::test]
1164 async fn test_pack_encoder_large_file_sha256() {
1165 let _guard = set_hash_kind_for_test(HashKind::Sha256);
1166 init_logger();
1167 let entries = get_entries_for_test_sha256().await;
1168 let entries_number = entries.lock().await.len();
1169
1170 let total_original_size: usize = entries
1171 .lock()
1172 .await
1173 .iter()
1174 .map(|entry| entry.data.len())
1175 .sum();
1176
1177 let start = Instant::now();
1178 let (tx, mut rx) = mpsc::channel(100_000);
1180 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1181
1182 let mut encoder = PackEncoder::new(entries_number, 0, tx);
1183 tokio::spawn(async move {
1184 time_it!("test encode no parallel sha256", {
1185 encoder.encode(entry_rx).await.unwrap();
1186 });
1187 });
1188
1189 tokio::spawn(async move {
1191 let entries = entries.lock().await;
1192 for entry in entries.iter() {
1193 entry_tx
1194 .send(MetaAttached {
1195 inner: entry.clone(),
1196 meta: EntryMeta::new(),
1197 })
1198 .await
1199 .unwrap();
1200 }
1201 drop(entry_tx);
1202 tracing::info!("all entries sent");
1203 });
1204
1205 let mut result = Vec::new();
1211 while let Some(chunk) = rx.recv().await {
1212 result.extend(chunk);
1213 }
1214
1215 let pack_size = result.len();
1216 let compression_rate = if total_original_size > 0 {
1217 1.0 - (pack_size as f64 / total_original_size as f64)
1218 } else {
1219 0.0
1220 };
1221
1222 let duration = start.elapsed();
1223 tracing::info!("test executed in: {:.2?}", duration);
1224 tracing::info!("new pack file size: {}", pack_size);
1225 tracing::info!("original total size: {}", total_original_size);
1226 tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1227 tracing::info!(
1228 "space saved: {} bytes",
1229 total_original_size.saturating_sub(pack_size)
1230 );
1231 }
1232
1233 #[tokio::test]
1234 async fn test_pack_encoder_with_zstdelta() {
1235 let _guard = set_hash_kind_for_test(HashKind::Sha1);
1236 init_logger();
1237 let entries = get_entries_for_test().await;
1238 let entries_number = entries.lock().await.len();
1239
1240 let total_original_size: usize = entries
1241 .lock()
1242 .await
1243 .iter()
1244 .map(|entry| entry.data.len())
1245 .sum();
1246
1247 let start = Instant::now();
1248 let (tx, mut rx) = mpsc::channel(100_000);
1249 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1250
1251 let encoder = PackEncoder::new(entries_number, 10, tx);
1252 encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();
1253
1254 tokio::spawn(async move {
1256 let entries = entries.lock().await;
1257 for entry in entries.iter() {
1258 entry_tx
1259 .send(MetaAttached {
1260 inner: entry.clone(),
1261 meta: EntryMeta::new(),
1262 })
1263 .await
1264 .unwrap();
1265 }
1266 drop(entry_tx);
1267 tracing::info!("all entries sent");
1268 });
1269
1270 let mut result = Vec::new();
1271 while let Some(chunk) = rx.recv().await {
1272 result.extend(chunk);
1273 }
1274
1275 let pack_size = result.len();
1276 let compression_rate = if total_original_size > 0 {
1277 1.0 - (pack_size as f64 / total_original_size as f64)
1278 } else {
1279 0.0
1280 };
1281
1282 let duration = start.elapsed();
1283 tracing::info!("test executed in: {:.2?}", duration);
1284 tracing::info!("new pack file size: {}", pack_size);
1285 tracing::info!("original total size: {}", total_original_size);
1286 tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1287 tracing::info!(
1288 "space saved: {} bytes",
1289 total_original_size.saturating_sub(pack_size)
1290 );
1291
1292 check_format(&result);
1294 }
1295 #[tokio::test]
1296 async fn test_pack_encoder_with_zstdelta_sha256() {
1297 let _guard = set_hash_kind_for_test(HashKind::Sha256);
1298 init_logger();
1299 let entries = get_entries_for_test_sha256().await;
1300 let entries_number = entries.lock().await.len();
1301
1302 let total_original_size: usize = entries
1303 .lock()
1304 .await
1305 .iter()
1306 .map(|entry| entry.data.len())
1307 .sum();
1308
1309 let start = Instant::now();
1310 let (tx, mut rx) = mpsc::channel(100_000);
1311 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1312
1313 let encoder = PackEncoder::new(entries_number, 10, tx);
1314 encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();
1315
1316 tokio::spawn(async move {
1318 let entries = entries.lock().await;
1319 for entry in entries.iter() {
1320 entry_tx
1321 .send(MetaAttached {
1322 inner: entry.clone(),
1323 meta: EntryMeta::new(),
1324 })
1325 .await
1326 .unwrap();
1327 }
1328 drop(entry_tx);
1329 tracing::info!("all entries sent");
1330 });
1331
1332 let mut result = Vec::new();
1333 while let Some(chunk) = rx.recv().await {
1334 result.extend(chunk);
1335 }
1336
1337 let pack_size = result.len();
1338 let compression_rate = if total_original_size > 0 {
1339 1.0 - (pack_size as f64 / total_original_size as f64)
1340 } else {
1341 0.0
1342 };
1343
1344 let duration = start.elapsed();
1345 tracing::info!("test executed in: {:.2?}", duration);
1346 tracing::info!("new pack file size: {}", pack_size);
1347 tracing::info!("original total size: {}", total_original_size);
1348 tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1349 tracing::info!(
1350 "space saved: {} bytes",
1351 total_original_size.saturating_sub(pack_size)
1352 );
1353
1354 check_format(&result);
1356 }
1357
1358 #[test]
1359 fn test_encode_offset() {
1360 let value = 16389;
1362
1363 let data = encode_offset(value);
1364 println!("{data:?}");
1365 let mut reader = Cursor::new(data);
1366 let (result, _) = read_offset_encoding(&mut reader).unwrap();
1367 println!("result: {result}");
1368 assert_eq!(result, value as u64);
1369 }
1370
1371 #[tokio::test]
1372 async fn test_pack_encoder_large_file_with_delta() {
1373 let _guard = set_hash_kind_for_test(HashKind::Sha1);
1374 init_logger();
1375 let entries = get_entries_for_test().await;
1376 let entries_number = entries.lock().await.len();
1377
1378 let total_original_size: usize = entries
1379 .lock()
1380 .await
1381 .iter()
1382 .map(|entry| entry.data.len())
1383 .sum();
1384
1385 let (tx, mut rx) = mpsc::channel(100_000);
1386 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1387
1388 let encoder = PackEncoder::new(entries_number, 10, tx);
1389
1390 let start = Instant::now(); encoder.encode_async(entry_rx).await.unwrap();
1392
1393 tokio::spawn(async move {
1395 let entries = entries.lock().await;
1396 for entry in entries.iter() {
1397 entry_tx
1398 .send(MetaAttached {
1399 inner: entry.clone(),
1400 meta: EntryMeta::new(),
1401 })
1402 .await
1403 .unwrap();
1404 }
1405 drop(entry_tx);
1406 tracing::info!("all entries sent");
1407 });
1408
1409 let mut result = Vec::new();
1410 while let Some(chunk) = rx.recv().await {
1411 result.extend(chunk);
1412 }
1413
1414 let pack_size = result.len();
1415 let compression_rate = if total_original_size > 0 {
1416 1.0 - (pack_size as f64 / total_original_size as f64)
1417 } else {
1418 0.0
1419 };
1420
1421 let duration = start.elapsed();
1422 tracing::info!("test executed in: {:.2?}", duration);
1423 tracing::info!("new pack file size: {}", pack_size);
1424 tracing::info!("original total size: {}", total_original_size);
1425 tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1426 tracing::info!(
1427 "space saved: {} bytes",
1428 total_original_size.saturating_sub(pack_size)
1429 );
1430
1431 check_format(&result);
1433 }
1434 #[tokio::test]
1435 async fn test_pack_encoder_large_file_with_delta_sha256() {
1436 let _guard = set_hash_kind_for_test(HashKind::Sha256);
1437 init_logger();
1438 let entries = get_entries_for_test_sha256().await;
1439 let entries_number = entries.lock().await.len();
1440
1441 let total_original_size: usize = entries
1442 .lock()
1443 .await
1444 .iter()
1445 .map(|entry| entry.data.len())
1446 .sum();
1447
1448 let (tx, mut rx) = mpsc::channel(100_000);
1449 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1450
1451 let encoder = PackEncoder::new(entries_number, 10, tx);
1452
1453 let start = Instant::now(); encoder.encode_async(entry_rx).await.unwrap();
1455
1456 tokio::spawn(async move {
1458 let entries = entries.lock().await;
1459 for entry in entries.iter() {
1460 entry_tx
1461 .send(MetaAttached {
1462 inner: entry.clone(),
1463 meta: EntryMeta::new(),
1464 })
1465 .await
1466 .unwrap();
1467 }
1468 drop(entry_tx);
1469 tracing::info!("all entries sent");
1470 });
1471
1472 let mut result = Vec::new();
1473 while let Some(chunk) = rx.recv().await {
1474 result.extend(chunk);
1475 }
1476
1477 let pack_size = result.len();
1478 let compression_rate = if total_original_size > 0 {
1479 1.0 - (pack_size as f64 / total_original_size as f64)
1480 } else {
1481 0.0
1482 };
1483
1484 let duration = start.elapsed();
1485 tracing::info!("test executed in: {:.2?}", duration);
1486 tracing::info!("new pack file size: {}", pack_size);
1487 tracing::info!("original total size: {}", total_original_size);
1488 tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1489 tracing::info!(
1490 "space saved: {} bytes",
1491 total_original_size.saturating_sub(pack_size)
1492 );
1493
1494 check_format(&result);
1496 }
1497
1498 #[tokio::test]
1499 async fn test_pack_encoder_output_to_files() {
1500 let _guard = set_hash_kind_for_test(HashKind::Sha1);
1501 init_logger();
1502 let entries = get_entries_for_test().await;
1503 let entries_number = entries.lock().await.len();
1504
1505 let total_original_size: usize = entries
1506 .lock()
1507 .await
1508 .iter()
1509 .map(|entry| entry.data.len())
1510 .sum();
1511
1512 let start = Instant::now();
1513
1514 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1515 let dir = tempdir().unwrap();
1517 let path = dir.path();
1518
1519 tokio::spawn(async move {
1521 let entries = entries.lock().await;
1522 for entry in entries.iter() {
1523 entry_tx
1524 .send(MetaAttached {
1525 inner: entry.clone(),
1526 meta: EntryMeta::new(),
1527 })
1528 .await
1529 .unwrap();
1530 }
1531 drop(entry_tx);
1532 tracing::info!("all entries sent");
1533 });
1534
1535 encode_and_output_to_files(entry_rx, entries_number, path.to_path_buf(), 0)
1536 .await
1537 .unwrap();
1538
1539 let mut pack_file = None;
1541 let mut idx_file = None;
1542 for entry in std::fs::read_dir(path).unwrap() {
1543 let entry = entry.unwrap();
1544 let file_name = entry.file_name();
1545 tracing::info!("file name: {:?}", file_name);
1546 let file_name = file_name.to_string_lossy();
1547 if file_name.ends_with(".pack") {
1548 pack_file = Some(entry.path());
1549 } else if file_name.ends_with(".idx") {
1550 idx_file = Some(entry.path());
1551 }
1552 }
1553 let pack_file = pack_file.expect("pack file not generated");
1554 let idx_file = idx_file.expect("idx file not generated");
1555 assert!(
1556 pack_file.metadata().unwrap().len() > 0,
1557 "pack file is empty"
1558 );
1559 assert!(idx_file.metadata().unwrap().len() > 0, "idx file is empty");
1560
1561 let duration = start.elapsed();
1562 tracing::info!("test executed in: {:.2?}", duration);
1563 tracing::info!("original total size: {}", total_original_size);
1564 }
1565
1566 #[tokio::test]
1567 async fn test_pack_encoder_output_to_files_with_delta() {
1568 let _guard = set_hash_kind_for_test(HashKind::Sha1);
1569 init_logger();
1570 let entries = get_entries_for_test().await;
1571 let entries_number = entries.lock().await.len();
1572
1573 let total_original_size: usize = entries
1574 .lock()
1575 .await
1576 .iter()
1577 .map(|entry| entry.data.len())
1578 .sum();
1579
1580 let start = Instant::now();
1581
1582 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1583 let dir = tempdir().unwrap();
1585 let path = dir.path();
1586
1587 tokio::spawn(async move {
1589 let entries = entries.lock().await;
1590 for entry in entries.iter() {
1591 entry_tx
1592 .send(MetaAttached {
1593 inner: entry.clone(),
1594 meta: EntryMeta::new(),
1595 })
1596 .await
1597 .unwrap();
1598 }
1599 drop(entry_tx);
1600 tracing::info!("all entries sent");
1601 });
1602
1603 encode_and_output_to_files(entry_rx, entries_number, path.to_path_buf(), 10)
1604 .await
1605 .unwrap();
1606
1607 let mut pack_file = None;
1609 let mut idx_file = None;
1610 for entry in std::fs::read_dir(path).unwrap() {
1611 let entry = entry.unwrap();
1612 let file_name = entry.file_name();
1613 tracing::info!("file name: {:?}", file_name);
1614 let file_name = file_name.to_string_lossy();
1615 if file_name.ends_with(".pack") {
1616 pack_file = Some(entry.path());
1617 } else if file_name.ends_with(".idx") {
1618 idx_file = Some(entry.path());
1619 }
1620 }
1621 let pack_file = pack_file.expect("pack file not generated");
1622 let idx_file = idx_file.expect("idx file not generated");
1623 assert!(
1624 pack_file.metadata().unwrap().len() > 0,
1625 "pack file is empty"
1626 );
1627 assert!(idx_file.metadata().unwrap().len() > 0, "idx file is empty");
1628
1629 let duration = start.elapsed();
1630 tracing::info!("test executed in: {:.2?}", duration);
1631 tracing::info!("original total size: {}", total_original_size);
1632 }
1633}