1use std::cmp::Ordering;
2use std::collections::VecDeque;
3use std::io::Write;
4
5use crate::delta;
6use crate::internal::metadata::{EntryMeta, MetaAttached};
7use crate::internal::object::types::ObjectType;
8use crate::time_it;
9use crate::zstdelta;
10use crate::{
11 errors::GitError, hash::ObjectHash, internal::pack::entry::Entry, utils::HashAlgorithm,
12};
13use ahash::AHasher;
14use flate2::write::ZlibEncoder;
15use natord::compare;
16use rayon::prelude::*;
17
18use std::hash::{Hash, Hasher};
19use std::path::Path;
20use tokio::sync::mpsc;
21use tokio::task::JoinHandle;
22
23const MAX_CHAIN_LEN: usize = 50;
24const MIN_DELTA_RATE: f64 = 0.5; pub struct PackEncoder {
29 object_number: usize,
30 process_index: usize,
31 window_size: usize,
32 sender: Option<mpsc::Sender<Vec<u8>>>,
34 inner_offset: usize, inner_hash: HashAlgorithm, final_hash: Option<ObjectHash>,
37 start_encoding: bool,
38}
39
/// Build the 12-byte pack header: the magic `"PACK"`, version 2, and the
/// object count as a big-endian `u32`.
///
/// # Panics
/// Panics if `object_number` is zero or does not fit in a `u32`.
fn encode_header(object_number: usize) -> Vec<u8> {
    assert_ne!(object_number, 0);
    // `<= u32::MAX` instead of `< (1 << 32)`: the latter is an overflowing
    // shift on 32-bit targets where usize is 32 bits wide.
    assert!(object_number <= u32::MAX as usize);
    let mut result: Vec<u8> = vec![b'P', b'A', b'C', b'K', 0, 0, 0, 2];
    result.extend_from_slice(&(object_number as u32).to_be_bytes());
    result
}
53
/// Encode a backwards distance in git's ofs-delta varint format: big-endian
/// base-128 groups where every byte except the last has its MSB set, and
/// each continuation byte is biased by one so the encoding is unambiguous.
fn encode_offset(value: usize) -> Vec<u8> {
    assert_ne!(value, 0, "offset can't be zero");

    let mut remaining = value;
    // Least-significant 7 bits form the final (unmarked) byte.
    let mut encoded = vec![(remaining & 0x7F) as u8];
    remaining >>= 7;
    // Prepend biased continuation bytes until the value is consumed.
    while remaining != 0 {
        remaining -= 1;
        encoded.insert(0, ((remaining & 0x7F) as u8) | 0x80);
        remaining >>= 7;
    }
    encoded
}
70
71fn encode_one_object(entry: &Entry, offset: Option<usize>) -> Result<Vec<u8>, GitError> {
74 let obj_data = &entry.data;
76 let obj_data_len = obj_data.len();
77 let obj_type_number = entry.obj_type.to_u8();
78
79 let mut encoded_data = Vec::new();
80
81 let mut header_data = vec![(0x80 | (obj_type_number << 4)) + (obj_data_len & 0x0f) as u8];
83 let mut size = obj_data_len >> 4; if size > 0 {
85 while size > 0 {
86 if size >> 7 > 0 {
87 header_data.push((0x80 | size) as u8);
88 size >>= 7;
89 } else {
90 header_data.push(size as u8);
91 break;
92 }
93 }
94 } else {
95 header_data.push(0);
96 }
97 encoded_data.extend(header_data);
98
99 if entry.obj_type == ObjectType::OffsetDelta || entry.obj_type == ObjectType::OffsetZstdelta {
101 let offset_data = encode_offset(offset.unwrap());
102 encoded_data.extend(offset_data);
103 } else if entry.obj_type == ObjectType::HashDelta {
104 unreachable!("unsupported type")
105 }
106
107 let mut inflate = ZlibEncoder::new(Vec::new(), flate2::Compression::default());
109 inflate
110 .write_all(obj_data)
111 .expect("zlib compress should never failed");
112 inflate.flush().expect("zlib flush should never failed");
113 let compressed_data = inflate.finish().expect("zlib compress should never failed");
114 encoded_data.extend(compressed_data);
116 Ok(encoded_data)
117}
118
119fn magic_sort(a: &MetaAttached<Entry, EntryMeta>, b: &MetaAttached<Entry, EntryMeta>) -> Ordering {
120 let path_a = a.meta.file_path.as_ref();
121 let path_b = b.meta.file_path.as_ref();
122
123 match (path_a, path_b) {
125 (Some(pa), Some(pb)) => {
126 let pa = Path::new(pa);
127 let pb = Path::new(pb);
128
129 let dir_ord = pa.parent().cmp(&pb.parent());
131 if dir_ord != Ordering::Equal {
132 return dir_ord;
133 }
134
135 let name_a = pa.file_name().unwrap_or_default().to_string_lossy();
137 let name_b = pb.file_name().unwrap_or_default().to_string_lossy();
138 let name_ord = compare(&name_a, &name_b);
139 if name_ord != Ordering::Equal {
140 return name_ord;
141 }
142 }
143 (Some(_), None) => return Ordering::Less, (None, Some(_)) => return Ordering::Greater, (None, None) => {}
146 }
147
148 let ord = b.inner.data.len().cmp(&a.inner.data.len());
159 if ord != Ordering::Equal {
160 return ord;
161 }
162
163 (a as *const MetaAttached<Entry, EntryMeta>).cmp(&(b as *const MetaAttached<Entry, EntryMeta>))
165}
166
167fn calc_hash(data: &[u8]) -> u64 {
168 let mut hasher = AHasher::default();
169 data.hash(&mut hasher);
170 hasher.finish()
171}
172
173fn cheap_similar(a: &[u8], b: &[u8]) -> bool {
174 let k = a.len().min(b.len()).min(128);
175 if k == 0 {
176 return false;
177 }
178 calc_hash(&a[..k]) == calc_hash(&b[..k])
179}
180
181impl PackEncoder {
182 pub fn new(object_number: usize, window_size: usize, sender: mpsc::Sender<Vec<u8>>) -> Self {
183 PackEncoder {
184 object_number,
185 window_size,
186 process_index: 0,
187 sender: Some(sender),
189 inner_offset: 12, inner_hash: HashAlgorithm::new(), final_hash: None,
192 start_encoding: false,
193 }
194 }
195
196 pub fn drop_sender(&mut self) {
197 self.sender.take(); }
199
200 pub async fn send_data(&mut self, data: Vec<u8>) {
201 if let Some(sender) = &self.sender {
202 sender.send(data).await.unwrap();
203 }
204 }
205
    /// Final pack checksum; `None` until encoding has completed.
    pub fn get_hash(&self) -> Option<ObjectHash> {
        self.final_hash
    }
210
    /// Encode every entry received on `entry_rx` into the pack stream,
    /// using classic git offset deltas for sufficiently similar objects.
    pub async fn encode(
        &mut self,
        entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<(), GitError> {
        self.inner_encode(entry_rx, false).await
    }
224
    /// Same as [`Self::encode`], but delta payloads are produced with
    /// zstd-based diffing (`OffsetZstdelta` objects) instead of the classic
    /// git delta format.
    pub async fn encode_with_zstdelta(
        &mut self,
        entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<(), GitError> {
        self.inner_encode(entry_rx, true).await
    }
231
232 async fn inner_encode(
235 &mut self,
236 mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
237 enable_zstdelta: bool,
238 ) -> Result<(), GitError> {
239 let head = encode_header(self.object_number);
240 self.send_data(head.clone()).await;
241 self.inner_hash.update(&head);
242
243 if self.start_encoding {
245 return Err(GitError::PackEncodeError(
246 "encoding operation is already in progress".to_string(),
247 ));
248 }
249
250 let mut commits: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
251 let mut trees: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
252 let mut blobs: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
253 let mut tags: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
254 while let Some(entry) = entry_rx.recv().await {
255 match entry.inner.obj_type {
256 ObjectType::Commit => {
257 commits.push(entry);
258 }
259 ObjectType::Tree => {
260 trees.push(entry);
261 }
262 ObjectType::Blob => {
263 blobs.push(entry);
264 }
265 ObjectType::Tag => {
266 tags.push(entry);
267 }
268 _ => {}
269 }
270 }
271
272 commits.sort_by(magic_sort);
273 trees.sort_by(magic_sort);
274 blobs.sort_by(magic_sort);
275 tags.sort_by(magic_sort);
276 tracing::info!(
277 "numbers : commits: {:?} trees: {:?} blobs:{:?} tag :{:?}",
278 commits.len(),
279 trees.len(),
280 blobs.len(),
281 tags.len()
282 );
283
284 let (commit_results, tree_results, blob_results, tag_results) = tokio::try_join!(
286 tokio::task::spawn_blocking(move || {
287 Self::try_as_offset_delta(
288 commits
289 .into_iter()
290 .map(|entry_with_meta| entry_with_meta.inner)
291 .collect(),
292 10,
293 enable_zstdelta,
294 )
295 }),
296 tokio::task::spawn_blocking(move || {
297 Self::try_as_offset_delta(
298 trees
299 .into_iter()
300 .map(|entry_with_meta| entry_with_meta.inner)
301 .collect(),
302 10,
303 enable_zstdelta,
304 )
305 }),
306 tokio::task::spawn_blocking(move || {
307 Self::try_as_offset_delta(
308 blobs
309 .into_iter()
310 .map(|entry_with_meta| entry_with_meta.inner)
311 .collect(),
312 10,
313 enable_zstdelta,
314 )
315 }),
316 tokio::task::spawn_blocking(move || {
317 Self::try_as_offset_delta(
318 tags.into_iter()
319 .map(|entry_with_meta| entry_with_meta.inner)
320 .collect(),
321 10,
322 enable_zstdelta,
323 )
324 }),
325 )
326 .map_err(|e| GitError::PackEncodeError(format!("Task join error: {e}")))?;
327
328 let all_encoded_data = [
329 commit_results
330 .map_err(|e| GitError::PackEncodeError(format!("Commit encoding error: {e}")))?,
331 tree_results
332 .map_err(|e| GitError::PackEncodeError(format!("Tree encoding error: {e}")))?,
333 blob_results
334 .map_err(|e| GitError::PackEncodeError(format!("Blob encoding error: {e}")))?,
335 tag_results
336 .map_err(|e| GitError::PackEncodeError(format!("Tag encoding error: {e}")))?,
337 ]
338 .concat();
339
340 for data in all_encoded_data {
341 self.write_all_and_update(&data).await;
342 }
343
344 let hash_result = self.inner_hash.clone().finalize();
346 self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
347 self.send_data(hash_result.to_vec()).await;
348
349 self.drop_sender();
350 Ok(())
351 }
352
    /// Encode one bucket of same-type objects, opportunistically rewriting
    /// each entry as an offset delta (classic git delta, or zstd delta when
    /// `enable_zstdelta`) against a recent similar object kept in a sliding
    /// window of `window_size` entries.
    ///
    /// Returns the encoded byte chunks in bucket order.
    fn try_as_offset_delta(
        mut bucket: Vec<Entry>,
        window_size: usize,
        enable_zstdelta: bool,
    ) -> Result<Vec<Vec<u8>>, GitError> {
        let mut current_offset = 0usize;
        // Window holds *undeltified* entries paired with their pack offsets,
        // so later objects delta against full data, not delta payloads.
        let mut window: VecDeque<(Entry, usize)> = VecDeque::with_capacity(window_size);
        let mut res: Vec<Vec<u8>> = Vec::new();

        for entry in bucket.iter_mut() {
            let mut best_base: Option<&(Entry, usize)> = None;
            let mut best_rate: f64 = 0.0;
            // Rates within this margin of each other count as a tie.
            let tie_epsilon: f64 = 0.15;

            // Score all window entries in parallel; keep plausible bases only.
            let candidates: Vec<_> = window
                .par_iter()
                .with_min_len(3)
                .filter_map(|try_base| {
                    // Only delta within the same object type.
                    if try_base.0.obj_type != entry.obj_type {
                        return None;
                    }

                    // Don't extend chains past the hard limit.
                    if try_base.0.chain_len >= MAX_CHAIN_LEN {
                        return None;
                    }

                    // Never delta an object against itself.
                    if try_base.0.hash == entry.hash {
                        return None;
                    }

                    // Reject bases whose size differs by more than 2x.
                    let sym_ratio = (try_base.0.data.len().min(entry.data.len()) as f64)
                        / (try_base.0.data.len().max(entry.data.len()) as f64);
                    if sym_ratio < 0.5 {
                        return None;
                    }

                    // Cheap prefix-hash filter before the expensive scoring.
                    if !cheap_similar(&try_base.0.data, &entry.data) {
                        return None;
                    }

                    // Larger pairs use the faster heuristic estimator.
                    let rate = if (try_base.0.data.len() + entry.data.len()) / 2 > 64 {
                        delta::heuristic_encode_rate_parallel(&try_base.0.data, &entry.data)
                    } else {
                        delta::encode_rate(&try_base.0.data, &entry.data)
                    };

                    if rate > MIN_DELTA_RATE {
                        Some((rate, try_base))
                    } else {
                        None
                    }
                })
                .collect();

            // Pick the best-scoring candidate; near-ties prefer the longer
            // chain so chains consolidate instead of fanning out.
            for (rate, try_base) in candidates {
                match best_base {
                    None => {
                        best_rate = rate;
                        best_base = Some(try_base);
                    }
                    Some(best_base_ref) => {
                        let is_better = if rate > best_rate + tie_epsilon {
                            true
                        } else if (rate - best_rate).abs() <= tie_epsilon {
                            try_base.0.chain_len > best_base_ref.0.chain_len
                        } else {
                            false
                        };

                        if is_better {
                            best_rate = rate;
                            best_base = Some(try_base);
                        }
                    }
                }
            }

            // Clone the pre-delta entry for the window before `entry` is
            // possibly rewritten as a delta below.
            let mut entry_for_window = entry.clone();

            let offset = best_base.map(|best_base| {
                let delta = if enable_zstdelta {
                    entry.obj_type = ObjectType::OffsetZstdelta;
                    zstdelta::diff(&best_base.0.data, &entry.data)
                        .map_err(|e| {
                            GitError::DeltaObjectError(format!("zstdelta diff failed: {e}"))
                        })
                        .unwrap()
                } else {
                    entry.obj_type = ObjectType::OffsetDelta;
                    delta::encode(&best_base.0.data, &entry.data)
                };
                entry.data = delta;
                entry.chain_len = best_base.0.chain_len + 1;
                // Backwards distance from this object to its base.
                current_offset - best_base.1
            });

            entry_for_window.chain_len = entry.chain_len;
            let obj_data = encode_one_object(entry, offset)?;
            window.push_back((entry_for_window, current_offset));
            if window.len() > window_size {
                window.pop_front();
            }
            current_offset += obj_data.len();
            res.push(obj_data);
        }
        Ok(res)
    }
473
474 pub async fn parallel_encode(
476 &mut self,
477 mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
478 ) -> Result<(), GitError> {
479 if self.window_size != 0 {
480 return Err(GitError::PackEncodeError(
481 "parallel encode only works when window_size == 0".to_string(),
482 ));
483 }
484
485 let head = encode_header(self.object_number);
486 self.send_data(head.clone()).await;
487 self.inner_hash.update(&head);
488
489 if self.start_encoding {
491 return Err(GitError::PackEncodeError(
492 "encoding operation is already in progress".to_string(),
493 ));
494 }
495
496 let batch_size = usize::max(1000, entry_rx.max_capacity() / 10); tracing::info!("encode with batch size: {}", batch_size);
498 loop {
499 let mut batch_entries = Vec::with_capacity(batch_size);
500 time_it!("parallel encode: receive batch", {
501 for _ in 0..batch_size {
502 match entry_rx.recv().await {
503 Some(entry) => {
504 batch_entries.push(entry.inner);
505 self.process_index += 1;
506 }
507 None => break,
508 }
509 }
510 });
511
512 if batch_entries.is_empty() {
513 break;
514 }
515
516 let batch_result: Vec<Vec<u8>> = time_it!("parallel encode: encode batch", {
518 batch_entries
519 .par_iter()
520 .map(|entry| encode_one_object(entry, None).unwrap())
521 .collect()
522 });
523
524 time_it!("parallel encode: write batch", {
525 for obj_data in batch_result {
526 self.write_all_and_update(&obj_data).await;
527 }
528 });
529 }
530
531 if self.process_index != self.object_number {
532 panic!(
533 "not all objects are encoded, process:{}, total:{}",
534 self.process_index, self.object_number
535 );
536 }
537
538 let hash_result = self.inner_hash.clone().finalize();
540 self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
541 self.send_data(hash_result.to_vec()).await;
542 self.drop_sender();
543 Ok(())
544 }
545
    /// Fold `data` into the running pack checksum, advance the byte offset,
    /// then forward the chunk to the output channel.
    async fn write_all_and_update(&mut self, data: &[u8]) {
        self.inner_hash.update(data);
        self.inner_offset += data.len();
        self.send_data(data.to_vec()).await;
    }
552
553 pub async fn encode_async(
559 mut self,
560 rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
561 ) -> Result<JoinHandle<()>, GitError> {
562 Ok(tokio::spawn(async move {
563 if self.window_size == 0 {
564 self.parallel_encode(rx).await.unwrap()
565 } else {
566 self.encode(rx).await.unwrap()
567 }
568 }))
569 }
570
    /// Spawn zstd-delta encoding on a background task; the handle completes
    /// when the whole pack has been emitted.
    pub async fn encode_async_with_zstdelta(
        mut self,
        rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<JoinHandle<()>, GitError> {
        Ok(tokio::spawn(async move {
            self.encode_with_zstdelta(rx).await.unwrap()
        }))
    }
580}
581
582#[cfg(test)]
583mod tests {
584 use std::env;
585 use std::sync::Arc;
586 use std::time::Instant;
587 use std::{io::Cursor, path::PathBuf};
588 use tokio::sync::Mutex;
589
590 use super::*;
591 use crate::hash::{HashKind, ObjectHash, set_hash_kind_for_test};
592 use crate::internal::object::blob::Blob;
593 use crate::internal::pack::utils::read_offset_encoding;
594 use crate::internal::pack::{Pack, tests::init_logger};
595 use crate::time_it;
596
    /// Decode `data` as a full pack file to assert the encoder produced a
    /// structurally valid pack; panics on any format error.
    fn check_format(data: &Vec<u8>) {
        let mut p = Pack::new(
            None,
            Some(1024 * 1024 * 1024 * 6), Some(PathBuf::from("/tmp/.cache_temp")),
            true,
        );
        let mut reader = Cursor::new(data);
        tracing::debug!("start check format");
        p.decode(&mut reader, |_| {}, None::<fn(ObjectHash)>)
            .expect("pack file format error");
    }
609
    /// End-to-end (SHA-1): encode a few small blobs with and without a delta
    /// window; both packs must decode and the delta pack must not be larger.
    #[tokio::test]
    async fn test_pack_encoder() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        async fn encode_once(window_size: usize) -> Vec<u8> {
            let (tx, mut rx) = mpsc::channel(100);
            let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);

            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
            encoder.encode_async(entry_rx).await.unwrap();

            for str in str_vec {
                let blob = Blob::from_content(str);
                let entry: Entry = blob.into();
                entry_tx
                    .send(MetaAttached {
                        inner: entry,
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            let mut result = Vec::new();
            while let Some(chunk) = rx.recv().await {
                result.extend(chunk);
            }
            result
        }

        let pack_without_delta = encode_once(0).await;
        let pack_without_delta_size = pack_without_delta.len();
        check_format(&pack_without_delta);

        let pack_with_delta = encode_once(4).await;
        assert!(pack_with_delta.len() <= pack_without_delta_size);
        check_format(&pack_with_delta);
    }
    /// Same as `test_pack_encoder`, but under the SHA-256 hash kind.
    #[tokio::test]
    async fn test_pack_encoder_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);

        async fn encode_once(window_size: usize) -> Vec<u8> {
            let (tx, mut rx) = mpsc::channel(100);
            let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);

            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
            encoder.encode_async(entry_rx).await.unwrap();

            for s in str_vec {
                let blob = Blob::from_content(s);
                let entry: Entry = blob.into();
                entry_tx
                    .send(MetaAttached {
                        inner: entry,
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);

            let mut result = Vec::new();
            while let Some(chunk) = rx.recv().await {
                result.extend(chunk);
            }
            result
        }

        let pack_without_delta = encode_once(0).await;
        let pack_without_delta_size = pack_without_delta.len();
        check_format(&pack_without_delta);

        let pack_with_delta = encode_once(4).await;
        assert!(pack_with_delta.len() <= pack_without_delta_size);
        check_format(&pack_with_delta);
    }
694
    /// Decode the SHA-1 fixture pack and collect its entries for the
    /// re-encoding tests below.
    async fn get_entries_for_test() -> Arc<Mutex<Vec<Entry>>> {
        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/data/packs/pack-f8bbb573cef7d851957caceb491c073ee8e8de41.pack");

        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);

        let f = std::fs::File::open(&source).unwrap();
        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
        let mut reader = std::io::BufReader::new(f);
        let entries = Arc::new(Mutex::new(Vec::new()));
        let entries_clone = entries.clone();
        p.decode(
            &mut reader,
            move |entry| {
                let mut entries = entries_clone.blocking_lock();
                entries.push(entry.inner);
            },
            None::<fn(ObjectHash)>,
        )
        .unwrap();
        assert_eq!(p.number, entries.lock().await.len());
        tracing::info!("total entries: {}", p.number);
        drop(p);

        entries
    }
    /// Decode the SHA-256 fixture pack and collect its entries for the
    /// re-encoding tests below.
    async fn get_entries_for_test_sha256() -> Arc<Mutex<Vec<Entry>>> {
        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/data/packs/pack-78047853c60a1a3bb587f59598bdeb773fefc821f6f60f4f4797644ad43dad3d.pack");

        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);

        let f = std::fs::File::open(&source).unwrap();
        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
        let mut reader = std::io::BufReader::new(f);
        let entries = Arc::new(Mutex::new(Vec::new()));
        let entries_clone = entries.clone();
        p.decode(
            &mut reader,
            move |entry| {
                let mut entries = entries_clone.blocking_lock();
                entries.push(entry.inner);
            },
            None::<fn(ObjectHash)>,
        )
        .unwrap();
        assert_eq!(p.number, entries.lock().await.len());
        tracing::info!("total entries: {}", p.number);
        drop(p);

        entries
    }
747
    /// Re-encode the SHA-1 fixture pack through the parallel (no-delta) path
    /// and verify the resulting pack decodes.
    #[tokio::test]
    async fn test_pack_encoder_parallel_large_file() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        init_logger();

        let start = Instant::now();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(1_000_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test parallel encode", {
                encoder.parallel_encode(entry_rx).await.unwrap();
            });
        });

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", result.len());
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        check_format(&result);
    }
    /// SHA-256 variant of the parallel (no-delta) large-file round trip.
    #[tokio::test]
    async fn test_pack_encoder_parallel_large_file_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        init_logger();

        let start = Instant::now();
        let entries = get_entries_for_test_sha256().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(1_000_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test parallel encode sha256", {
                encoder.parallel_encode(entry_rx).await.unwrap();
            });
        });

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("sha256 test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", result.len());
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        check_format(&result);
    }
