1use std::{
5 cmp::Ordering,
6 collections::VecDeque,
7 hash::{Hash, Hasher},
8 io::Write,
9 path::{Path, PathBuf},
10};
11
12use ahash::AHasher;
13use chrono::Utc;
15use flate2::write::ZlibEncoder;
16use natord::compare;
17use rayon::prelude::*;
18use tokio::io::AsyncWriteExt as TokioAsyncWriteExt;
20use tokio::{fs::File, sync::mpsc, task::JoinHandle};
21
22use crate::delta;
24use crate::{
25 errors::GitError,
26 hash::ObjectHash,
27 internal::{
28 metadata::{EntryMeta, MetaAttached},
29 object::types::ObjectType,
30 pack::{entry::Entry, index_entry::IndexEntry, pack_index::IdxBuilder},
31 },
32 time_it,
33 utils::HashAlgorithm,
34 zstdelta,
35};
36
/// Maximum delta-chain depth allowed for a base object; deeper chains are
/// rejected as delta bases to bound decode cost.
const MAX_CHAIN_LEN: usize = 50;
/// Minimum estimated compression rate a candidate delta must achieve before
/// the delta form is preferred over storing the object whole.
const MIN_DELTA_RATE: f64 = 0.5;

/// Streaming git pack-file encoder. Objects arrive over a channel, are
/// optionally delta-compressed against a sliding window, and the encoded
/// pack bytes (and optionally idx-file bytes) are emitted through senders.
pub struct PackEncoder {
    // Total number of objects expected in the pack (written into the header).
    object_number: usize,
    // Count of objects processed so far (maintained by the parallel path).
    process_index: usize,
    // Delta search window size; 0 selects the parallel, non-delta path.
    window_size: usize,
    // Channel for emitting encoded pack bytes; None once encoding finishes.
    pack_sender: Option<mpsc::Sender<Vec<u8>>>,
    // Optional channel for emitting idx-file bytes.
    idx_sender: Option<mpsc::Sender<Vec<u8>>>,
    // Index entries collected during encoding, later used to build the idx file.
    idx_entries: Option<Vec<IndexEntry>>,
    // Byte offset within the pack being written; starts after the 12-byte header.
    inner_offset: usize,
    // Running hash over everything written; finalized into the pack trailer.
    inner_hash: HashAlgorithm,
    // Hash of the finished pack; set once encoding completes.
    final_hash: Option<ObjectHash>,
    // Guard flag intended to reject re-entrant encode calls.
    start_encoding: bool,
}
57
58pub async fn encode_and_output_to_files(
69 raw_entries_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
70 object_number: usize,
71 output_dir: PathBuf,
72 window_size: usize,
73) -> Result<(), GitError> {
74 let (pack_tx, mut pack_rx) = mpsc::channel(1024);
75 let (idx_tx, mut idx_rx) = mpsc::channel(1024);
76 let mut pack_encoder = PackEncoder::new_with_idx(object_number, window_size, pack_tx, idx_tx);
77
78 let now = Utc::now();
80 let timestamp = now.format("%Y%m%d%H%M%S%.3f").to_string(); let tmp_path = output_dir.join(format!("{}objects.pack.tmp", timestamp));
82 let mut pack_file = File::create(&tmp_path).await?;
83
84 let pack_writer = tokio::spawn(async move {
85 while let Some(chunk) = pack_rx.recv().await {
86 TokioAsyncWriteExt::write_all(&mut pack_file, &chunk).await?;
87 }
88 TokioAsyncWriteExt::flush(&mut pack_file).await?;
90 Ok::<(), GitError>(())
91 });
92
93 pack_encoder.encode(raw_entries_rx).await?;
94
95 let pack_write_result = pack_writer
97 .await
98 .map_err(|e| GitError::PackEncodeError(format!("pack writer task join error: {e}")))?;
99 pack_write_result?;
100
101 let final_pack_name =
102 output_dir.join(format!("pack-{}.pack", pack_encoder.final_hash.unwrap()));
103 let final_idx_name = output_dir.join(format!("pack-{}.idx", pack_encoder.final_hash.unwrap()));
104 tokio::fs::rename(tmp_path, &final_pack_name).await?;
105
106 let mut idx_file = File::create(&final_idx_name).await?;
107 let idx_writer = tokio::spawn(async move {
108 while let Some(chunk) = idx_rx.recv().await {
109 TokioAsyncWriteExt::write_all(&mut idx_file, &chunk).await?;
111 }
112 TokioAsyncWriteExt::flush(&mut idx_file).await?;
114 Ok::<(), GitError>(())
115 });
116
117 pack_encoder.encode_idx_file().await?;
119
120 let idx_write_result = idx_writer
121 .await
122 .map_err(|e| GitError::PackEncodeError(format!("idx writer task join error: {e}")))?;
123 idx_write_result?;
124
125 Ok(())
126}
127
/// Build the 12-byte pack header: the magic `b"PACK"`, version 2, and the
/// object count as a big-endian u32.
///
/// # Panics
/// Panics if `object_number` is zero or exceeds `u32::MAX`.
fn encode_header(object_number: usize) -> Vec<u8> {
    assert_ne!(object_number, 0, "a pack must contain at least one object");
    assert!(
        object_number <= u32::MAX as usize,
        "object count does not fit in the 4-byte header field"
    );
    let mut result: Vec<u8> = vec![
        b'P', b'A', b'C', b'K', // magic
        0, 0, 0, 2, // version 2
    ];
    result.extend_from_slice(&(object_number as u32).to_be_bytes());
    result
}
141
/// Encode a non-zero pack offset with git's variable-length big-endian
/// scheme: seven payload bits per byte, the MSB set on every byte except the
/// last, and each continuation step storing `value - 1` so multi-byte
/// encodings have no redundant representations.
fn encode_offset(mut value: usize) -> Vec<u8> {
    assert_ne!(value, 0, "offset can't be zero");

    // The least-significant 7 bits come last, so build the byte sequence
    // back-to-front instead of pushing forward and reversing.
    let mut encoded: VecDeque<u8> = VecDeque::new();
    encoded.push_back((value & 0x7F) as u8);
    value >>= 7;
    while value != 0 {
        value -= 1;
        encoded.push_front(((value & 0x7F) as u8) | 0x80);
        value >>= 7;
    }
    encoded.into_iter().collect()
}
158
/// Serialize a single entry into its on-disk pack representation:
/// type/size varint header, an optional base-offset field for offset-delta
/// objects, then the zlib-compressed payload.
///
/// `offset` is the relative distance back to the delta base and must be
/// `Some` when `entry.obj_type` is an offset-delta variant.
fn encode_one_object(entry: &Entry, offset: Option<usize>) -> Result<Vec<u8>, GitError> {
    let obj_data = &entry.data;
    let obj_data_len = obj_data.len();
    let obj_type_number = entry.obj_type.to_pack_type_u8()?;

    let mut encoded_data = Vec::new();

    // First header byte: continuation bit (0x80), 3-bit type, low 4 size bits.
    // NOTE(review): the continuation bit is set unconditionally, so a size
    // byte always follows — even a literal `0` for sizes < 16 (see `else`).
    let mut header_data = vec![(0x80 | (obj_type_number << 4)) + (obj_data_len & 0x0f) as u8];

    // Remaining size bits, 7 per byte, least-significant group first.
    let mut size = obj_data_len >> 4;
    if size > 0 {
        while size > 0 {
            if size >> 7 > 0 {
                // More bits remain: emit the low 7 bits with the continuation flag.
                header_data.push((0x80 | size) as u8);
                size >>= 7;
            } else {
                header_data.push(size as u8);
                break;
            }
        }
    } else {
        // Terminate the always-continued first byte.
        header_data.push(0);
    }
    encoded_data.extend(header_data);

    if entry.obj_type == ObjectType::OffsetDelta || entry.obj_type == ObjectType::OffsetZstdelta {
        // Offset deltas store the (non-zero) backwards distance to their base.
        let offset_data = encode_offset(offset.unwrap());
        encoded_data.extend(offset_data);
    } else if entry.obj_type == ObjectType::HashDelta {
        unreachable!("unsupported type")
    }

    // The payload is always zlib-compressed, delta or not.
    let mut inflate = ZlibEncoder::new(Vec::new(), flate2::Compression::default());
    inflate
        .write_all(obj_data)
        .expect("zlib compress should never failed");
    inflate.flush().expect("zlib flush should never failed");
    let compressed_data = inflate.finish().expect("zlib compress should never failed");
    encoded_data.extend(compressed_data);
    Ok(encoded_data)
}
206
207fn magic_sort(a: &MetaAttached<Entry, EntryMeta>, b: &MetaAttached<Entry, EntryMeta>) -> Ordering {
209 let path_a = a.meta.file_path.as_ref();
210 let path_b = b.meta.file_path.as_ref();
211
212 match (path_a, path_b) {
214 (Some(pa), Some(pb)) => {
215 let pa = Path::new(pa);
216 let pb = Path::new(pb);
217
218 let dir_ord = pa.parent().cmp(&pb.parent());
220 if dir_ord != Ordering::Equal {
221 return dir_ord;
222 }
223
224 let name_a = pa.file_name().unwrap_or_default().to_string_lossy();
226 let name_b = pb.file_name().unwrap_or_default().to_string_lossy();
227 let name_ord = compare(&name_a, &name_b);
228 if name_ord != Ordering::Equal {
229 return name_ord;
230 }
231 }
232 (Some(_), None) => return Ordering::Less, (None, Some(_)) => return Ordering::Greater, (None, None) => {}
235 }
236
237 let ord = b.inner.data.len().cmp(&a.inner.data.len());
238 if ord != Ordering::Equal {
239 return ord;
240 }
241
242 (a as *const MetaAttached<Entry, EntryMeta>).cmp(&(b as *const MetaAttached<Entry, EntryMeta>))
244}
245
246fn calc_hash(data: &[u8]) -> u64 {
248 let mut hasher = AHasher::default();
249 data.hash(&mut hasher);
250 hasher.finish()
251}
252
253fn cheap_similar(a: &[u8], b: &[u8]) -> bool {
255 let k = a.len().min(b.len()).min(128);
256 if k == 0 {
257 return false;
258 }
259 calc_hash(&a[..k]) == calc_hash(&b[..k])
260}
261
262impl PackEncoder {
263 pub fn new(object_number: usize, window_size: usize, sender: mpsc::Sender<Vec<u8>>) -> Self {
264 PackEncoder {
265 object_number,
266 window_size,
267 process_index: 0,
268 pack_sender: Some(sender),
270 idx_sender: None,
271 idx_entries: None,
272 inner_offset: 12, inner_hash: HashAlgorithm::new(), final_hash: None,
275 start_encoding: false,
276 }
277 }
278
279 pub fn new_with_idx(
280 object_number: usize,
281 window_size: usize,
282 pack_sender: mpsc::Sender<Vec<u8>>,
283 idx_sender: mpsc::Sender<Vec<u8>>,
284 ) -> Self {
285 PackEncoder {
286 object_number,
288 window_size,
289 process_index: 0,
290 pack_sender: Some(pack_sender),
292 idx_sender: Some(idx_sender),
293 idx_entries: None,
294 inner_offset: 12, inner_hash: HashAlgorithm::new(), final_hash: None,
297 start_encoding: false,
298 }
299 }
300
301 pub fn drop_sender(&mut self) {
302 self.pack_sender.take(); }
304
305 pub async fn send_data(&mut self, data: Vec<u8>) {
306 if let Some(sender) = &self.pack_sender {
307 sender.send(data).await.unwrap();
308 }
309 }
310
    /// Hash of the finished pack, or `None` if encoding has not completed.
    pub fn get_hash(&self) -> Option<ObjectHash> {
        self.final_hash
    }
315
316 pub async fn encode(
324 &mut self,
325 entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
326 ) -> Result<(), GitError> {
327 if self.window_size == 0 {
329 self.parallel_encode(entry_rx).await
330 } else {
331 self.inner_encode(entry_rx, false).await
332 }
333 }
334
    /// Encode all entries using zstd-based deltas (`OffsetZstdelta`) instead
    /// of the classic git delta format.
    pub async fn encode_with_zstdelta(
        &mut self,
        entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<(), GitError> {
        self.inner_encode(entry_rx, true).await
    }
342
343 async fn inner_encode(
346 &mut self,
347 mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
348 enable_zstdelta: bool,
349 ) -> Result<(), GitError> {
350 let head = encode_header(self.object_number);
351 self.send_data(head.clone()).await;
352 self.inner_hash.update(&head);
353
354 if self.start_encoding {
356 return Err(GitError::PackEncodeError(
357 "encoding operation is already in progress".to_string(),
358 ));
359 }
360
361 let mut commits: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
362 let mut trees: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
363 let mut blobs: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
364 let mut tags: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
365 while let Some(entry) = entry_rx.recv().await {
366 match entry.inner.obj_type {
367 ObjectType::Commit => {
368 commits.push(entry);
369 }
370 ObjectType::Tree => {
371 trees.push(entry);
372 }
373 ObjectType::Blob => {
374 blobs.push(entry);
375 }
376 ObjectType::Tag => {
377 tags.push(entry);
378 }
379 _ => {
380 return Err(GitError::PackEncodeError(format!(
381 "object type `{}` is not supported by delta-window pack encoding",
382 entry.inner.obj_type
383 )));
384 }
385 }
386 }
387
388 commits.sort_by(magic_sort);
389 trees.sort_by(magic_sort);
390 blobs.sort_by(magic_sort);
391 tags.sort_by(magic_sort);
392 tracing::info!(
393 "numbers : commits: {:?} trees: {:?} blobs:{:?} tag :{:?}",
394 commits.len(),
395 trees.len(),
396 blobs.len(),
397 tags.len()
398 );
399
400 let (commit_results, tree_results, blob_results, tag_results) = tokio::try_join!(
402 tokio::task::spawn_blocking(move || {
403 Self::try_as_offset_delta(
404 commits
405 .into_iter()
406 .map(|entry_with_meta| entry_with_meta.inner)
407 .collect(),
408 10,
409 enable_zstdelta,
410 )
411 }),
412 tokio::task::spawn_blocking(move || {
413 Self::try_as_offset_delta(
414 trees
415 .into_iter()
416 .map(|entry_with_meta| entry_with_meta.inner)
417 .collect(),
418 10,
419 enable_zstdelta,
420 )
421 }),
422 tokio::task::spawn_blocking(move || {
423 Self::try_as_offset_delta(
424 blobs
425 .into_iter()
426 .map(|entry_with_meta| entry_with_meta.inner)
427 .collect(),
428 10,
429 enable_zstdelta,
430 )
431 }),
432 tokio::task::spawn_blocking(move || {
433 Self::try_as_offset_delta(
434 tags.into_iter()
435 .map(|entry_with_meta| entry_with_meta.inner)
436 .collect(),
437 10,
438 enable_zstdelta,
439 )
440 }),
441 )
442 .map_err(|e| GitError::PackEncodeError(format!("Task join error: {e}")))?;
443
444 let commit_res = commit_results?;
445 let tree_res = tree_results?;
446 let blob_res = blob_results?;
447 let tag_res = tag_results?;
448
449 let mut all_res = vec![commit_res, tree_res, blob_res, tag_res];
450
451 let mut idx_entries = Vec::new();
452 for res in &mut all_res {
453 for data in res {
454 data.1.offset = self.inner_offset as u64;
455 self.write_all_and_update(&data.0).await;
456 idx_entries.push(data.1.clone());
457 }
458 }
459
460 self.idx_entries = Some(idx_entries);
461
462 let hash_result = self.inner_hash.clone().finalize();
464 self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
465 self.send_data(hash_result.to_vec()).await;
466
467 self.drop_sender();
468 Ok(())
469 }
470
    /// Delta-compress one sorted bucket of same-kind objects using a sliding
    /// window of recent candidates.
    ///
    /// For each entry, window candidates are screened in parallel (same type,
    /// chain not too deep, similar size, cheap prefix-hash match) and scored
    /// by estimated delta rate; the best base becomes the delta base.
    /// Returns each object's encoded bytes paired with its index entry
    /// (absolute offsets are filled in later by the caller).
    fn try_as_offset_delta(
        mut bucket: Vec<Entry>,
        window_size: usize,
        enable_zstdelta: bool,
    ) -> Result<Vec<(Vec<u8>, IndexEntry)>, GitError> {
        // Offset of the next object relative to the start of this bucket.
        let mut current_offset = 0usize;
        // Sliding window of (undeltified entry, its bucket-relative offset).
        let mut window: VecDeque<(Entry, usize)> = VecDeque::with_capacity(window_size);
        let mut res: Vec<(Vec<u8>, IndexEntry)> = Vec::new();
        for entry in bucket.iter_mut() {
            let mut best_base: Option<&(Entry, usize)> = None;
            let mut best_rate: f64 = 0.0;
            // Rates within this margin of each other count as a tie.
            let tie_epsilon: f64 = 0.15;

            // Screen all window candidates in parallel.
            let candidates: Vec<_> = window
                .par_iter()
                .with_min_len(3)
                .filter_map(|try_base| {
                    // Only delta against the same object type.
                    if try_base.0.obj_type != entry.obj_type {
                        return None;
                    }

                    // Cap chain depth to bound decode cost.
                    if try_base.0.chain_len >= MAX_CHAIN_LEN {
                        return None;
                    }

                    // Identical objects gain nothing from a delta.
                    if try_base.0.hash == entry.hash {
                        return None;
                    }

                    // Skip when the sizes differ by more than a factor of two.
                    let sym_ratio = (try_base.0.data.len().min(entry.data.len()) as f64)
                        / (try_base.0.data.len().max(entry.data.len()) as f64);
                    if sym_ratio < 0.5 {
                        return None;
                    }

                    // Cheap prefix-hash filter before the costly rate estimate.
                    if !cheap_similar(&try_base.0.data, &entry.data) {
                        return None;
                    }

                    // Heuristic rate estimate for larger inputs, exact rate
                    // for tiny ones.
                    let rate = if (try_base.0.data.len() + entry.data.len()) / 2 > 64 {
                        delta::heuristic_encode_rate_parallel(&try_base.0.data, &entry.data)
                    } else {
                        delta::encode_rate(&try_base.0.data, &entry.data)
                    };

                    if rate > MIN_DELTA_RATE {
                        Some((rate, try_base))
                    } else {
                        None
                    }
                })
                .collect();

            // Pick the best candidate; near-ties prefer the deeper chain.
            for (rate, try_base) in candidates {
                match best_base {
                    None => {
                        best_rate = rate;
                        best_base = Some(try_base);
                    }
                    Some(best_base_ref) => {
                        let is_better = if rate > best_rate + tie_epsilon {
                            true
                        } else if (rate - best_rate).abs() <= tie_epsilon {
                            // NOTE(review): near-ties prefer the candidate with
                            // the LONGER chain — presumably to funnel deltas
                            // onto one chain; confirm this is intentional.
                            try_base.0.chain_len > best_base_ref.0.chain_len
                        } else {
                            false
                        };

                        if is_better {
                            best_rate = rate;
                            best_base = Some(try_base);
                        }
                    }
                }
            }

            // Keep an undeltified copy for the window so later entries can
            // delta against the full object data.
            let mut entry_for_window = entry.clone();

            let offset = best_base.map(|best_base| {
                let delta = if enable_zstdelta {
                    entry.obj_type = ObjectType::OffsetZstdelta;
                    zstdelta::diff(&best_base.0.data, &entry.data)
                        .map_err(|e| {
                            GitError::DeltaObjectError(format!("zstdelta diff failed: {e}"))
                        })
                        .unwrap()
                } else {
                    entry.obj_type = ObjectType::OffsetDelta;
                    delta::encode(&best_base.0.data, &entry.data)
                };
                entry.data = delta;
                entry.chain_len = best_base.0.chain_len + 1;
                // Relative distance back to the base, as ofs-delta requires.
                current_offset - best_base.1
            });

            entry_for_window.chain_len = entry.chain_len;
            let obj_data = encode_one_object(entry, offset)?;
            window.push_back((entry_for_window, current_offset));
            if window.len() > window_size {
                window.pop_front();
            }
            res.push((obj_data.clone(), IndexEntry::new(entry, 0)));
            current_offset += obj_data.len();
        }
        Ok(res)
    }
592
593 pub async fn parallel_encode(
595 &mut self,
596 mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
597 ) -> Result<(), GitError> {
598 if self.window_size != 0 {
599 return Err(GitError::PackEncodeError(
600 "parallel encode only works when window_size == 0".to_string(),
601 ));
602 }
603
604 let head = encode_header(self.object_number);
605 self.send_data(head.clone()).await;
606 self.inner_hash.update(&head);
607
608 if self.start_encoding {
610 return Err(GitError::PackEncodeError(
611 "encoding operation is already in progress".to_string(),
612 ));
613 }
614
615 let mut idx_entries = Vec::new();
616 let batch_size = usize::max(1000, entry_rx.max_capacity() / 10); tracing::info!("encode with batch size: {}", batch_size);
618 loop {
619 let mut batch_entries = Vec::with_capacity(batch_size);
620 time_it!("parallel encode: receive batch", {
621 for _ in 0..batch_size {
622 match entry_rx.recv().await {
623 Some(entry) => {
624 if entry.inner.obj_type.is_ai_object() {
625 return Err(GitError::PackEncodeError(format!(
626 "AI object type `{}` cannot be encoded in a pack file",
627 entry.inner.obj_type
628 )));
629 }
630 batch_entries.push(entry.inner);
631 self.process_index += 1;
632 }
633 None => break,
634 }
635 }
636 });
637
638 if batch_entries.is_empty() {
639 break;
640 }
641
642 let batch_result: Vec<Result<(Vec<u8>, IndexEntry), GitError>> =
644 time_it!("parallel encode: encode batch", {
645 batch_entries
646 .par_iter()
647 .map(|entry| {
648 encode_one_object(entry, None)
649 .map(|encoded| (encoded, IndexEntry::new(entry, 0)))
650 })
651 .collect()
652 });
653
654 time_it!("parallel encode: write batch", {
655 for obj_data in batch_result {
656 let mut obj_data = obj_data?;
657 obj_data.1.offset = self.inner_offset as u64;
658 self.write_all_and_update(&obj_data.0).await;
659 idx_entries.push(obj_data.1);
660 }
661 });
662 }
663
664 tracing::debug!("parallel encode idx entries: {:?}", idx_entries.len());
665 if self.process_index != self.object_number {
666 panic!(
667 "not all objects are encoded, process:{}, total:{}",
668 self.process_index, self.object_number
669 );
670 }
671
672 let hash_result = self.inner_hash.clone().finalize();
674 self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
675 self.send_data(hash_result.to_vec()).await;
676 self.drop_sender();
677
678 self.idx_entries = Some(idx_entries);
679 Ok(())
680 }
681
682 async fn write_all_and_update(&mut self, data: &[u8]) {
684 self.inner_hash.update(data);
685 self.inner_offset += data.len();
686 self.send_data(data.to_vec()).await;
687 }
688
689 async fn generate_idx_file(&mut self) -> Result<(), GitError> {
690 let final_hash = self.final_hash
691 .ok_or(GitError::PackEncodeError("final_hash is missing,The pack file must be generated before the index file is produced.".into()))?;
692 let idx_entries = self.idx_entries.clone().ok_or(GitError::PackEncodeError(
693 "The pack file must be generated before the index file is produced.".into(),
694 ))?;
695 let mut idx_builder = IdxBuilder::new(
696 self.object_number,
697 self.idx_sender.clone().unwrap(),
698 final_hash,
699 );
700 idx_builder.write_idx(idx_entries).await?;
701 Ok(())
702 }
703
704 pub async fn encode_async(
710 mut self,
711 rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
712 ) -> Result<JoinHandle<()>, GitError> {
713 Ok(tokio::spawn(async move {
714 if self.window_size == 0 {
715 self.parallel_encode(rx).await.unwrap()
716 } else {
717 self.encode(rx).await.unwrap()
718 }
719 }))
720 }
721
722 pub async fn encode_async_with_zstdelta(
724 mut self,
725 rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
726 ) -> Result<JoinHandle<()>, GitError> {
727 Ok(tokio::spawn(async move {
728 self.encode_with_zstdelta(rx).await.unwrap()
730 }))
731 }
732
733 pub async fn encode_idx_file(&mut self) -> Result<(), GitError> {
735 if self.idx_sender.is_none() {
736 return Err(GitError::PackEncodeError(String::from(
737 "idx sender is none",
738 )));
739 }
740 self.generate_idx_file().await?;
741 self.idx_sender.take();
743 Ok(())
744 }
745}
746
747#[cfg(test)]
748mod tests {
749 use std::{env, io::Cursor, path::PathBuf, sync::Arc, time::Instant};
750
751 use tempfile::tempdir;
752 use tokio::sync::Mutex;
753
754 use super::*;
755 use crate::{
756 hash::{HashKind, ObjectHash, set_hash_kind_for_test},
757 internal::{
758 object::{blob::Blob, types::ObjectType},
759 pack::{Pack, tests::init_logger, utils::read_offset_encoding},
760 },
761 time_it,
762 };
763
    /// Decode `data` as a pack file to verify it is structurally valid;
    /// panics (failing the test) if decoding errors out.
    fn check_format(data: &Vec<u8>) {
        // Size cap handed to the decoder, chosen per pointer width so the
        // u64 -> usize conversion below cannot fail on 32-bit targets.
        let max_pack_size_u64 = if cfg!(target_pointer_width = "64") {
            6u64 * 1024 * 1024 * 1024
        } else {
            2u64 * 1024 * 1024 * 1024
        };
        let max_pack_size = usize::try_from(max_pack_size_u64).unwrap_or_else(|_| {
            panic!(
                "internal assertion failed: pack size cap {} does not fit in usize on this \
                target; this should be unreachable given the target_pointer_width configuration",
                max_pack_size_u64
            )
        });
        let mut p = Pack::new(
            None,
            Some(max_pack_size),
            Some(PathBuf::from("/tmp/.cache_temp")),
            true,
        );
        let mut reader = Cursor::new(data);
        tracing::debug!("start check format");
        p.decode(&mut reader, |_| {}, None::<fn(ObjectHash)>)
            .expect("pack file format error");
    }
790
    /// End-to-end SHA-1 round-trip: encode a few small blobs with and without
    /// a delta window and verify both packs decode cleanly; the delta pack
    /// must never be larger than the non-delta pack.
    #[tokio::test]
    async fn test_pack_encoder() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        // Encode the fixed blob set with the given delta window size and
        // return the raw pack bytes.
        async fn encode_once(window_size: usize) -> Vec<u8> {
            let (tx, mut rx) = mpsc::channel(100);
            let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);

            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
            encoder.encode_async(entry_rx).await.unwrap();

            for str in str_vec {
                let blob = Blob::from_content(str);
                let entry: Entry = blob.into();
                entry_tx
                    .send(MetaAttached {
                        inner: entry,
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            // Closing the entry channel lets the encoder finish.
            drop(entry_tx);
            let mut result = Vec::new();
            while let Some(chunk) = rx.recv().await {
                result.extend(chunk);
            }
            result
        }

        let pack_without_delta = encode_once(0).await;
        let pack_without_delta_size = pack_without_delta.len();
        check_format(&pack_without_delta);

        let pack_with_delta = encode_once(4).await;
        assert!(pack_with_delta.len() <= pack_without_delta_size);
        check_format(&pack_with_delta);
    }
    /// Same round-trip as `test_pack_encoder`, but with SHA-256 object hashes.
    #[tokio::test]
    async fn test_pack_encoder_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);

        // Encode the fixed blob set with the given delta window size.
        async fn encode_once(window_size: usize) -> Vec<u8> {
            let (tx, mut rx) = mpsc::channel(100);
            let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);

            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
            encoder.encode_async(entry_rx).await.unwrap();

            for s in str_vec {
                let blob = Blob::from_content(s);
                let entry: Entry = blob.into();
                entry_tx
                    .send(MetaAttached {
                        inner: entry,
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            // Closing the entry channel lets the encoder finish.
            drop(entry_tx);

            let mut result = Vec::new();
            while let Some(chunk) = rx.recv().await {
                result.extend(chunk);
            }
            result
        }

        let pack_without_delta = encode_once(0).await;
        let pack_without_delta_size = pack_without_delta.len();
        check_format(&pack_without_delta);

        let pack_with_delta = encode_once(4).await;
        assert!(pack_with_delta.len() <= pack_without_delta_size);
        check_format(&pack_with_delta);
    }
875
    /// The parallel (window == 0) path must reject AI object types that
    /// cannot live in a pack file.
    #[tokio::test]
    async fn test_pack_encoder_rejects_unencodable_ai_type_parallel() {
        let (tx, _rx) = mpsc::channel(8);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);
        let mut encoder = PackEncoder::new(1, 0, tx);

        // Forge an entry with a pack-incompatible type.
        let mut entry: Entry = Blob::from_content("ai").into();
        entry.obj_type = ObjectType::Task;
        entry_tx
            .send(MetaAttached {
                inner: entry,
                meta: EntryMeta::new(),
            })
            .await
            .expect("send entry");
        drop(entry_tx);

        let err = encoder
            .encode(entry_rx)
            .await
            .expect_err("must reject AI pack type");
        assert!(matches!(err, GitError::PackEncodeError(_)));
    }
899
    /// The delta-window (window > 0) path must likewise reject AI object
    /// types that cannot live in a pack file.
    #[tokio::test]
    async fn test_pack_encoder_rejects_unencodable_ai_type_delta_window() {
        let (tx, _rx) = mpsc::channel(8);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);
        let mut encoder = PackEncoder::new(1, 10, tx);

        // Forge an entry with a pack-incompatible type.
        let mut entry: Entry = Blob::from_content("ai").into();
        entry.obj_type = ObjectType::Task;
        entry_tx
            .send(MetaAttached {
                inner: entry,
                meta: EntryMeta::new(),
            })
            .await
            .expect("send entry");
        drop(entry_tx);

        let err = encoder
            .encode(entry_rx)
            .await
            .expect_err("must reject AI pack type");
        assert!(matches!(err, GitError::PackEncodeError(_)));
    }
923
    /// Decode the SHA-1 test pack fixture and return its entries for the
    /// large-file re-encoding tests.
    async fn get_entries_for_test() -> Arc<Mutex<Vec<Entry>>> {
        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/data/packs/encode-test-sha1.pack");

        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);

        let f = std::fs::File::open(&source).unwrap();
        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
        let mut reader = std::io::BufReader::new(f);
        let entries = Arc::new(Mutex::new(Vec::new()));
        let entries_clone = entries.clone();
        p.decode(
            &mut reader,
            move |entry| {
                // NOTE(review): blocking_lock assumes the decode callback runs
                // off the async runtime's worker thread — confirm in Pack::decode.
                let mut entries = entries_clone.blocking_lock();
                entries.push(entry.inner);
            },
            None::<fn(ObjectHash)>,
        )
        .unwrap();
        assert_eq!(p.number, entries.lock().await.len());
        tracing::info!("total entries: {}", p.number);
        drop(p);

        entries
    }
    /// Decode the SHA-256 test pack fixture and return its entries for the
    /// large-file re-encoding tests.
    async fn get_entries_for_test_sha256() -> Arc<Mutex<Vec<Entry>>> {
        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/data/packs/encode-test-sha256.pack");

        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);

        let f = std::fs::File::open(&source).unwrap();
        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
        let mut reader = std::io::BufReader::new(f);
        let entries = Arc::new(Mutex::new(Vec::new()));
        let entries_clone = entries.clone();
        p.decode(
            &mut reader,
            move |entry| {
                // NOTE(review): blocking_lock assumes the decode callback runs
                // off the async runtime's worker thread — confirm in Pack::decode.
                let mut entries = entries_clone.blocking_lock();
                entries.push(entry.inner);
            },
            None::<fn(ObjectHash)>,
        )
        .unwrap();
        assert_eq!(p.number, entries.lock().await.len());
        tracing::info!("total entries: {}", p.number);
        drop(p);

        entries
    }
