1use std::{
5 cmp::Ordering,
6 collections::VecDeque,
7 hash::{Hash, Hasher},
8 io::Write,
9 path::{Path, PathBuf},
10};
11
12use ahash::AHasher;
13use chrono::Utc;
15use flate2::write::ZlibEncoder;
16use natord::compare;
17use rayon::prelude::*;
18use tokio::io::AsyncWriteExt as TokioAsyncWriteExt;
20use tokio::{fs::File, sync::mpsc, task::JoinHandle};
21
22use crate::delta;
24use crate::{
25 errors::GitError,
26 hash::ObjectHash,
27 internal::{
28 metadata::{EntryMeta, MetaAttached},
29 object::types::ObjectType,
30 pack::{entry::Entry, index_entry::IndexEntry, pack_index::IdxBuilder},
31 },
32 time_it,
33 utils::HashAlgorithm,
34 zstdelta,
35};
36
37const MAX_CHAIN_LEN: usize = 50;
38const MIN_DELTA_RATE: f64 = 0.5; pub struct PackEncoder {
43 object_number: usize,
45 process_index: usize,
46 window_size: usize,
47 pack_sender: Option<mpsc::Sender<Vec<u8>>>,
49 idx_sender: Option<mpsc::Sender<Vec<u8>>>,
50 idx_entries: Option<Vec<IndexEntry>>,
52 inner_offset: usize, inner_hash: HashAlgorithm, final_hash: Option<ObjectHash>,
55 start_encoding: bool,
56}
57
58pub async fn encode_and_output_to_files(
69 raw_entries_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
70 object_number: usize,
71 output_dir: PathBuf,
72 window_size: usize,
73) -> Result<(), GitError> {
74 let (pack_tx, mut pack_rx) = mpsc::channel(1024);
75 let (idx_tx, mut idx_rx) = mpsc::channel(1024);
76 let mut pack_encoder = PackEncoder::new_with_idx(object_number, window_size, pack_tx, idx_tx);
77
78 let now = Utc::now();
80 let timestamp = now.format("%Y%m%d%H%M%S%.3f").to_string(); let tmp_path = output_dir.join(format!("{}objects.pack.tmp", timestamp));
82 let mut pack_file = File::create(&tmp_path).await?;
83
84 let pack_writer = tokio::spawn(async move {
85 while let Some(chunk) = pack_rx.recv().await {
86 TokioAsyncWriteExt::write_all(&mut pack_file, &chunk).await?;
87 }
88 TokioAsyncWriteExt::flush(&mut pack_file).await?;
90 Ok::<(), GitError>(())
91 });
92
93 pack_encoder.encode(raw_entries_rx).await?;
94
95 let pack_write_result = pack_writer
97 .await
98 .map_err(|e| GitError::PackEncodeError(format!("pack writer task join error: {e}")))?;
99 pack_write_result?;
100
101 let final_pack_name =
102 output_dir.join(format!("pack-{}.pack", pack_encoder.final_hash.unwrap()));
103 let final_idx_name = output_dir.join(format!("pack-{}.idx", pack_encoder.final_hash.unwrap()));
104 tokio::fs::rename(tmp_path, &final_pack_name).await?;
105
106 let mut idx_file = File::create(&final_idx_name).await?;
107 let idx_writer = tokio::spawn(async move {
108 while let Some(chunk) = idx_rx.recv().await {
109 TokioAsyncWriteExt::write_all(&mut idx_file, &chunk).await?;
111 }
112 TokioAsyncWriteExt::flush(&mut idx_file).await?;
114 Ok::<(), GitError>(())
115 });
116
117 pack_encoder.encode_idx_file().await?;
119
120 let idx_write_result = idx_writer
121 .await
122 .map_err(|e| GitError::PackEncodeError(format!("idx writer task join error: {e}")))?;
123 idx_write_result?;
124
125 Ok(())
126}
127
/// Builds the 12-byte pack header: magic `PACK`, version 2, and the object
/// count as a big-endian `u32`.
///
/// # Panics
/// Panics when `object_number` is zero or does not fit in a `u32`.
fn encode_header(object_number: usize) -> Vec<u8> {
    assert_ne!(object_number, 0, "pack must contain at least one object");
    // The previous check was `object_number < (1 << 32)`, which overflows
    // `usize` on 32-bit targets; `try_from` performs the same range check
    // portably.
    let count = u32::try_from(object_number).expect("object number must fit in u32");

    let mut result = Vec::with_capacity(12);
    result.extend_from_slice(b"PACK"); // magic
    result.extend_from_slice(&2u32.to_be_bytes()); // pack format version 2
    result.extend_from_slice(&count.to_be_bytes());
    result
}
141
/// Encodes a relative delta offset in git's variable-length big-endian form:
/// seven payload bits per byte, the high bit set on every byte except the
/// last, and each continuation step storing `value - 1` so ranges pack
/// tightly.
fn encode_offset(mut value: usize) -> Vec<u8> {
    assert_ne!(value, 0, "offset can't be zero");

    // Emit the least-significant 7-bit group first (no continuation bit),
    // then the remaining groups, and finally reverse into big-endian order.
    let mut out = vec![(value & 0x7F) as u8];
    value >>= 7;
    while value != 0 {
        value -= 1;
        out.push(((value & 0x7F) as u8) | 0x80);
        value >>= 7;
    }
    out.reverse();
    out
}
158
/// Encodes a single entry into its on-disk pack representation:
/// a type+size header, an optional base-offset field for delta objects, and
/// the zlib-compressed payload.
fn encode_one_object(entry: &Entry, offset: Option<usize>) -> Result<Vec<u8>, GitError> {
    let obj_data = &entry.data;
    let obj_data_len = obj_data.len();
    let obj_type_number = entry.obj_type.to_u8();

    let mut encoded_data = Vec::new();

    // First header byte: continuation bit, 3-bit object type, low 4 size bits.
    // NOTE(review): the continuation bit (0x80) is set even when the size fits
    // in this one byte, and a trailing 0x00 byte is emitted in that case. That
    // is decodable but differs from git's canonical minimal encoding — confirm
    // the matching decoder expects it before changing.
    let mut header_data = vec![(0x80 | (obj_type_number << 4)) + (obj_data_len & 0x0f) as u8];

    // Remaining size bits, 7 per byte, least-significant group first.
    let mut size = obj_data_len >> 4;
    if size > 0 {
        while size > 0 {
            if size >> 7 > 0 {
                header_data.push((0x80 | size) as u8);
                size >>= 7;
            } else {
                header_data.push(size as u8);
                break;
            }
        }
    } else {
        header_data.push(0);
    }
    encoded_data.extend(header_data);

    // Offset-delta objects carry the varint-encoded distance back to the base.
    if entry.obj_type == ObjectType::OffsetDelta || entry.obj_type == ObjectType::OffsetZstdelta {
        let offset_data = encode_offset(offset.unwrap());
        encoded_data.extend(offset_data);
    } else if entry.obj_type == ObjectType::HashDelta {
        unreachable!("unsupported type")
    }

    // The payload is always zlib-compressed, whatever the object type.
    let mut inflate = ZlibEncoder::new(Vec::new(), flate2::Compression::default());
    inflate
        .write_all(obj_data)
        .expect("zlib compress should never failed");
    inflate.flush().expect("zlib flush should never failed");
    let compressed_data = inflate.finish().expect("zlib compress should never failed");
    encoded_data.extend(compressed_data);
    Ok(encoded_data)
}
206
207fn magic_sort(a: &MetaAttached<Entry, EntryMeta>, b: &MetaAttached<Entry, EntryMeta>) -> Ordering {
209 let path_a = a.meta.file_path.as_ref();
210 let path_b = b.meta.file_path.as_ref();
211
212 match (path_a, path_b) {
214 (Some(pa), Some(pb)) => {
215 let pa = Path::new(pa);
216 let pb = Path::new(pb);
217
218 let dir_ord = pa.parent().cmp(&pb.parent());
220 if dir_ord != Ordering::Equal {
221 return dir_ord;
222 }
223
224 let name_a = pa.file_name().unwrap_or_default().to_string_lossy();
226 let name_b = pb.file_name().unwrap_or_default().to_string_lossy();
227 let name_ord = compare(&name_a, &name_b);
228 if name_ord != Ordering::Equal {
229 return name_ord;
230 }
231 }
232 (Some(_), None) => return Ordering::Less, (None, Some(_)) => return Ordering::Greater, (None, None) => {}
235 }
236
237 let ord = b.inner.data.len().cmp(&a.inner.data.len());
238 if ord != Ordering::Equal {
239 return ord;
240 }
241
242 (a as *const MetaAttached<Entry, EntryMeta>).cmp(&(b as *const MetaAttached<Entry, EntryMeta>))
244}
245
246fn calc_hash(data: &[u8]) -> u64 {
248 let mut hasher = AHasher::default();
249 data.hash(&mut hasher);
250 hasher.finish()
251}
252
253fn cheap_similar(a: &[u8], b: &[u8]) -> bool {
255 let k = a.len().min(b.len()).min(128);
256 if k == 0 {
257 return false;
258 }
259 calc_hash(&a[..k]) == calc_hash(&b[..k])
260}
261
262impl PackEncoder {
263 pub fn new(object_number: usize, window_size: usize, sender: mpsc::Sender<Vec<u8>>) -> Self {
264 PackEncoder {
265 object_number,
266 window_size,
267 process_index: 0,
268 pack_sender: Some(sender),
270 idx_sender: None,
271 idx_entries: None,
272 inner_offset: 12, inner_hash: HashAlgorithm::new(), final_hash: None,
275 start_encoding: false,
276 }
277 }
278
279 pub fn new_with_idx(
280 object_number: usize,
281 window_size: usize,
282 pack_sender: mpsc::Sender<Vec<u8>>,
283 idx_sender: mpsc::Sender<Vec<u8>>,
284 ) -> Self {
285 PackEncoder {
286 object_number,
288 window_size,
289 process_index: 0,
290 pack_sender: Some(pack_sender),
292 idx_sender: Some(idx_sender),
293 idx_entries: None,
294 inner_offset: 12, inner_hash: HashAlgorithm::new(), final_hash: None,
297 start_encoding: false,
298 }
299 }
300
301 pub fn drop_sender(&mut self) {
302 self.pack_sender.take(); }
304
305 pub async fn send_data(&mut self, data: Vec<u8>) {
306 if let Some(sender) = &self.pack_sender {
307 sender.send(data).await.unwrap();
308 }
309 }
310
311 pub fn get_hash(&self) -> Option<ObjectHash> {
313 self.final_hash
314 }
315
316 pub async fn encode(
324 &mut self,
325 entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
326 ) -> Result<(), GitError> {
327 if self.window_size == 0 {
329 self.parallel_encode(entry_rx).await
330 } else {
331 self.inner_encode(entry_rx, false).await
332 }
333 }
334
335 pub async fn encode_with_zstdelta(
337 &mut self,
338 entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
339 ) -> Result<(), GitError> {
340 self.inner_encode(entry_rx, true).await
341 }
342
343 async fn inner_encode(
346 &mut self,
347 mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
348 enable_zstdelta: bool,
349 ) -> Result<(), GitError> {
350 let head = encode_header(self.object_number);
351 self.send_data(head.clone()).await;
352 self.inner_hash.update(&head);
353
354 if self.start_encoding {
356 return Err(GitError::PackEncodeError(
357 "encoding operation is already in progress".to_string(),
358 ));
359 }
360
361 let mut commits: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
362 let mut trees: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
363 let mut blobs: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
364 let mut tags: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
365 while let Some(entry) = entry_rx.recv().await {
366 match entry.inner.obj_type {
367 ObjectType::Commit => {
368 commits.push(entry);
369 }
370 ObjectType::Tree => {
371 trees.push(entry);
372 }
373 ObjectType::Blob => {
374 blobs.push(entry);
375 }
376 ObjectType::Tag => {
377 tags.push(entry);
378 }
379 _ => {}
380 }
381 }
382
383 commits.sort_by(magic_sort);
384 trees.sort_by(magic_sort);
385 blobs.sort_by(magic_sort);
386 tags.sort_by(magic_sort);
387 tracing::info!(
388 "numbers : commits: {:?} trees: {:?} blobs:{:?} tag :{:?}",
389 commits.len(),
390 trees.len(),
391 blobs.len(),
392 tags.len()
393 );
394
395 let (commit_results, tree_results, blob_results, tag_results) = tokio::try_join!(
397 tokio::task::spawn_blocking(move || {
398 Self::try_as_offset_delta(
399 commits
400 .into_iter()
401 .map(|entry_with_meta| entry_with_meta.inner)
402 .collect(),
403 10,
404 enable_zstdelta,
405 )
406 }),
407 tokio::task::spawn_blocking(move || {
408 Self::try_as_offset_delta(
409 trees
410 .into_iter()
411 .map(|entry_with_meta| entry_with_meta.inner)
412 .collect(),
413 10,
414 enable_zstdelta,
415 )
416 }),
417 tokio::task::spawn_blocking(move || {
418 Self::try_as_offset_delta(
419 blobs
420 .into_iter()
421 .map(|entry_with_meta| entry_with_meta.inner)
422 .collect(),
423 10,
424 enable_zstdelta,
425 )
426 }),
427 tokio::task::spawn_blocking(move || {
428 Self::try_as_offset_delta(
429 tags.into_iter()
430 .map(|entry_with_meta| entry_with_meta.inner)
431 .collect(),
432 10,
433 enable_zstdelta,
434 )
435 }),
436 )
437 .map_err(|e| GitError::PackEncodeError(format!("Task join error: {e}")))?;
438
439 let commit_res = commit_results?;
440 let tree_res = tree_results?;
441 let blob_res = blob_results?;
442 let tag_res = tag_results?;
443
444 let mut all_res = vec![commit_res, tree_res, blob_res, tag_res];
445
446 let mut idx_entries = Vec::new();
447 for res in &mut all_res {
448 for data in res {
449 data.1.offset = self.inner_offset as u64;
450 self.write_all_and_update(&data.0).await;
451 idx_entries.push(data.1.clone());
452 }
453 }
454
455 self.idx_entries = Some(idx_entries);
456
457 let hash_result = self.inner_hash.clone().finalize();
459 self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
460 self.send_data(hash_result.to_vec()).await;
461
462 self.drop_sender();
463 Ok(())
464 }
465
466 fn try_as_offset_delta(
474 mut bucket: Vec<Entry>,
475 window_size: usize,
476 enable_zstdelta: bool,
477 ) -> Result<Vec<(Vec<u8>, IndexEntry)>, GitError> {
478 let mut current_offset = 0usize;
479 let mut window: VecDeque<(Entry, usize)> = VecDeque::with_capacity(window_size);
480 let mut res: Vec<(Vec<u8>, IndexEntry)> = Vec::new();
481 for entry in bucket.iter_mut() {
484 let mut best_base: Option<&(Entry, usize)> = None;
487 let mut best_rate: f64 = 0.0;
488 let tie_epsilon: f64 = 0.15;
489
490 let candidates: Vec<_> = window
491 .par_iter()
492 .with_min_len(3)
493 .filter_map(|try_base| {
494 if try_base.0.obj_type != entry.obj_type {
495 return None;
496 }
497
498 if try_base.0.chain_len >= MAX_CHAIN_LEN {
499 return None;
500 }
501
502 if try_base.0.hash == entry.hash {
503 return None;
504 }
505
506 let sym_ratio = (try_base.0.data.len().min(entry.data.len()) as f64)
507 / (try_base.0.data.len().max(entry.data.len()) as f64);
508 if sym_ratio < 0.5 {
509 return None;
510 }
511
512 if !cheap_similar(&try_base.0.data, &entry.data) {
513 return None;
514 }
515
516 let rate = if (try_base.0.data.len() + entry.data.len()) / 2 > 64 {
517 delta::heuristic_encode_rate_parallel(&try_base.0.data, &entry.data)
518 } else {
519 delta::encode_rate(&try_base.0.data, &entry.data)
520 };
523
524 if rate > MIN_DELTA_RATE {
525 Some((rate, try_base))
526 } else {
527 None
528 }
529 })
530 .collect();
531
532 for (rate, try_base) in candidates {
533 match best_base {
534 None => {
535 best_rate = rate;
536 best_base = Some(try_base);
538 }
539 Some(best_base_ref) => {
540 let is_better = if rate > best_rate + tie_epsilon {
541 true
542 } else if (rate - best_rate).abs() <= tie_epsilon {
543 try_base.0.chain_len > best_base_ref.0.chain_len
544 } else {
545 false
546 };
547
548 if is_better {
549 best_rate = rate;
550 best_base = Some(try_base);
551 }
552 }
553 }
554 }
555
556 let mut entry_for_window = entry.clone();
557
558 let offset = best_base.map(|best_base| {
559 let delta = if enable_zstdelta {
560 entry.obj_type = ObjectType::OffsetZstdelta;
561 zstdelta::diff(&best_base.0.data, &entry.data)
562 .map_err(|e| {
563 GitError::DeltaObjectError(format!("zstdelta diff failed: {e}"))
564 })
565 .unwrap()
566 } else {
567 entry.obj_type = ObjectType::OffsetDelta;
568 delta::encode(&best_base.0.data, &entry.data)
569 };
570 entry.data = delta;
572 entry.chain_len = best_base.0.chain_len + 1;
573 current_offset - best_base.1
574 });
575
576 entry_for_window.chain_len = entry.chain_len;
577 let obj_data = encode_one_object(entry, offset)?;
578 window.push_back((entry_for_window, current_offset));
579 if window.len() > window_size {
580 window.pop_front();
581 }
582 res.push((obj_data.clone(), IndexEntry::new(entry, 0)));
583 current_offset += obj_data.len();
584 }
585 Ok(res)
586 }
587
588 pub async fn parallel_encode(
590 &mut self,
591 mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
592 ) -> Result<(), GitError> {
593 if self.window_size != 0 {
594 return Err(GitError::PackEncodeError(
595 "parallel encode only works when window_size == 0".to_string(),
596 ));
597 }
598
599 let head = encode_header(self.object_number);
600 self.send_data(head.clone()).await;
601 self.inner_hash.update(&head);
602
603 if self.start_encoding {
605 return Err(GitError::PackEncodeError(
606 "encoding operation is already in progress".to_string(),
607 ));
608 }
609
610 let mut idx_entries = Vec::new();
611 let batch_size = usize::max(1000, entry_rx.max_capacity() / 10); tracing::info!("encode with batch size: {}", batch_size);
613 loop {
614 let mut batch_entries = Vec::with_capacity(batch_size);
615 time_it!("parallel encode: receive batch", {
616 for _ in 0..batch_size {
617 match entry_rx.recv().await {
618 Some(entry) => {
619 batch_entries.push(entry.inner);
620 self.process_index += 1;
621 }
622 None => break,
623 }
624 }
625 });
626
627 if batch_entries.is_empty() {
628 break;
629 }
630
631 let batch_result: Vec<(Vec<u8>, IndexEntry)> =
633 time_it!("parallel encode: encode batch", {
634 batch_entries
635 .par_iter()
636 .map(|entry| {
637 (
638 encode_one_object(entry, None).unwrap(),
639 IndexEntry::new(entry, 0),
640 )
641 })
642 .collect()
643 });
644
645 time_it!("parallel encode: write batch", {
646 for mut obj_data in batch_result {
647 obj_data.1.offset = self.inner_offset as u64;
648 self.write_all_and_update(&obj_data.0).await;
649 idx_entries.push(obj_data.1);
650 }
651 });
652 }
653
654 tracing::debug!("parallel encode idx entries: {:?}", idx_entries.len());
655 if self.process_index != self.object_number {
656 panic!(
657 "not all objects are encoded, process:{}, total:{}",
658 self.process_index, self.object_number
659 );
660 }
661
662 let hash_result = self.inner_hash.clone().finalize();
664 self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
665 self.send_data(hash_result.to_vec()).await;
666 self.drop_sender();
667
668 self.idx_entries = Some(idx_entries);
669 Ok(())
670 }
671
672 async fn write_all_and_update(&mut self, data: &[u8]) {
674 self.inner_hash.update(data);
675 self.inner_offset += data.len();
676 self.send_data(data.to_vec()).await;
677 }
678
679 async fn generate_idx_file(&mut self) -> Result<(), GitError> {
680 let final_hash = self.final_hash
681 .ok_or(GitError::PackEncodeError("final_hash is missing,The pack file must be generated before the index file is produced.".into()))?;
682 let idx_entries = self.idx_entries.clone().ok_or(GitError::PackEncodeError(
683 "The pack file must be generated before the index file is produced.".into(),
684 ))?;
685 let mut idx_builder = IdxBuilder::new(
686 self.object_number,
687 self.idx_sender.clone().unwrap(),
688 final_hash,
689 );
690 idx_builder.write_idx(idx_entries).await?;
691 Ok(())
692 }
693
694 pub async fn encode_async(
700 mut self,
701 rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
702 ) -> Result<JoinHandle<()>, GitError> {
703 Ok(tokio::spawn(async move {
704 if self.window_size == 0 {
705 self.parallel_encode(rx).await.unwrap()
706 } else {
707 self.encode(rx).await.unwrap()
708 }
709 }))
710 }
711
712 pub async fn encode_async_with_zstdelta(
714 mut self,
715 rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
716 ) -> Result<JoinHandle<()>, GitError> {
717 Ok(tokio::spawn(async move {
718 self.encode_with_zstdelta(rx).await.unwrap()
720 }))
721 }
722
723 pub async fn encode_idx_file(&mut self) -> Result<(), GitError> {
725 if self.idx_sender.is_none() {
726 return Err(GitError::PackEncodeError(String::from(
727 "idx sender is none",
728 )));
729 }
730 self.generate_idx_file().await?;
731 self.idx_sender.take();
733 Ok(())
734 }
735}
736
737#[cfg(test)]
738mod tests {
739 use std::{env, io::Cursor, path::PathBuf, sync::Arc, time::Instant};
740
741 use tempfile::tempdir;
742 use tokio::sync::Mutex;
743
744 use super::*;
745 use crate::{
746 hash::{HashKind, ObjectHash, set_hash_kind_for_test},
747 internal::{
748 object::blob::Blob,
749 pack::{Pack, tests::init_logger, utils::read_offset_encoding},
750 },
751 time_it,
752 };
753
754 fn check_format(data: &Vec<u8>) {
756 let mut p = Pack::new(
757 None,
758 Some(1024 * 1024 * 1024 * 6), Some(PathBuf::from("/tmp/.cache_temp")),
760 true,
761 );
762 let mut reader = Cursor::new(data);
763 tracing::debug!("start check format");
764 p.decode(&mut reader, |_| {}, None::<fn(ObjectHash)>)
765 .expect("pack file format error");
766 }
767
    /// Round-trip test: encodes four small blobs with window sizes 0 and 4,
    /// checks both packs decode, and that the delta pack is not larger.
    #[tokio::test]
    async fn test_pack_encoder() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        async fn encode_once(window_size: usize) -> Vec<u8> {
            let (tx, mut rx) = mpsc::channel(100);
            let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);

            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
            encoder.encode_async(entry_rx).await.unwrap();

            for str in str_vec {
                let blob = Blob::from_content(str);
                let entry: Entry = blob.into();
                entry_tx
                    .send(MetaAttached {
                        inner: entry,
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            // Closing the entry channel lets the encoder finish and close `rx`.
            drop(entry_tx);
            let mut result = Vec::new();
            while let Some(chunk) = rx.recv().await {
                result.extend(chunk);
            }
            result
        }

        let pack_without_delta = encode_once(0).await;
        let pack_without_delta_size = pack_without_delta.len();
        check_format(&pack_without_delta);

        let pack_with_delta = encode_once(4).await;
        assert!(pack_with_delta.len() <= pack_without_delta_size);
        check_format(&pack_with_delta);
    }
    /// Same round-trip as `test_pack_encoder`, but under SHA-256 hashing.
    #[tokio::test]
    async fn test_pack_encoder_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);

        async fn encode_once(window_size: usize) -> Vec<u8> {
            let (tx, mut rx) = mpsc::channel(100);
            let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);

            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
            encoder.encode_async(entry_rx).await.unwrap();

            for s in str_vec {
                let blob = Blob::from_content(s);
                let entry: Entry = blob.into();
                entry_tx
                    .send(MetaAttached {
                        inner: entry,
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            // Closing the entry channel lets the encoder finish and close `rx`.
            drop(entry_tx);

            let mut result = Vec::new();
            while let Some(chunk) = rx.recv().await {
                result.extend(chunk);
            }
            result
        }

        let pack_without_delta = encode_once(0).await;
        let pack_without_delta_size = pack_without_delta.len();
        check_format(&pack_without_delta);

        let pack_with_delta = encode_once(4).await;
        assert!(pack_with_delta.len() <= pack_without_delta_size);
        check_format(&pack_with_delta);
    }
852
    /// Decodes the SHA-1 test pack fixture and returns its entries.
    async fn get_entries_for_test() -> Arc<Mutex<Vec<Entry>>> {
        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/data/packs/encode-test-sha1.pack");

        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);

        let f = std::fs::File::open(&source).unwrap();
        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
        let mut reader = std::io::BufReader::new(f);
        let entries = Arc::new(Mutex::new(Vec::new()));
        let entries_clone = entries.clone();
        p.decode(
            &mut reader,
            move |entry| {
                // Decode callback runs off the async runtime → blocking_lock.
                let mut entries = entries_clone.blocking_lock();
                entries.push(entry.inner);
            },
            None::<fn(ObjectHash)>,
        )
        .unwrap();
        assert_eq!(p.number, entries.lock().await.len());
        tracing::info!("total entries: {}", p.number);
        drop(p);

        entries
    }
    /// Decodes the SHA-256 test pack fixture and returns its entries.
    async fn get_entries_for_test_sha256() -> Arc<Mutex<Vec<Entry>>> {
        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/data/packs/encode-test-sha256.pack");

        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);

        let f = std::fs::File::open(&source).unwrap();
        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
        let mut reader = std::io::BufReader::new(f);
        let entries = Arc::new(Mutex::new(Vec::new()));
        let entries_clone = entries.clone();
        p.decode(
            &mut reader,
            move |entry| {
                // Decode callback runs off the async runtime → blocking_lock.
                let mut entries = entries_clone.blocking_lock();
                entries.push(entry.inner);
            },
            None::<fn(ObjectHash)>,
        )
        .unwrap();
        assert_eq!(p.number, entries.lock().await.len());
        tracing::info!("total entries: {}", p.number);
        drop(p);

        entries
    }
