1use std::cmp::Ordering;
2use std::collections::VecDeque;
3use std::io::Write;
4
5use crate::delta;
6use crate::internal::metadata::{EntryMeta, MetaAttached};
7use crate::internal::object::types::ObjectType;
8use crate::time_it;
9use crate::zstdelta;
10use crate::{errors::GitError, hash::SHA1, internal::pack::entry::Entry};
11use ahash::AHasher;
12use flate2::write::ZlibEncoder;
13use natord::compare;
14use rayon::prelude::*;
15use sha1::{Digest, Sha1};
16use std::hash::{Hash, Hasher};
17use std::path::Path;
18use tokio::sync::mpsc;
19use tokio::task::JoinHandle;
20
/// Maximum allowed delta-chain depth; caps decode cost per object.
const MAX_CHAIN_LEN: usize = 50;
/// Minimum space-saving rate a delta must achieve before it is preferred
/// over storing the object whole.
const MIN_DELTA_RATE: f64 = 0.5;

/// Streaming pack-file encoder: consumes entries from a channel and emits
/// encoded pack bytes (header, objects, trailing SHA-1) through a sender,
/// optionally delta-compressing objects against a sliding window.
pub struct PackEncoder {
    // Total number of objects to be written (stored in the pack header).
    object_number: usize,
    // Objects processed so far; `parallel_encode` checks it against
    // `object_number` on completion.
    process_index: usize,
    // Delta search window size; 0 selects the parallel non-delta path.
    window_size: usize,
    // Output channel for encoded chunks; dropped to signal end-of-stream.
    sender: Option<mpsc::Sender<Vec<u8>>>,
    // Running byte offset into the pack (starts at 12, after the header).
    inner_offset: usize,
    // Incremental SHA-1 over everything sent, for the trailing checksum.
    inner_hash: Sha1,
    // Final pack checksum, available once encoding completes.
    final_hash: Option<SHA1>,
    // Re-entrancy guard flag.
    // NOTE(review): checked but never set to `true` anywhere in this file —
    // confirm intended usage.
    start_encoding: bool,
}
37
/// Build the 12-byte pack header: magic `"PACK"`, version 2, and the
/// big-endian 4-byte object count.
///
/// # Panics
/// Panics when `object_number` is zero or does not fit in a `u32`, since
/// the on-disk format stores the count in 4 bytes.
fn encode_header(object_number: usize) -> Vec<u8> {
    assert_ne!(object_number, 0, "pack must contain at least one object");
    // `u32::try_from` replaces the old `object_number < (1 << 32)` assert,
    // which overflows `usize` on 32-bit targets.
    let count = u32::try_from(object_number).expect("object number must fit in u32");
    let mut result = Vec::with_capacity(12);
    result.extend_from_slice(b"PACK");
    result.extend_from_slice(&2u32.to_be_bytes()); // pack format version 2
    result.extend_from_slice(&count.to_be_bytes());
    result
}
51
/// Encode a relative offset in git's variable-length scheme for
/// `OFS_DELTA` entries: 7 payload bits per byte, most-significant group
/// first, continuation (0x80) bit set on every byte except the last, and
/// each non-final group stored biased by one.
fn encode_offset(mut value: usize) -> Vec<u8> {
    assert_ne!(value, 0, "offset can't be zero");

    // Collect groups least-significant first, then reverse into on-disk order.
    let mut out = vec![(value & 0x7F) as u8];
    value >>= 7;
    while value > 0 {
        // The format stores `group - 1` for every continuation byte so that
        // multi-byte encodings have no redundant representations.
        value -= 1;
        out.push(((value & 0x7F) as u8) | 0x80);
        value >>= 7;
    }
    out.reverse();
    out
}
68
69fn encode_one_object(entry: &Entry, offset: Option<usize>) -> Result<Vec<u8>, GitError> {
72 let obj_data = &entry.data;
74 let obj_data_len = obj_data.len();
75 let obj_type_number = entry.obj_type.to_u8();
76
77 let mut encoded_data = Vec::new();
78
79 let mut header_data = vec![(0x80 | (obj_type_number << 4)) + (obj_data_len & 0x0f) as u8];
81 let mut size = obj_data_len >> 4; if size > 0 {
83 while size > 0 {
84 if size >> 7 > 0 {
85 header_data.push((0x80 | size) as u8);
86 size >>= 7;
87 } else {
88 header_data.push(size as u8);
89 break;
90 }
91 }
92 } else {
93 header_data.push(0);
94 }
95 encoded_data.extend(header_data);
96
97 if entry.obj_type == ObjectType::OffsetDelta || entry.obj_type == ObjectType::OffsetZstdelta {
99 let offset_data = encode_offset(offset.unwrap());
100 encoded_data.extend(offset_data);
101 } else if entry.obj_type == ObjectType::HashDelta {
102 unreachable!("unsupported type")
103 }
104
105 let mut inflate = ZlibEncoder::new(Vec::new(), flate2::Compression::default());
107 inflate
108 .write_all(obj_data)
109 .expect("zlib compress should never failed");
110 inflate.flush().expect("zlib flush should never failed");
111 let compressed_data = inflate.finish().expect("zlib compress should never failed");
112 encoded_data.extend(compressed_data);
114 Ok(encoded_data)
115}
116
117fn magic_sort(a: &MetaAttached<Entry, EntryMeta>, b: &MetaAttached<Entry, EntryMeta>) -> Ordering {
118 let path_a = a.meta.file_path.as_ref();
119 let path_b = b.meta.file_path.as_ref();
120
121 match (path_a, path_b) {
123 (Some(pa), Some(pb)) => {
124 let pa = Path::new(pa);
125 let pb = Path::new(pb);
126
127 let dir_ord = pa.parent().cmp(&pb.parent());
129 if dir_ord != Ordering::Equal {
130 return dir_ord;
131 }
132
133 let name_a = pa.file_name().unwrap_or_default().to_string_lossy();
135 let name_b = pb.file_name().unwrap_or_default().to_string_lossy();
136 let name_ord = compare(&name_a, &name_b);
137 if name_ord != Ordering::Equal {
138 return name_ord;
139 }
140 }
141 (Some(_), None) => return Ordering::Less, (None, Some(_)) => return Ordering::Greater, (None, None) => {}
144 }
145
146 let ord = b.inner.data.len().cmp(&a.inner.data.len());
157 if ord != Ordering::Equal {
158 return ord;
159 }
160
161 (a as *const MetaAttached<Entry, EntryMeta>).cmp(&(b as *const MetaAttached<Entry, EntryMeta>))
163}
164
165fn calc_hash(data: &[u8]) -> u64 {
166 let mut hasher = AHasher::default();
167 data.hash(&mut hasher);
168 hasher.finish()
169}
170
171fn cheap_similar(a: &[u8], b: &[u8]) -> bool {
172 let k = a.len().min(b.len()).min(128);
173 if k == 0 {
174 return false;
175 }
176 calc_hash(&a[..k]) == calc_hash(&b[..k])
177}
178
179impl PackEncoder {
180 pub fn new(object_number: usize, window_size: usize, sender: mpsc::Sender<Vec<u8>>) -> Self {
181 PackEncoder {
182 object_number,
183 window_size,
184 process_index: 0,
185 sender: Some(sender),
187 inner_offset: 12, inner_hash: Sha1::new(),
189 final_hash: None,
190 start_encoding: false,
191 }
192 }
193
194 pub fn drop_sender(&mut self) {
195 self.sender.take(); }
197
198 pub async fn send_data(&mut self, data: Vec<u8>) {
199 if let Some(sender) = &self.sender {
200 sender.send(data).await.unwrap();
201 }
202 }
203
    /// Checksum of the finished pack, or `None` while encoding is still in
    /// progress (set just before the trailing hash is streamed out).
    pub fn get_hash(&self) -> Option<SHA1> {
        self.final_hash
    }
208
    /// Consume entries from `entry_rx` and emit a pack using classic
    /// git-style offset deltas (see `inner_encode`).
    pub async fn encode(
        &mut self,
        entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<(), GitError> {
        self.inner_encode(entry_rx, false).await
    }
222
    /// Consume entries from `entry_rx` and emit a pack using zstd-based
    /// offset deltas instead of the classic delta format.
    pub async fn encode_with_zstdelta(
        &mut self,
        entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<(), GitError> {
        self.inner_encode(entry_rx, true).await
    }
229
230 async fn inner_encode(
233 &mut self,
234 mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
235 enable_zstdelta: bool,
236 ) -> Result<(), GitError> {
237 let head = encode_header(self.object_number);
238 self.send_data(head.clone()).await;
239 self.inner_hash.update(&head);
240
241 if self.start_encoding {
243 return Err(GitError::PackEncodeError(
244 "encoding operation is already in progress".to_string(),
245 ));
246 }
247
248 let mut commits: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
249 let mut trees: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
250 let mut blobs: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
251 let mut tags: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
252 while let Some(entry) = entry_rx.recv().await {
253 match entry.inner.obj_type {
254 ObjectType::Commit => {
255 commits.push(entry);
256 }
257 ObjectType::Tree => {
258 trees.push(entry);
259 }
260 ObjectType::Blob => {
261 blobs.push(entry);
262 }
263 ObjectType::Tag => {
264 tags.push(entry);
265 }
266 _ => {}
267 }
268 }
269
270 commits.sort_by(magic_sort);
271 trees.sort_by(magic_sort);
272 blobs.sort_by(magic_sort);
273 tags.sort_by(magic_sort);
274 tracing::info!(
275 "numbers : commits: {:?} trees: {:?} blobs:{:?} tag :{:?}",
276 commits.len(),
277 trees.len(),
278 blobs.len(),
279 tags.len()
280 );
281
282 let (commit_results, tree_results, blob_results, tag_results) = tokio::try_join!(
284 tokio::task::spawn_blocking(move || {
285 Self::try_as_offset_delta(
286 commits
287 .into_iter()
288 .map(|entry_with_meta| entry_with_meta.inner)
289 .collect(),
290 10,
291 enable_zstdelta,
292 )
293 }),
294 tokio::task::spawn_blocking(move || {
295 Self::try_as_offset_delta(
296 trees
297 .into_iter()
298 .map(|entry_with_meta| entry_with_meta.inner)
299 .collect(),
300 10,
301 enable_zstdelta,
302 )
303 }),
304 tokio::task::spawn_blocking(move || {
305 Self::try_as_offset_delta(
306 blobs
307 .into_iter()
308 .map(|entry_with_meta| entry_with_meta.inner)
309 .collect(),
310 10,
311 enable_zstdelta,
312 )
313 }),
314 tokio::task::spawn_blocking(move || {
315 Self::try_as_offset_delta(
316 tags.into_iter()
317 .map(|entry_with_meta| entry_with_meta.inner)
318 .collect(),
319 10,
320 enable_zstdelta,
321 )
322 }),
323 )
324 .map_err(|e| GitError::PackEncodeError(format!("Task join error: {e}")))?;
325
326 let all_encoded_data = [
327 commit_results
328 .map_err(|e| GitError::PackEncodeError(format!("Commit encoding error: {e}")))?,
329 tree_results
330 .map_err(|e| GitError::PackEncodeError(format!("Tree encoding error: {e}")))?,
331 blob_results
332 .map_err(|e| GitError::PackEncodeError(format!("Blob encoding error: {e}")))?,
333 tag_results
334 .map_err(|e| GitError::PackEncodeError(format!("Tag encoding error: {e}")))?,
335 ]
336 .concat();
337
338 for data in all_encoded_data {
339 self.write_all_and_update(&data).await;
340 }
341
342 let hash_result = self.inner_hash.clone().finalize();
344 self.final_hash = Some(SHA1::from_bytes(&hash_result));
345 self.send_data(hash_result.to_vec()).await;
346
347 self.drop_sender();
348 Ok(())
349 }
350
    /// Walk `bucket` in order, trying to re-encode each entry as an
    /// offset-delta against one of the previous `window_size` entries of
    /// the same type. Returns the fully encoded object blocks in input
    /// order; entries without a profitable base are stored whole.
    fn try_as_offset_delta(
        mut bucket: Vec<Entry>,
        window_size: usize,
        enable_zstdelta: bool,
    ) -> Result<Vec<Vec<u8>>, GitError> {
        // Byte offset (within this bucket's output stream) of the entry
        // currently being encoded; the window stores (full entry, offset).
        let mut current_offset = 0usize;
        let mut window: VecDeque<(Entry, usize)> = VecDeque::with_capacity(window_size);
        let mut res: Vec<Vec<u8>> = Vec::new();

        for entry in bucket.iter_mut() {
            let mut best_base: Option<&(Entry, usize)> = None;
            let mut best_rate: f64 = 0.0;
            // Rates within this margin are treated as ties; ties prefer the
            // deeper chain (see selection loop below).
            let tie_epsilon: f64 = 0.15;

            // Score all window candidates in parallel, rejecting obvious
            // mismatches cheaply before computing an encode rate.
            let candidates: Vec<_> = window
                .par_iter()
                .with_min_len(3)
                .filter_map(|try_base| {
                    // Deltas only make sense between same-typed objects.
                    if try_base.0.obj_type != entry.obj_type {
                        return None;
                    }

                    // Cap chain depth so decode cost stays bounded.
                    if try_base.0.chain_len >= MAX_CHAIN_LEN {
                        return None;
                    }

                    // Never delta an object against itself.
                    if try_base.0.hash == entry.hash {
                        return None;
                    }

                    // Reject pairs whose sizes differ by more than 2x.
                    let sym_ratio = (try_base.0.data.len().min(entry.data.len()) as f64)
                        / (try_base.0.data.len().max(entry.data.len()) as f64);
                    if sym_ratio < 0.5 {
                        return None;
                    }

                    // Cheap prefix-hash probe before the expensive rate check.
                    if !cheap_similar(&try_base.0.data, &entry.data) {
                        return None;
                    }

                    // Use the parallel heuristic estimator for larger pairs,
                    // the exact rate for tiny ones.
                    let rate = if (try_base.0.data.len() + entry.data.len()) / 2 > 64 {
                        delta::heuristic_encode_rate_parallel(&try_base.0.data, &entry.data)
                    } else {
                        delta::encode_rate(&try_base.0.data, &entry.data)
                    };

                    if rate > MIN_DELTA_RATE {
                        Some((rate, try_base))
                    } else {
                        None
                    }
                })
                .collect();

            // Pick the winner: a clearly better rate wins outright; near-ties
            // prefer the candidate with the longer chain.
            for (rate, try_base) in candidates {
                match best_base {
                    None => {
                        best_rate = rate;
                        best_base = Some(try_base);
                    }
                    Some(best_base_ref) => {
                        let is_better = if rate > best_rate + tie_epsilon {
                            true
                        } else if (rate - best_rate).abs() <= tie_epsilon {
                            try_base.0.chain_len > best_base_ref.0.chain_len
                        } else {
                            false
                        };

                        if is_better {
                            best_rate = rate;
                            best_base = Some(try_base);
                        }
                    }
                }
            }

            // Keep an undeltified copy for the window: later entries must
            // delta against full data, not against this entry's delta.
            let mut entry_for_window = entry.clone();

            let offset = best_base.map(|best_base| {
                let delta = if enable_zstdelta {
                    entry.obj_type = ObjectType::OffsetZstdelta;
                    zstdelta::diff(&best_base.0.data, &entry.data)
                        .map_err(|e| {
                            GitError::DeltaObjectError(format!("zstdelta diff failed: {e}"))
                        })
                        .unwrap()
                    } else {
                    entry.obj_type = ObjectType::OffsetDelta;
                    delta::encode(&best_base.0.data, &entry.data)
                };
                // Replace the payload with the delta and record chain depth.
                entry.data = delta;
                entry.chain_len = best_base.0.chain_len + 1;
                // Relative distance back to the base, as OFS_DELTA requires.
                current_offset - best_base.1
            });

            entry_for_window.chain_len = entry.chain_len;
            let obj_data = encode_one_object(entry, offset)?;
            window.push_back((entry_for_window, current_offset));
            if window.len() > window_size {
                window.pop_front();
            }
            current_offset += obj_data.len();
            res.push(obj_data);
        }
        Ok(res)
    }
471
472 pub async fn parallel_encode(
474 &mut self,
475 mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
476 ) -> Result<(), GitError> {
477 if self.window_size != 0 {
478 return Err(GitError::PackEncodeError(
479 "parallel encode only works when window_size == 0".to_string(),
480 ));
481 }
482
483 let head = encode_header(self.object_number);
484 self.send_data(head.clone()).await;
485 self.inner_hash.update(&head);
486
487 if self.start_encoding {
489 return Err(GitError::PackEncodeError(
490 "encoding operation is already in progress".to_string(),
491 ));
492 }
493
494 let batch_size = usize::max(1000, entry_rx.max_capacity() / 10); tracing::info!("encode with batch size: {}", batch_size);
496 loop {
497 let mut batch_entries = Vec::with_capacity(batch_size);
498 time_it!("parallel encode: receive batch", {
499 for _ in 0..batch_size {
500 match entry_rx.recv().await {
501 Some(entry) => {
502 batch_entries.push(entry.inner);
503 self.process_index += 1;
504 }
505 None => break,
506 }
507 }
508 });
509
510 if batch_entries.is_empty() {
511 break;
512 }
513
514 let batch_result: Vec<Vec<u8>> = time_it!("parallel encode: encode batch", {
516 batch_entries
517 .par_iter()
518 .map(|entry| encode_one_object(entry, None).unwrap())
519 .collect()
520 });
521
522 time_it!("parallel encode: write batch", {
523 for obj_data in batch_result {
524 self.write_all_and_update(&obj_data).await;
525 }
526 });
527 }
528
529 if self.process_index != self.object_number {
530 panic!(
531 "not all objects are encoded, process:{}, total:{}",
532 self.process_index, self.object_number
533 );
534 }
535
536 let hash_result = self.inner_hash.clone().finalize();
538 self.final_hash = Some(SHA1::from_bytes(&hash_result));
539 self.send_data(hash_result.to_vec()).await;
540 self.drop_sender();
541 Ok(())
542 }
543
    /// Fold one encoded chunk into the running checksum, advance the body
    /// offset, then forward the bytes to the consumer.
    async fn write_all_and_update(&mut self, data: &[u8]) {
        self.inner_hash.update(data);
        self.inner_offset += data.len();
        self.send_data(data.to_vec()).await;
    }
550
551 pub async fn encode_async(
557 mut self,
558 rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
559 ) -> Result<JoinHandle<()>, GitError> {
560 Ok(tokio::spawn(async move {
561 if self.window_size == 0 {
562 self.parallel_encode(rx).await.unwrap()
563 } else {
564 self.encode(rx).await.unwrap()
565 }
566 }))
567 }
568
569 pub async fn encode_async_with_zstdelta(
570 mut self,
571 rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
572 ) -> Result<JoinHandle<()>, GitError> {
573 Ok(tokio::spawn(async move {
574 self.encode_with_zstdelta(rx).await.unwrap()
576 }))
577 }
578}
579
#[cfg(test)]
mod tests {
    use std::env;
    use std::sync::Arc;
    use std::time::Instant;
    use std::{io::Cursor, path::PathBuf};
    use tokio::sync::Mutex;

    use crate::internal::object::blob::Blob;
    use crate::internal::pack::utils::read_offset_encoding;
    use crate::internal::pack::{Pack, tests::init_logger};
    use crate::time_it;

    use super::*;

    // Decode `data` with the project's pack reader; panics if the pack we
    // produced is malformed.
    fn check_format(data: &Vec<u8>) {
        let mut p = Pack::new(
            None,
            // Generous (6 GiB) memory cap so decoding stays in memory.
            Some(1024 * 1024 * 1024 * 6),
            Some(PathBuf::from("/tmp/.cache_temp")),
            true,
        );
        let mut reader = Cursor::new(data);
        tracing::debug!("start check format");
        p.decode(&mut reader, |_| {}, None::<fn(SHA1)>)
            .expect("pack file format error");
    }

    // Encode four small blobs twice — without deltas (window 0) and with a
    // delta window — verify both packs decode, and that deltas never grow
    // the pack.
    #[tokio::test]
    async fn test_pack_encoder() {
        async fn encode_once(window_size: usize) -> Vec<u8> {
            let (tx, mut rx) = mpsc::channel(100);
            let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);

            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
            encoder.encode_async(entry_rx).await.unwrap();

            for str in str_vec {
                let blob = Blob::from_content(str);
                let entry: Entry = blob.into();
                entry_tx
                    .send(MetaAttached {
                        inner: entry,
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            // Drain until the encoder drops its sender (end-of-stream).
            let mut result = Vec::new();
            while let Some(chunk) = rx.recv().await {
                result.extend(chunk);
            }
            result
        }

        // window_size == 0 selects the parallel, non-delta path.
        let pack_without_delta = encode_once(0).await;
        let pack_without_delta_size = pack_without_delta.len();
        check_format(&pack_without_delta);

        let pack_with_delta = encode_once(4).await;
        assert!(pack_with_delta.len() <= pack_without_delta_size);
        check_format(&pack_with_delta);
    }

    // Decode a real pack fixture and collect its entries for the
    // re-encoding benchmarks below.
    async fn get_entries_for_test() -> Arc<Mutex<Vec<Entry>>> {
        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/data/packs/pack-f8bbb573cef7d851957caceb491c073ee8e8de41.pack");

        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);

        let f = std::fs::File::open(&source).unwrap();
        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
        let mut reader = std::io::BufReader::new(f);
        let entries = Arc::new(Mutex::new(Vec::new()));
        let entries_clone = entries.clone();
        p.decode(
            &mut reader,
            // NOTE(review): `blocking_lock` inside a tokio test relies on
            // `decode` calling back from a non-runtime thread — confirm.
            move |entry| {
                let mut entries = entries_clone.blocking_lock();
                entries.push(entry.inner);
            },
            None::<fn(SHA1)>,
        )
        .unwrap();
        assert_eq!(p.number, entries.lock().await.len());
        tracing::info!("total entries: {}", p.number);
        drop(p);

        entries
    }

    // Re-encode the fixture through the parallel (non-delta) path and
    // verify the output pack decodes.
    #[tokio::test]
    async fn test_pack_encoder_parallel_large_file() {
        init_logger();

        let start = Instant::now();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(1_000_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);

        // window_size == 0 is required by `parallel_encode`.
        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test parallel encode", {
                encoder.parallel_encode(entry_rx).await.unwrap();
            });
        });

        // Feed entries from a separate task so the channels never deadlock.
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", result.len());
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        check_format(&result);
    }

    // Re-encode the fixture through the sequential `encode` path and log
    // timing/compression numbers (no decode check in this variant).
    #[tokio::test]
    async fn test_pack_encoder_large_file() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test encode no parallel", {
                encoder.encode(entry_rx).await.unwrap();
            });
        });

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );
    }

    // Re-encode the fixture using zstd deltas (window 10) and verify the
    // output pack decodes.
    #[tokio::test]
    async fn test_pack_encoder_with_zstdelta() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);
        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        check_format(&result);
    }

    // Round-trip an offset through `encode_offset` and the project's
    // decoder; 16389 needs a two-byte encoding.
    #[test]
    fn test_encode_offset() {
        let value = 16389;

        let data = encode_offset(value);
        println!("{data:?}");
        let mut reader = Cursor::new(data);
        let (result, _) = read_offset_encoding(&mut reader).unwrap();
        println!("result: {result}");
        assert_eq!(result, value as u64);
    }

    // Re-encode the fixture with classic offset deltas (window 10) and
    // verify the output pack decodes.
    #[tokio::test]
    async fn test_pack_encoder_large_file_with_delta() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);

        let start = Instant::now();
        encoder.encode_async(entry_rx).await.unwrap();

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        check_format(&result);
    }
}