976
    /// Re-encode the large SHA-1 fixture through the parallel (no-delta)
    /// path and verify the resulting pack decodes.
    #[tokio::test]
    async fn test_pack_encoder_parallel_large_file() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        init_logger();

        let start = Instant::now();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        // Channels are sized generously so neither side blocks in the test.
        let (tx, mut rx) = mpsc::channel(1_000_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test parallel encode", {
                encoder.parallel_encode(entry_rx).await.unwrap();
            });
        });

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        // Collect the full pack from the encoder's output channel.
        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", result.len());
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        check_format(&result);
    }
    /// SHA-256 variant of the parallel large-file re-encoding round-trip.
    #[tokio::test]
    async fn test_pack_encoder_parallel_large_file_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        init_logger();

        let start = Instant::now();
        let entries = get_entries_for_test_sha256().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        // Channels are sized generously so neither side blocks in the test.
        let (tx, mut rx) = mpsc::channel(1_000_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test parallel encode sha256", {
                encoder.parallel_encode(entry_rx).await.unwrap();
            });
        });

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        // Collect the full pack from the encoder's output channel.
        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("sha256 test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", result.len());
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        check_format(&result);
    }
1099
    /// Re-encode the SHA-1 fixture through `encode` with window 0 and log
    /// size/compression statistics (no format re-check in this test).
    #[tokio::test]
    async fn test_pack_encoder_large_file() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test encode no parallel", {
                encoder.encode(entry_rx).await.unwrap();
            });
        });

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        // Drain the encoder output into a single buffer.
        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );
    }
    /// SHA-256 variant of the no-delta large-file statistics test.
    #[tokio::test]
    async fn test_pack_encoder_large_file_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        init_logger();
        let entries = get_entries_for_test_sha256().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test encode no parallel sha256", {
                encoder.encode(entry_rx).await.unwrap();
            });
        });

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        // Drain the encoder output into a single buffer.
        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );
    }