905
    /// Parallel (no-delta) encode of a real pack fixture: re-encodes it with
    /// `window_size == 0`, logs timing/compression stats, validates the pack.
    #[tokio::test]
    async fn test_pack_encoder_parallel_large_file() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        init_logger();

        let start = Instant::now();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(1_000_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test parallel encode", {
                encoder.parallel_encode(entry_rx).await.unwrap();
            });
        });

        // Feed entries from a separate task so the encoder drains concurrently.
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", result.len());
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        check_format(&result);
    }
    /// SHA-256 variant of the parallel large-file encode test.
    #[tokio::test]
    async fn test_pack_encoder_parallel_large_file_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        init_logger();

        let start = Instant::now();
        let entries = get_entries_for_test_sha256().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(1_000_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test parallel encode sha256", {
                encoder.parallel_encode(entry_rx).await.unwrap();
            });
        });

        // Feed entries from a separate task so the encoder drains concurrently.
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("sha256 test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", result.len());
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        check_format(&result);
    }
1028
    /// Delta-path encode of the fixture via `encode` (window size 0 here, so
    /// it dispatches to the parallel path); logs size/compression stats only.
    #[tokio::test]
    async fn test_pack_encoder_large_file() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test encode no parallel", {
                encoder.encode(entry_rx).await.unwrap();
            });
        });

        // Feed entries from a separate task so the encoder drains concurrently.
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );
    }
    /// SHA-256 variant of the large-file `encode` test.
    #[tokio::test]
    async fn test_pack_encoder_large_file_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        init_logger();
        let entries = get_entries_for_test_sha256().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test encode no parallel sha256", {
                encoder.encode(entry_rx).await.unwrap();
            });
        });

        // Feed entries from a separate task so the encoder drains concurrently.
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );
    }
