use std::cmp::Ordering;
use std::collections::VecDeque;
use std::io::Write;

use crate::delta;
use crate::zstdelta;
use crate::internal::object::types::ObjectType;
use crate::time_it;
use crate::{errors::GitError, hash::SHA1, internal::pack::entry::Entry};
use ahash::AHasher;
use flate2::write::ZlibEncoder;
use rayon::prelude::*;
use sha1::{Digest, Sha1};
use std::hash::{Hash, Hasher};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

/// Delta-chain depth at which an object is no longer offered as a delta base.
const MAX_CHAIN_LEN: usize = 50;
/// Minimum delta encode rate a candidate must exceed to be used as a base.
const MIN_DELTA_RATE: f64 = 0.5;

/// Streaming pack encoder: receives [`Entry`] values over a channel, optionally
/// delta-compresses them against a sliding window, and emits the encoded pack
/// (header, objects, trailing SHA-1) through `sender`.
pub struct PackEncoder {
    object_number: usize,
    process_index: usize,
    window_size: usize,
    sender: Option<mpsc::Sender<Vec<u8>>>,
    inner_offset: usize,
    inner_hash: Sha1,
    final_hash: Option<SHA1>,
    start_encoding: bool,
}

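/// Builds the 12-byte pack header: the `PACK` magic, format version 2, and the
/// object count as a big-endian `u32`.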
fn encode_header(object_number: usize) -> Vec<u8> {
    let mut result: Vec<u8> = vec![
        b'P', b'A', b'C', b'K', // "PACK" magic signature
        0, 0, 0, 2, // version 2
    ];
    assert_ne!(object_number, 0);
    assert!(object_number < (1 << 32));
    result.append((object_number as u32).to_be_bytes().to_vec().as_mut());
    result
}

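/// Encodes a delta base offset in Git's variable-length offset encoding:
/// 7 bits per byte, most-significant group first, continuation bit set on every
/// byte except the last, and each higher-order group biased by one (the inverse
/// of `read_offset_encoding`).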
fn encode_offset(mut value: usize) -> Vec<u8> {
    assert_ne!(value, 0, "offset can't be zero");
    let mut bytes = Vec::new();

    bytes.push((value & 0x7F) as u8);
    value >>= 7;
    while value != 0 {
        value -= 1;
        let byte = (value & 0x7F) as u8 | 0x80;
        value >>= 7;
        bytes.push(byte);
    }
    bytes.reverse();
    bytes
}

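/// Serializes one entry into its pack representation: the type-and-size header,
/// an optional base offset for `OffsetDelta`/`OffsetZstdelta` objects, and the
/// zlib-compressed object data.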
fn encode_one_object(entry: &Entry, offset: Option<usize>) -> Result<Vec<u8>, GitError> {
    let obj_data = &entry.data;
    let obj_data_len = obj_data.len();
    let obj_type_number = entry.obj_type.to_u8();

    let mut encoded_data = Vec::new();

    // First header byte: continuation bit, 3-bit object type, low 4 bits of the size.
    let mut header_data = vec![(0x80 | (obj_type_number << 4)) + (obj_data_len & 0x0f) as u8];

    let mut size = obj_data_len >> 4;
    if size > 0 {
        while size > 0 {
            if size >> 7 > 0 {
                header_data.push((0x80 | size) as u8);
                size >>= 7;
            } else {
                header_data.push(size as u8);
                break;
            }
        }
    } else {
        header_data.push(0);
    }
    encoded_data.extend(header_data);

    if entry.obj_type == ObjectType::OffsetDelta || entry.obj_type == ObjectType::OffsetZstdelta {
        let offset_data = encode_offset(offset.unwrap());
        encoded_data.extend(offset_data);
    } else if entry.obj_type == ObjectType::HashDelta {
        unreachable!("unsupported type")
    }

    let mut inflate = ZlibEncoder::new(Vec::new(), flate2::Compression::default());
    inflate
        .write_all(obj_data)
        .expect("zlib compress should never fail");
    inflate.flush().expect("zlib flush should never fail");
    let compressed_data = inflate.finish().expect("zlib compress should never fail");
    encoded_data.extend(compressed_data);
    Ok(encoded_data)
}

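/// Sort comparator for delta buckets: larger objects first (descending data
/// length), with ties broken by address so the ordering stays total.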
fn magic_sort(a: &Entry, b: &Entry) -> Ordering {
    let ord = b.data.len().cmp(&a.data.len());
    if ord != Ordering::Equal {
        return ord;
    }

    (a as *const Entry).cmp(&(b as *const Entry))
}

fn calc_hash(data: &[u8]) -> u64 {
    let mut hasher = AHasher::default();
    data.hash(&mut hasher);
    hasher.finish()
}

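/// Cheap pre-filter for delta candidates: compares `AHasher` digests of the first
/// `min(a.len(), b.len(), 128)` bytes of both buffers; empty inputs never match.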
fn cheap_similar(a: &[u8], b: &[u8]) -> bool {
    let k = a.len().min(b.len()).min(128);
    if k == 0 {
        return false;
    }
    calc_hash(&a[..k]) == calc_hash(&b[..k])
}

impl PackEncoder {
    pub fn new(object_number: usize, window_size: usize, sender: mpsc::Sender<Vec<u8>>) -> Self {
        PackEncoder {
            object_number,
            window_size,
            process_index: 0,
            sender: Some(sender),
            inner_offset: 12, // the 12-byte pack header is written first
            inner_hash: Sha1::new(),
            final_hash: None,
            start_encoding: false,
        }
    }

    pub fn drop_sender(&mut self) {
        self.sender.take();
    }

    pub async fn send_data(&mut self, data: Vec<u8>) {
        if let Some(sender) = &self.sender {
            sender.send(data).await.unwrap();
        }
    }

    pub fn get_hash(&self) -> Option<SHA1> {
        self.final_hash
    }

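    /// Encodes all received entries, deltifying with Git's standard offset-delta format.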
    pub async fn encode(&mut self, entry_rx: mpsc::Receiver<Entry>) -> Result<(), GitError> {
        self.inner_encode(entry_rx, false).await
    }

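    /// Like [`encode`](Self::encode), but deltified objects use zstd-based deltas
    /// (`OffsetZstdelta`) instead of the classic Git delta format.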
    pub async fn encode_with_zstdelta(
        &mut self,
        entry_rx: mpsc::Receiver<Entry>,
    ) -> Result<(), GitError> {
        self.inner_encode(entry_rx, true).await
    }

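    /// Shared encoding path: writes the pack header, drains the entry channel into
    /// per-type buckets (commits, trees, blobs, tags), sorts each bucket, deltifies
    /// the buckets in parallel blocking tasks, then streams the encoded objects
    /// followed by the trailing SHA-1 checksum.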
    async fn inner_encode(
        &mut self,
        mut entry_rx: mpsc::Receiver<Entry>,
        enable_zstdelta: bool,
    ) -> Result<(), GitError> {
        let head = encode_header(self.object_number);
        self.send_data(head.clone()).await;
        self.inner_hash.update(&head);

        if self.start_encoding {
            return Err(GitError::PackEncodeError(
                "encoding operation is already in progress".to_string(),
            ));
        }

        let mut commits: Vec<Entry> = Vec::new();
        let mut trees: Vec<Entry> = Vec::new();
        let mut blobs: Vec<Entry> = Vec::new();
        let mut tags: Vec<Entry> = Vec::new();
        while let Some(entry) = entry_rx.recv().await {
            match entry.obj_type {
                ObjectType::Commit => {
                    commits.push(entry);
                }
                ObjectType::Tree => {
                    trees.push(entry);
                }
                ObjectType::Blob => {
                    blobs.push(entry);
                }
                ObjectType::Tag => {
                    tags.push(entry);
                }
                _ => {}
            }
        }

        commits.sort_by(magic_sort);
        trees.sort_by(magic_sort);
        blobs.sort_by(magic_sort);
        tags.sort_by(magic_sort);
        tracing::info!(
            "object counts: commits: {} trees: {} blobs: {} tags: {}",
            commits.len(),
            trees.len(),
            blobs.len(),
            tags.len()
        );

        let (commit_results, tree_results, blob_results, tag_results) = tokio::try_join!(
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(commits, 10, enable_zstdelta)
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(trees, 10, enable_zstdelta)
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(blobs, 10, enable_zstdelta)
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(tags, 10, enable_zstdelta)
            }),
        )
        .map_err(|e| GitError::PackEncodeError(format!("Task join error: {e}")))?;

        let all_encoded_data = [
            commit_results
                .map_err(|e| GitError::PackEncodeError(format!("Commit encoding error: {e}")))?,
            tree_results
                .map_err(|e| GitError::PackEncodeError(format!("Tree encoding error: {e}")))?,
            blob_results
                .map_err(|e| GitError::PackEncodeError(format!("Blob encoding error: {e}")))?,
            tag_results
                .map_err(|e| GitError::PackEncodeError(format!("Tag encoding error: {e}")))?,
        ]
        .concat();

        for data in all_encoded_data {
            self.write_all_and_update(&data).await;
        }

        let hash_result = self.inner_hash.clone().finalize();
        self.final_hash = Some(SHA1::from_bytes(&hash_result));
        self.send_data(hash_result.to_vec()).await;

        self.drop_sender();
        Ok(())
    }

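    /// Walks a sorted bucket of same-type entries, keeping the last `window_size`
    /// objects as delta-base candidates. For each entry the window is scanned in
    /// parallel for the best-scoring base (filtered by type, chain length, size
    /// ratio, and `cheap_similar`); when a base clears `MIN_DELTA_RATE` the entry
    /// is rewritten as an offset delta. Returns the encoded byte blocks in order.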
    fn try_as_offset_delta(
        mut bucket: Vec<Entry>,
        window_size: usize,
        enable_zstdelta: bool,
    ) -> Result<Vec<Vec<u8>>, GitError> {
        let mut current_offset = 0usize;
        let mut window: VecDeque<(Entry, usize)> = VecDeque::with_capacity(window_size);
        let mut res: Vec<Vec<u8>> = Vec::new();

        for entry in bucket.iter_mut() {
            let mut best_base: Option<&(Entry, usize)> = None;
            let mut best_rate: f64 = 0.0;
            let tie_epsilon: f64 = 0.15;

            let candidates: Vec<_> = window
                .par_iter()
                .with_min_len(3)
                .filter_map(|try_base| {
                    if try_base.0.obj_type != entry.obj_type {
                        return None;
                    }

                    if try_base.0.chain_len >= MAX_CHAIN_LEN {
                        return None;
                    }

                    if try_base.0.hash == entry.hash {
                        return None;
                    }

                    let sym_ratio = (try_base.0.data.len().min(entry.data.len()) as f64)
                        / (try_base.0.data.len().max(entry.data.len()) as f64);
                    if sym_ratio < 0.5 {
                        return None;
                    }

                    if !cheap_similar(&try_base.0.data, &entry.data) {
                        return None;
                    }

                    let rate = if (try_base.0.data.len() + entry.data.len()) / 2 > 64 {
                        delta::heuristic_encode_rate_parallel(&try_base.0.data, &entry.data)
                    } else {
                        delta::encode_rate(&try_base.0.data, &entry.data)
                    };

                    if rate > MIN_DELTA_RATE {
                        Some((rate, try_base))
                    } else {
                        None
                    }
                })
                .collect();

            for (rate, try_base) in candidates {
                match best_base {
                    None => {
                        best_rate = rate;
                        best_base = Some(try_base);
                    }
                    Some(best_base_ref) => {
                        let is_better = if rate > best_rate + tie_epsilon {
                            true
                        } else if (rate - best_rate).abs() <= tie_epsilon {
                            try_base.0.chain_len > best_base_ref.0.chain_len
                        } else {
                            false
                        };

                        if is_better {
                            best_rate = rate;
                            best_base = Some(try_base);
                        }
                    }
                }
            }

            // Keep an undeltified copy for the window so later entries can delta
            // against the full object data rather than the delta emitted below.
            let mut entry_for_window = entry.clone();

            let offset = best_base.map(|best_base| {
                let delta = if enable_zstdelta {
                    entry.obj_type = ObjectType::OffsetZstdelta;
                    zstdelta::diff(&best_base.0.data, &entry.data)
                        .map_err(|e| {
                            GitError::DeltaObjectError(format!("zstdelta diff failed: {e}"))
                        })
                        .unwrap()
                } else {
                    entry.obj_type = ObjectType::OffsetDelta;
                    delta::encode(&best_base.0.data, &entry.data)
                };
                entry.data = delta;
                entry.chain_len = best_base.0.chain_len + 1;
                // Relative distance back from this object to its base.
                current_offset - best_base.1
            });

            entry_for_window.chain_len = entry.chain_len;
            let obj_data = encode_one_object(entry, offset)?;
            window.push_back((entry_for_window, current_offset));
            if window.len() > window_size {
                window.pop_front();
            }
            current_offset += obj_data.len();
            res.push(obj_data);
        }
        Ok(res)
    }

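    /// Non-delta encoding path used when `window_size == 0`: entries are received
    /// in batches, compressed in parallel with rayon, and written out in order.
    /// Panics if the number of received entries differs from `object_number`.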
    pub async fn parallel_encode(
        &mut self,
        mut entry_rx: mpsc::Receiver<Entry>,
    ) -> Result<(), GitError> {
        if self.window_size != 0 {
            return Err(GitError::PackEncodeError(
                "parallel encode only works when window_size == 0".to_string(),
            ));
        }

        let head = encode_header(self.object_number);
        self.send_data(head.clone()).await;
        self.inner_hash.update(&head);

        if self.start_encoding {
            return Err(GitError::PackEncodeError(
                "encoding operation is already in progress".to_string(),
            ));
        }

        let batch_size = usize::max(1000, entry_rx.max_capacity() / 10);
        tracing::info!("encode with batch size: {}", batch_size);
        loop {
            let mut batch_entries = Vec::with_capacity(batch_size);
            time_it!("parallel encode: receive batch", {
                for _ in 0..batch_size {
                    match entry_rx.recv().await {
                        Some(entry) => {
                            batch_entries.push(entry);
                            self.process_index += 1;
                        }
                        None => break,
                    }
                }
            });

            if batch_entries.is_empty() {
                break;
            }

            let batch_result: Vec<Vec<u8>> = time_it!("parallel encode: encode batch", {
                batch_entries
                    .par_iter()
                    .map(|entry| encode_one_object(entry, None).unwrap())
                    .collect()
            });

            time_it!("parallel encode: write batch", {
                for obj_data in batch_result {
                    self.write_all_and_update(&obj_data).await;
                }
            });
        }

        if self.process_index != self.object_number {
            panic!(
                "not all objects are encoded, process:{}, total:{}",
                self.process_index, self.object_number
            );
        }

        let hash_result = self.inner_hash.clone().finalize();
        self.final_hash = Some(SHA1::from_bytes(&hash_result));
        self.send_data(hash_result.to_vec()).await;
        self.drop_sender();
        Ok(())
    }

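    /// Feeds `data` into the running SHA-1, advances the byte offset, and forwards
    /// the bytes to the output channel.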
    async fn write_all_and_update(&mut self, data: &[u8]) {
        self.inner_hash.update(data);
        self.inner_offset += data.len();
        self.send_data(data.to_vec()).await;
    }

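    /// Spawns the encoder on a background task and returns its `JoinHandle`.
    /// `window_size == 0` selects the non-delta parallel path; any other value
    /// uses the windowed delta encoder.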
    pub async fn encode_async(
        mut self,
        rx: mpsc::Receiver<Entry>,
    ) -> Result<JoinHandle<()>, GitError> {
        Ok(tokio::spawn(async move {
            if self.window_size == 0 {
                self.parallel_encode(rx).await.unwrap()
            } else {
                self.encode(rx).await.unwrap()
            }
        }))
    }

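    /// Like [`encode_async`](Self::encode_async), but always runs the windowed
    /// encoder with zstd-based deltas.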
    pub async fn encode_async_with_zstdelta(
        mut self,
        rx: mpsc::Receiver<Entry>,
    ) -> Result<JoinHandle<()>, GitError> {
        Ok(tokio::spawn(async move {
            self.encode_with_zstdelta(rx).await.unwrap()
        }))
    }
}

#[cfg(test)]
mod tests {
    use std::env;
    use std::sync::Arc;
    use std::time::Instant;
    use std::{io::Cursor, path::PathBuf};
    use tokio::sync::Mutex;

    use crate::internal::object::blob::Blob;
    use crate::internal::pack::utils::read_offset_encoding;
    use crate::internal::pack::{Pack, tests::init_logger};
    use crate::time_it;

    use super::*;

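    /// Decodes `data` with the pack parser to verify the generated pack is well-formed.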
    fn check_format(data: &Vec<u8>) {
        let mut p = Pack::new(
            None,
            Some(1024 * 1024 * 1024 * 6),
            Some(PathBuf::from("/tmp/.cache_temp")),
            true,
        );
        let mut reader = Cursor::new(data);
        tracing::debug!("start check format");
        p.decode(&mut reader, |_, _| {})
            .expect("pack file format error");
    }

    #[tokio::test]
    async fn test_pack_encoder() {
        async fn encode_once(window_size: usize) -> Vec<u8> {
            let (tx, mut rx) = mpsc::channel(100);
            let (entry_tx, entry_rx) = mpsc::channel::<Entry>(1);

            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
            encoder.encode_async(entry_rx).await.unwrap();

            for str in str_vec {
                let blob = Blob::from_content(str);
                let entry: Entry = blob.into();
                entry_tx.send(entry).await.unwrap();
            }
            drop(entry_tx);
            let mut result = Vec::new();
            while let Some(chunk) = rx.recv().await {
                result.extend(chunk);
            }
            result
        }

        let pack_without_delta = encode_once(0).await;
        let pack_without_delta_size = pack_without_delta.len();
        check_format(&pack_without_delta);

        let pack_with_delta = encode_once(4).await;
        assert!(pack_with_delta.len() <= pack_without_delta_size);
        check_format(&pack_with_delta);
    }

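    /// Decodes the bundled pack fixture and returns its entries for the large-file tests.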
    async fn get_entries_for_test() -> Arc<Mutex<Vec<Entry>>> {
        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/data/packs/pack-f8bbb573cef7d851957caceb491c073ee8e8de41.pack");

        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);

        let f = std::fs::File::open(&source).unwrap();
        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
        let mut reader = std::io::BufReader::new(f);
        let entries = Arc::new(Mutex::new(Vec::new()));
        let entries_clone = entries.clone();
        p.decode(&mut reader, move |entry, _| {
            let mut entries = entries_clone.blocking_lock();
            entries.push(entry);
        })
        .unwrap();
        assert_eq!(p.number, entries.lock().await.len());
        tracing::info!("total entries: {}", p.number);
        drop(p);

        entries
    }

    #[tokio::test]
    async fn test_pack_encoder_parallel_large_file() {
        init_logger();

        let start = Instant::now();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(1_000_000);
        let (entry_tx, entry_rx) = mpsc::channel::<Entry>(1_000_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test parallel encode", {
                encoder.parallel_encode(entry_rx).await.unwrap();
            });
        });

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx.send(entry.clone()).await.unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", result.len());
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        check_format(&result);
    }

    #[tokio::test]
    async fn test_pack_encoder_large_file() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<Entry>(100_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test encode no parallel", {
                encoder.encode(entry_rx).await.unwrap();
            });
        });

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx.send(entry.clone()).await.unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );
    }

    #[tokio::test]
    async fn test_pack_encoder_with_zstdelta() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<Entry>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);
        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx.send(entry.clone()).await.unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        check_format(&result);
    }

    #[test]
    fn test_encode_offset() {
        let value = 16389;

        let data = encode_offset(value);
        println!("{data:?}");
        let mut reader = Cursor::new(data);
        let (result, _) = read_offset_encoding(&mut reader).unwrap();
        println!("result: {result}");
        assert_eq!(result, value as u64);
    }

    #[tokio::test]
    async fn test_pack_encoder_large_file_with_delta() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<Entry>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);

        let start = Instant::now();
        encoder.encode_async(entry_rx).await.unwrap();

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx.send(entry.clone()).await.unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        check_format(&result);
    }
}