1238
    /// Re-encode the SHA-1 fixture using zstd deltas (window 10) and verify
    /// the resulting pack still decodes.
    #[tokio::test]
    async fn test_pack_encoder_with_zstdelta() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);
        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        // Drain the encoder output into a single buffer.
        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        check_format(&result);
    }
1301 #[tokio::test]
1302 async fn test_pack_encoder_with_zstdelta_sha256() {
1303 let _guard = set_hash_kind_for_test(HashKind::Sha256);
1304 init_logger();
1305 let entries = get_entries_for_test_sha256().await;
1306 let entries_number = entries.lock().await.len();
1307
1308 let total_original_size: usize = entries
1309 .lock()
1310 .await
1311 .iter()
1312 .map(|entry| entry.data.len())
1313 .sum();
1314
1315 let start = Instant::now();
1316 let (tx, mut rx) = mpsc::channel(100_000);
1317 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1318
1319 let encoder = PackEncoder::new(entries_number, 10, tx);
1320 encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();
1321
1322 tokio::spawn(async move {
1324 let entries = entries.lock().await;
1325 for entry in entries.iter() {
1326 entry_tx
1327 .send(MetaAttached {
1328 inner: entry.clone(),
1329 meta: EntryMeta::new(),
1330 })
1331 .await
1332 .unwrap();
1333 }
1334 drop(entry_tx);
1335 tracing::info!("all entries sent");
1336 });
1337
1338 let mut result = Vec::new();
1339 while let Some(chunk) = rx.recv().await {
1340 result.extend(chunk);
1341 }
1342
1343 let pack_size = result.len();
1344 let compression_rate = if total_original_size > 0 {
1345 1.0 - (pack_size as f64 / total_original_size as f64)
1346 } else {
1347 0.0
1348 };
1349
1350 let duration = start.elapsed();
1351 tracing::info!("test executed in: {:.2?}", duration);
1352 tracing::info!("new pack file size: {}", pack_size);
1353 tracing::info!("original total size: {}", total_original_size);
1354 tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1355 tracing::info!(
1356 "space saved: {} bytes",
1357 total_original_size.saturating_sub(pack_size)
1358 );
1359
1360 check_format(&result);
1362 }
1363
1364 #[test]
1365 fn test_encode_offset() {
1366 let value = 16389;
1368
1369 let data = encode_offset(value);
1370 println!("{data:?}");
1371 let mut reader = Cursor::new(data);
1372 let (result, _) = read_offset_encoding(&mut reader).unwrap();
1373 println!("result: {result}");
1374 assert_eq!(result, value as u64);
1375 }
1376
1377 #[tokio::test]
1378 async fn test_pack_encoder_large_file_with_delta() {
1379 let _guard = set_hash_kind_for_test(HashKind::Sha1);
1380 init_logger();
1381 let entries = get_entries_for_test().await;
1382 let entries_number = entries.lock().await.len();
1383
1384 let total_original_size: usize = entries
1385 .lock()
1386 .await
1387 .iter()
1388 .map(|entry| entry.data.len())
1389 .sum();
1390
1391 let (tx, mut rx) = mpsc::channel(100_000);
1392 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1393
1394 let encoder = PackEncoder::new(entries_number, 10, tx);
1395
1396 let start = Instant::now(); encoder.encode_async(entry_rx).await.unwrap();
1398
1399 tokio::spawn(async move {
1401 let entries = entries.lock().await;
1402 for entry in entries.iter() {
1403 entry_tx
1404 .send(MetaAttached {
1405 inner: entry.clone(),
1406 meta: EntryMeta::new(),
1407 })
1408 .await
1409 .unwrap();
1410 }
1411 drop(entry_tx);
1412 tracing::info!("all entries sent");
1413 });
1414
1415 let mut result = Vec::new();
1416 while let Some(chunk) = rx.recv().await {
1417 result.extend(chunk);
1418 }
1419
1420 let pack_size = result.len();
1421 let compression_rate = if total_original_size > 0 {
1422 1.0 - (pack_size as f64 / total_original_size as f64)
1423 } else {
1424 0.0
1425 };
1426
1427 let duration = start.elapsed();
1428 tracing::info!("test executed in: {:.2?}", duration);
1429 tracing::info!("new pack file size: {}", pack_size);
1430 tracing::info!("original total size: {}", total_original_size);
1431 tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1432 tracing::info!(
1433 "space saved: {} bytes",
1434 total_original_size.saturating_sub(pack_size)
1435 );
1436
1437 check_format(&result);
1439 }
1440 #[tokio::test]
1441 async fn test_pack_encoder_large_file_with_delta_sha256() {
1442 let _guard = set_hash_kind_for_test(HashKind::Sha256);
1443 init_logger();
1444 let entries = get_entries_for_test_sha256().await;
1445 let entries_number = entries.lock().await.len();
1446
1447 let total_original_size: usize = entries
1448 .lock()
1449 .await
1450 .iter()
1451 .map(|entry| entry.data.len())
1452 .sum();
1453
1454 let (tx, mut rx) = mpsc::channel(100_000);
1455 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1456
1457 let encoder = PackEncoder::new(entries_number, 10, tx);
1458
1459 let start = Instant::now(); encoder.encode_async(entry_rx).await.unwrap();
1461
1462 tokio::spawn(async move {
1464 let entries = entries.lock().await;
1465 for entry in entries.iter() {
1466 entry_tx
1467 .send(MetaAttached {
1468 inner: entry.clone(),
1469 meta: EntryMeta::new(),
1470 })
1471 .await
1472 .unwrap();
1473 }
1474 drop(entry_tx);
1475 tracing::info!("all entries sent");
1476 });
1477
1478 let mut result = Vec::new();
1479 while let Some(chunk) = rx.recv().await {
1480 result.extend(chunk);
1481 }
1482
1483 let pack_size = result.len();
1484 let compression_rate = if total_original_size > 0 {
1485 1.0 - (pack_size as f64 / total_original_size as f64)
1486 } else {
1487 0.0
1488 };
1489
1490 let duration = start.elapsed();
1491 tracing::info!("test executed in: {:.2?}", duration);
1492 tracing::info!("new pack file size: {}", pack_size);
1493 tracing::info!("original total size: {}", total_original_size);
1494 tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1495 tracing::info!(
1496 "space saved: {} bytes",
1497 total_original_size.saturating_sub(pack_size)
1498 );
1499
1500 check_format(&result);
1502 }
1503
1504 #[tokio::test]
1505 async fn test_pack_encoder_output_to_files() {
1506 let _guard = set_hash_kind_for_test(HashKind::Sha1);
1507 init_logger();
1508 let entries = get_entries_for_test().await;
1509 let entries_number = entries.lock().await.len();
1510
1511 let total_original_size: usize = entries
1512 .lock()
1513 .await
1514 .iter()
1515 .map(|entry| entry.data.len())
1516 .sum();
1517
1518 let start = Instant::now();
1519
1520 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1521 let dir = tempdir().unwrap();
1523 let path = dir.path();
1524
1525 tokio::spawn(async move {
1527 let entries = entries.lock().await;
1528 for entry in entries.iter() {
1529 entry_tx
1530 .send(MetaAttached {
1531 inner: entry.clone(),
1532 meta: EntryMeta::new(),
1533 })
1534 .await
1535 .unwrap();
1536 }
1537 drop(entry_tx);
1538 tracing::info!("all entries sent");
1539 });
1540
1541 encode_and_output_to_files(entry_rx, entries_number, path.to_path_buf(), 0)
1542 .await
1543 .unwrap();
1544
1545 let mut pack_file = None;
1547 let mut idx_file = None;
1548 for entry in std::fs::read_dir(path).unwrap() {
1549 let entry = entry.unwrap();
1550 let file_name = entry.file_name();
1551 tracing::info!("file name: {:?}", file_name);
1552 let file_name = file_name.to_string_lossy();
1553 if file_name.ends_with(".pack") {
1554 pack_file = Some(entry.path());
1555 } else if file_name.ends_with(".idx") {
1556 idx_file = Some(entry.path());
1557 }
1558 }
1559 let pack_file = pack_file.expect("pack file not generated");
1560 let idx_file = idx_file.expect("idx file not generated");
1561 assert!(
1562 pack_file.metadata().unwrap().len() > 0,
1563 "pack file is empty"
1564 );
1565 assert!(idx_file.metadata().unwrap().len() > 0, "idx file is empty");
1566
1567 let duration = start.elapsed();
1568 tracing::info!("test executed in: {:.2?}", duration);
1569 tracing::info!("original total size: {}", total_original_size);
1570 }
1571
1572 #[tokio::test]
1573 async fn test_pack_encoder_output_to_files_with_delta() {
1574 let _guard = set_hash_kind_for_test(HashKind::Sha1);
1575 init_logger();
1576 let entries = get_entries_for_test().await;
1577 let entries_number = entries.lock().await.len();
1578
1579 let total_original_size: usize = entries
1580 .lock()
1581 .await
1582 .iter()
1583 .map(|entry| entry.data.len())
1584 .sum();
1585
1586 let start = Instant::now();
1587
1588 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1589 let dir = tempdir().unwrap();
1591 let path = dir.path();
1592
1593 tokio::spawn(async move {
1595 let entries = entries.lock().await;
1596 for entry in entries.iter() {
1597 entry_tx
1598 .send(MetaAttached {
1599 inner: entry.clone(),
1600 meta: EntryMeta::new(),
1601 })
1602 .await
1603 .unwrap();
1604 }
1605 drop(entry_tx);
1606 tracing::info!("all entries sent");
1607 });
1608
1609 encode_and_output_to_files(entry_rx, entries_number, path.to_path_buf(), 10)
1610 .await
1611 .unwrap();
1612
1613 let mut pack_file = None;
1615 let mut idx_file = None;
1616 for entry in std::fs::read_dir(path).unwrap() {
1617 let entry = entry.unwrap();
1618 let file_name = entry.file_name();
1619 tracing::info!("file name: {:?}", file_name);
1620 let file_name = file_name.to_string_lossy();
1621 if file_name.ends_with(".pack") {
1622 pack_file = Some(entry.path());
1623 } else if file_name.ends_with(".idx") {
1624 idx_file = Some(entry.path());
1625 }
1626 }
1627 let pack_file = pack_file.expect("pack file not generated");
1628 let idx_file = idx_file.expect("idx file not generated");
1629 assert!(
1630 pack_file.metadata().unwrap().len() > 0,
1631 "pack file is empty"
1632 );
1633 assert!(idx_file.metadata().unwrap().len() > 0, "idx file is empty");
1634
1635 let duration = start.elapsed();
1636 tracing::info!("test executed in: {:.2?}", duration);
1637 tracing::info!("original total size: {}", total_original_size);
1638 }
1639}