1167
    /// Zstd-delta encode of the fixture (window size 10) with pack validation.
    #[tokio::test]
    async fn test_pack_encoder_with_zstdelta() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);
        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();

        // Feed entries from a separate task so the encoder drains concurrently.
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        check_format(&result);
    }
    /// SHA-256 variant of the zstd-delta encode test.
    #[tokio::test]
    async fn test_pack_encoder_with_zstdelta_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        init_logger();
        let entries = get_entries_for_test_sha256().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);
        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();

        // Feed entries from a separate task so the encoder drains concurrently.
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        check_format(&result);
    }
1292
1293 #[test]
1294 fn test_encode_offset() {
1295 let value = 16389;
1297
1298 let data = encode_offset(value);
1299 println!("{data:?}");
1300 let mut reader = Cursor::new(data);
1301 let (result, _) = read_offset_encoding(&mut reader).unwrap();
1302 println!("result: {result}");
1303 assert_eq!(result, value as u64);
1304 }
1305
1306 #[tokio::test]
1307 async fn test_pack_encoder_large_file_with_delta() {
1308 let _guard = set_hash_kind_for_test(HashKind::Sha1);
1309 init_logger();
1310 let entries = get_entries_for_test().await;
1311 let entries_number = entries.lock().await.len();
1312
1313 let total_original_size: usize = entries
1314 .lock()
1315 .await
1316 .iter()
1317 .map(|entry| entry.data.len())
1318 .sum();
1319
1320 let (tx, mut rx) = mpsc::channel(100_000);
1321 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1322
1323 let encoder = PackEncoder::new(entries_number, 10, tx);
1324
1325 let start = Instant::now(); encoder.encode_async(entry_rx).await.unwrap();
1327
1328 tokio::spawn(async move {
1330 let entries = entries.lock().await;
1331 for entry in entries.iter() {
1332 entry_tx
1333 .send(MetaAttached {
1334 inner: entry.clone(),
1335 meta: EntryMeta::new(),
1336 })
1337 .await
1338 .unwrap();
1339 }
1340 drop(entry_tx);
1341 tracing::info!("all entries sent");
1342 });
1343
1344 let mut result = Vec::new();
1345 while let Some(chunk) = rx.recv().await {
1346 result.extend(chunk);
1347 }
1348
1349 let pack_size = result.len();
1350 let compression_rate = if total_original_size > 0 {
1351 1.0 - (pack_size as f64 / total_original_size as f64)
1352 } else {
1353 0.0
1354 };
1355
1356 let duration = start.elapsed();
1357 tracing::info!("test executed in: {:.2?}", duration);
1358 tracing::info!("new pack file size: {}", pack_size);
1359 tracing::info!("original total size: {}", total_original_size);
1360 tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1361 tracing::info!(
1362 "space saved: {} bytes",
1363 total_original_size.saturating_sub(pack_size)
1364 );
1365
1366 check_format(&result);
1368 }
1369 #[tokio::test]
1370 async fn test_pack_encoder_large_file_with_delta_sha256() {
1371 let _guard = set_hash_kind_for_test(HashKind::Sha256);
1372 init_logger();
1373 let entries = get_entries_for_test_sha256().await;
1374 let entries_number = entries.lock().await.len();
1375
1376 let total_original_size: usize = entries
1377 .lock()
1378 .await
1379 .iter()
1380 .map(|entry| entry.data.len())
1381 .sum();
1382
1383 let (tx, mut rx) = mpsc::channel(100_000);
1384 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1385
1386 let encoder = PackEncoder::new(entries_number, 10, tx);
1387
1388 let start = Instant::now(); encoder.encode_async(entry_rx).await.unwrap();
1390
1391 tokio::spawn(async move {
1393 let entries = entries.lock().await;
1394 for entry in entries.iter() {
1395 entry_tx
1396 .send(MetaAttached {
1397 inner: entry.clone(),
1398 meta: EntryMeta::new(),
1399 })
1400 .await
1401 .unwrap();
1402 }
1403 drop(entry_tx);
1404 tracing::info!("all entries sent");
1405 });
1406
1407 let mut result = Vec::new();
1408 while let Some(chunk) = rx.recv().await {
1409 result.extend(chunk);
1410 }
1411
1412 let pack_size = result.len();
1413 let compression_rate = if total_original_size > 0 {
1414 1.0 - (pack_size as f64 / total_original_size as f64)
1415 } else {
1416 0.0
1417 };
1418
1419 let duration = start.elapsed();
1420 tracing::info!("test executed in: {:.2?}", duration);
1421 tracing::info!("new pack file size: {}", pack_size);
1422 tracing::info!("original total size: {}", total_original_size);
1423 tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1424 tracing::info!(
1425 "space saved: {} bytes",
1426 total_original_size.saturating_sub(pack_size)
1427 );
1428
1429 check_format(&result);
1431 }
1432
1433 #[tokio::test]
1434 async fn test_pack_encoder_output_to_files() {
1435 let _guard = set_hash_kind_for_test(HashKind::Sha1);
1436 init_logger();
1437 let entries = get_entries_for_test().await;
1438 let entries_number = entries.lock().await.len();
1439
1440 let total_original_size: usize = entries
1441 .lock()
1442 .await
1443 .iter()
1444 .map(|entry| entry.data.len())
1445 .sum();
1446
1447 let start = Instant::now();
1448
1449 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1450 let dir = tempdir().unwrap();
1452 let path = dir.path();
1453
1454 tokio::spawn(async move {
1456 let entries = entries.lock().await;
1457 for entry in entries.iter() {
1458 entry_tx
1459 .send(MetaAttached {
1460 inner: entry.clone(),
1461 meta: EntryMeta::new(),
1462 })
1463 .await
1464 .unwrap();
1465 }
1466 drop(entry_tx);
1467 tracing::info!("all entries sent");
1468 });
1469
1470 encode_and_output_to_files(entry_rx, entries_number, path.to_path_buf(), 0)
1471 .await
1472 .unwrap();
1473
1474 let mut pack_file = None;
1476 let mut idx_file = None;
1477 for entry in std::fs::read_dir(path).unwrap() {
1478 let entry = entry.unwrap();
1479 let file_name = entry.file_name();
1480 tracing::info!("file name: {:?}", file_name);
1481 let file_name = file_name.to_string_lossy();
1482 if file_name.ends_with(".pack") {
1483 pack_file = Some(entry.path());
1484 } else if file_name.ends_with(".idx") {
1485 idx_file = Some(entry.path());
1486 }
1487 }
1488 let pack_file = pack_file.expect("pack file not generated");
1489 let idx_file = idx_file.expect("idx file not generated");
1490 assert!(
1491 pack_file.metadata().unwrap().len() > 0,
1492 "pack file is empty"
1493 );
1494 assert!(idx_file.metadata().unwrap().len() > 0, "idx file is empty");
1495
1496 let duration = start.elapsed();
1497 tracing::info!("test executed in: {:.2?}", duration);
1498 tracing::info!("original total size: {}", total_original_size);
1499 }
1500
1501 #[tokio::test]
1502 async fn test_pack_encoder_output_to_files_with_delta() {
1503 let _guard = set_hash_kind_for_test(HashKind::Sha1);
1504 init_logger();
1505 let entries = get_entries_for_test().await;
1506 let entries_number = entries.lock().await.len();
1507
1508 let total_original_size: usize = entries
1509 .lock()
1510 .await
1511 .iter()
1512 .map(|entry| entry.data.len())
1513 .sum();
1514
1515 let start = Instant::now();
1516
1517 let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1518 let dir = tempdir().unwrap();
1520 let path = dir.path();
1521
1522 tokio::spawn(async move {
1524 let entries = entries.lock().await;
1525 for entry in entries.iter() {
1526 entry_tx
1527 .send(MetaAttached {
1528 inner: entry.clone(),
1529 meta: EntryMeta::new(),
1530 })
1531 .await
1532 .unwrap();
1533 }
1534 drop(entry_tx);
1535 tracing::info!("all entries sent");
1536 });
1537
1538 encode_and_output_to_files(entry_rx, entries_number, path.to_path_buf(), 10)
1539 .await
1540 .unwrap();
1541
1542 let mut pack_file = None;
1544 let mut idx_file = None;
1545 for entry in std::fs::read_dir(path).unwrap() {
1546 let entry = entry.unwrap();
1547 let file_name = entry.file_name();
1548 tracing::info!("file name: {:?}", file_name);
1549 let file_name = file_name.to_string_lossy();
1550 if file_name.ends_with(".pack") {
1551 pack_file = Some(entry.path());
1552 } else if file_name.ends_with(".idx") {
1553 idx_file = Some(entry.path());
1554 }
1555 }
1556 let pack_file = pack_file.expect("pack file not generated");
1557 let idx_file = idx_file.expect("idx file not generated");
1558 assert!(
1559 pack_file.metadata().unwrap().len() > 0,
1560 "pack file is empty"
1561 );
1562 assert!(idx_file.metadata().unwrap().len() > 0, "idx file is empty");
1563
1564 let duration = start.elapsed();
1565 tracing::info!("test executed in: {:.2?}", duration);
1566 tracing::info!("original total size: {}", total_original_size);
1567 }
1568}