870
    /// Re-encode the SHA-1 fixture through the non-parallel path with
    /// window_size 0 and log size/compression statistics.
    /// NOTE(review): unlike the other large-file tests this one does not
    /// call check_format on the result — confirm whether that is intended.
    #[tokio::test]
    async fn test_pack_encoder_large_file() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test encode no parallel", {
                encoder.encode(entry_rx).await.unwrap();
            });
        });

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );
    }
    /// SHA-256 variant of the non-parallel large-file encode statistics test.
    #[tokio::test]
    async fn test_pack_encoder_large_file_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        init_logger();
        let entries = get_entries_for_test_sha256().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test encode no parallel sha256", {
                encoder.encode(entry_rx).await.unwrap();
            });
        });

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );
    }
1009
    /// Re-encode the SHA-1 fixture using zstd deltas (window 10) and verify
    /// the resulting pack decodes.
    #[tokio::test]
    async fn test_pack_encoder_with_zstdelta() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);
        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        check_format(&result);
    }
    /// SHA-256 variant of the zstd-delta round trip.
    #[tokio::test]
    async fn test_pack_encoder_with_zstdelta_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        init_logger();
        let entries = get_entries_for_test_sha256().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);
        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        check_format(&result);
    }
1134
    /// Round trip: the ofs-delta varint produced by `encode_offset` must
    /// decode back to the original value.
    #[test]
    fn test_encode_offset() {
        let value = 16389;

        let data = encode_offset(value);
        println!("{data:?}");
        let mut reader = Cursor::new(data);
        let (result, _) = read_offset_encoding(&mut reader).unwrap();
        println!("result: {result}");
        assert_eq!(result, value as u64);
    }
1147
    /// Re-encode the SHA-1 fixture using classic git deltas (window 10) and
    /// verify the resulting pack decodes.
    #[tokio::test]
    async fn test_pack_encoder_large_file_with_delta() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);

        let start = Instant::now(); encoder.encode_async(entry_rx).await.unwrap();

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        check_format(&result);
    }
    /// SHA-256 variant of the classic-delta round trip.
    #[tokio::test]
    async fn test_pack_encoder_large_file_with_delta_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        init_logger();
        let entries = get_entries_for_test_sha256().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);

        let start = Instant::now(); encoder.encode_async(entry_rx).await.unwrap();

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        check_format(&result);
    }
1274}