use std::cmp::Ordering;
use std::collections::VecDeque;
use std::io::Write;

use crate::delta;
use crate::zstdelta;
use crate::internal::object::types::ObjectType;
use crate::time_it;
use crate::{errors::GitError, hash::SHA1, internal::pack::entry::Entry};
use ahash::AHasher;
use flate2::write::ZlibEncoder;
use rayon::prelude::*;
use sha1::{Digest, Sha1};
use std::hash::{Hash, Hasher};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

/// Upper bound on delta chain length; candidate bases whose chain is already this long are skipped.
const MAX_CHAIN_LEN: usize = 50;
/// Minimum estimated delta compression rate required for a candidate base to be used.
const MIN_DELTA_RATE: f64 = 0.5;

/// Streaming pack writer: receives `Entry` values over a channel, optionally
/// delta-compresses them against a sliding window of recently seen objects,
/// and sends the encoded pack bytes (header, objects, trailing SHA-1) through
/// `sender`.
pub struct PackEncoder {
    object_number: usize,
    process_index: usize,
    window_size: usize,
    sender: Option<mpsc::Sender<Vec<u8>>>,
    inner_offset: usize,
    inner_hash: Sha1,
    final_hash: Option<SHA1>,
    start_encoding: bool,
}

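/// Build the 12-byte pack header: the `PACK` magic, format version 2, and the
/// object count as a big-endian u32.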
fn encode_header(object_number: usize) -> Vec<u8> {
    let mut result: Vec<u8> = vec![b'P', b'A', b'C', b'K', 0, 0, 0, 2];
    assert_ne!(object_number, 0);
    assert!(object_number < (1 << 32));
    result.append((object_number as u32).to_be_bytes().to_vec().as_mut());
    result
}

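/// Encode a non-zero base offset in the variable-length, big-endian form used
/// by offset-delta entries: 7 value bits per byte, the high bit set on every
/// byte except the last, and each continuation step biased by one so every
/// offset has a single canonical encoding.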
fn encode_offset(mut value: usize) -> Vec<u8> {
    assert_ne!(value, 0, "offset can't be zero");
    let mut bytes = Vec::new();

    bytes.push((value & 0x7F) as u8);
    value >>= 7;
    while value != 0 {
        value -= 1;
        let byte = (value & 0x7F) as u8 | 0x80;
        value >>= 7;
        bytes.push(byte);
    }
    bytes.reverse();
    bytes
}

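/// Encode a single object: a varint-style header carrying the object type and
/// uncompressed size (low four size bits share the first byte, the rest follow
/// in little-endian 7-bit groups), an optional negative base offset for delta
/// objects, and the zlib-compressed payload.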
fn encode_one_object(entry: &Entry, offset: Option<usize>) -> Result<Vec<u8>, GitError> {
    let obj_data = &entry.data;
    let obj_data_len = obj_data.len();
    let obj_type_number = entry.obj_type.to_u8();

    let mut encoded_data = Vec::new();

    let mut header_data = vec![(0x80 | (obj_type_number << 4)) + (obj_data_len & 0x0f) as u8];
    let mut size = obj_data_len >> 4;
    if size > 0 {
        while size > 0 {
            if size >> 7 > 0 {
                header_data.push((0x80 | size) as u8);
                size >>= 7;
            } else {
                header_data.push(size as u8);
                break;
            }
        }
    } else {
        header_data.push(0);
    }
    encoded_data.extend(header_data);

    if entry.obj_type == ObjectType::OffsetDelta || entry.obj_type == ObjectType::OffsetZstdelta {
        let offset_data = encode_offset(offset.unwrap());
        encoded_data.extend(offset_data);
    } else if entry.obj_type == ObjectType::HashDelta {
        unreachable!("unsupported type")
    }

    let mut encoder = ZlibEncoder::new(Vec::new(), flate2::Compression::default());
    encoder
        .write_all(obj_data)
        .expect("zlib compression should never fail");
    encoder.flush().expect("zlib flush should never fail");
    let compressed_data = encoder
        .finish()
        .expect("zlib compression should never fail");
    encoded_data.extend(compressed_data);
    Ok(encoded_data)
}

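/// Order entries from largest payload to smallest so that larger objects enter
/// the window first and can act as delta bases for the smaller ones that
/// follow; ties are broken by address so the comparator never reports two
/// distinct entries as equal.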
fn magic_sort(a: &Entry, b: &Entry) -> Ordering {
    let ord = b.data.len().cmp(&a.data.len());
    if ord != Ordering::Equal {
        return ord;
    }

    (a as *const Entry).cmp(&(b as *const Entry))
}

fn calc_hash(data: &[u8]) -> u64 {
    let mut hasher = AHasher::default();
    data.hash(&mut hasher);
    hasher.finish()
}

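/// Cheap similarity pre-filter: hash the first `min(a.len(), b.len(), 128)`
/// bytes of each buffer with ahash and compare the fingerprints before paying
/// for a full delta rate estimate.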
fn cheap_similar(a: &[u8], b: &[u8]) -> bool {
    let k = a.len().min(b.len()).min(128);
    if k == 0 {
        return false;
    }
    calc_hash(&a[..k]) == calc_hash(&b[..k])
}

impl PackEncoder {
    pub fn new(object_number: usize, window_size: usize, sender: mpsc::Sender<Vec<u8>>) -> Self {
        PackEncoder {
            object_number,
            window_size,
            process_index: 0,
            sender: Some(sender),
            inner_offset: 12,
            inner_hash: Sha1::new(),
            final_hash: None,
            start_encoding: false,
        }
    }

    pub fn drop_sender(&mut self) {
        self.sender.take();
    }

    pub async fn send_data(&mut self, data: Vec<u8>) {
        if let Some(sender) = &self.sender {
            sender.send(data).await.unwrap();
        }
    }

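    /// The SHA-1 of the finished pack; `None` until encoding has completed.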
    pub fn get_hash(&self) -> Option<SHA1> {
        self.final_hash
    }

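    /// Encode all entries received on `entry_rx`, using the classic git delta
    /// format (`OffsetDelta`) for objects that benefit from it.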
    pub async fn encode(&mut self, entry_rx: mpsc::Receiver<Entry>) -> Result<(), GitError> {
        self.inner_encode(entry_rx, false).await
    }

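    /// Like [`Self::encode`], but emits zstd-based deltas (`OffsetZstdelta`)
    /// instead of the classic delta format.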
    pub async fn encode_with_zstdelta(
        &mut self,
        entry_rx: mpsc::Receiver<Entry>,
    ) -> Result<(), GitError> {
        self.inner_encode(entry_rx, true).await
    }

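    /// Shared encoding path: write the pack header, drain `entry_rx` into
    /// per-type buckets, sort each bucket, delta-encode the buckets on
    /// blocking threads, then stream the encoded objects followed by the
    /// trailing SHA-1.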
    async fn inner_encode(
        &mut self,
        mut entry_rx: mpsc::Receiver<Entry>,
        enable_zstdelta: bool,
    ) -> Result<(), GitError> {
        let head = encode_header(self.object_number);
        self.send_data(head.clone()).await;
        self.inner_hash.update(&head);

        if self.start_encoding {
            return Err(GitError::PackEncodeError(
                "encoding operation is already in progress".to_string(),
            ));
        }

        let mut commits: Vec<Entry> = Vec::new();
        let mut trees: Vec<Entry> = Vec::new();
        let mut blobs: Vec<Entry> = Vec::new();
        let mut tags: Vec<Entry> = Vec::new();
        while let Some(entry) = entry_rx.recv().await {
            match entry.obj_type {
                ObjectType::Commit => {
                    commits.push(entry);
                }
                ObjectType::Tree => {
                    trees.push(entry);
                }
                ObjectType::Blob => {
                    blobs.push(entry);
                }
                ObjectType::Tag => {
                    tags.push(entry);
                }
                _ => {}
            }
        }

        commits.sort_by(magic_sort);
        trees.sort_by(magic_sort);
        blobs.sort_by(magic_sort);
        tags.sort_by(magic_sort);
        tracing::info!(
            "entry counts: commits: {} trees: {} blobs: {} tags: {}",
            commits.len(),
            trees.len(),
            blobs.len(),
            tags.len()
        );

        let (commit_results, tree_results, blob_results, tag_results) = tokio::try_join!(
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(commits, 10, enable_zstdelta)
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(trees, 10, enable_zstdelta)
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(blobs, 10, enable_zstdelta)
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(tags, 10, enable_zstdelta)
            }),
        )
        .map_err(|e| GitError::PackEncodeError(format!("Task join error: {e}")))?;

        let all_encoded_data = [
            commit_results
                .map_err(|e| GitError::PackEncodeError(format!("Commit encoding error: {e}")))?,
            tree_results
                .map_err(|e| GitError::PackEncodeError(format!("Tree encoding error: {e}")))?,
            blob_results
                .map_err(|e| GitError::PackEncodeError(format!("Blob encoding error: {e}")))?,
            tag_results
                .map_err(|e| GitError::PackEncodeError(format!("Tag encoding error: {e}")))?,
        ]
        .concat();

        for data in all_encoded_data {
            self.write_all_and_update(&data).await;
        }

        let hash_result = self.inner_hash.clone().finalize();
        self.final_hash = Some(SHA1::from_bytes(&hash_result));
        self.send_data(hash_result.to_vec()).await;

        self.drop_sender();
        Ok(())
    }

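    /// Walk one bucket of entries with a sliding window of the `window_size`
    /// most recently encoded objects, pick the best delta base by estimated
    /// compression rate (preferring deeper chains on near ties), and encode
    /// each entry either whole or as an offset delta against that base.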
    fn try_as_offset_delta(
        mut bucket: Vec<Entry>,
        window_size: usize,
        enable_zstdelta: bool,
    ) -> Result<Vec<Vec<u8>>, GitError> {
        let mut current_offset = 0usize;
        let mut window: VecDeque<(Entry, usize)> = VecDeque::with_capacity(window_size);
        let mut res: Vec<Vec<u8>> = Vec::new();

        for entry in bucket.iter_mut() {
            let mut best_base: Option<&(Entry, usize)> = None;
            let mut best_rate: f64 = 0.0;
            let tie_epsilon: f64 = 0.15;

            let candidates: Vec<_> = window
                .par_iter()
                .with_min_len(3)
                .filter_map(|try_base| {
                    if try_base.0.obj_type != entry.obj_type {
                        return None;
                    }

                    if try_base.0.chain_len >= MAX_CHAIN_LEN {
                        return None;
                    }

                    if try_base.0.hash == entry.hash {
                        return None;
                    }

                    let sym_ratio = (try_base.0.data.len().min(entry.data.len()) as f64)
                        / (try_base.0.data.len().max(entry.data.len()) as f64);
                    if sym_ratio < 0.5 {
                        return None;
                    }

                    if !cheap_similar(&try_base.0.data, &entry.data) {
                        return None;
                    }

                    let rate = if (try_base.0.data.len() + entry.data.len()) / 2 > 64 {
                        delta::heuristic_encode_rate_parallel(&try_base.0.data, &entry.data)
                    } else {
                        delta::encode_rate(&try_base.0.data, &entry.data)
                    };

                    if rate > MIN_DELTA_RATE {
                        Some((rate, try_base))
                    } else {
                        None
                    }
                })
                .collect();

            for (rate, try_base) in candidates {
                match best_base {
                    None => {
                        best_rate = rate;
                        best_base = Some(try_base);
                    }
                    Some(best_base_ref) => {
                        let is_better = if rate > best_rate + tie_epsilon {
                            true
                        } else if (rate - best_rate).abs() <= tie_epsilon {
                            try_base.0.chain_len > best_base_ref.0.chain_len
                        } else {
                            false
                        };

                        if is_better {
                            best_rate = rate;
                            best_base = Some(try_base);
                        }
                    }
                }
            }

            let mut entry_for_window = entry.clone();

            let offset = best_base.map(|best_base| {
                let delta = if enable_zstdelta {
                    entry.obj_type = ObjectType::OffsetZstdelta;
                    zstdelta::diff(&best_base.0.data, &entry.data)
                        .map_err(|e| {
                            GitError::DeltaObjectError(format!("zstdelta diff failed: {e}"))
                        })
                        .unwrap()
                } else {
                    entry.obj_type = ObjectType::OffsetDelta;
                    delta::encode(&best_base.0.data, &entry.data)
                };
                entry.data = delta;
                entry.chain_len = best_base.0.chain_len + 1;
                current_offset - best_base.1
            });

            entry_for_window.chain_len = entry.chain_len;
            let obj_data = encode_one_object(entry, offset)?;
            window.push_back((entry_for_window, current_offset));
            if window.len() > window_size {
                window.pop_front();
            }
            current_offset += obj_data.len();
            res.push(obj_data);
        }
        Ok(res)
    }

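    /// Encode without delta compression: drain `entry_rx` in batches, compress
    /// every object in a batch in parallel with rayon, and stream the results
    /// in order. Only valid when `window_size == 0`.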
    pub async fn parallel_encode(
        &mut self,
        mut entry_rx: mpsc::Receiver<Entry>,
    ) -> Result<(), GitError> {
        if self.window_size != 0 {
            return Err(GitError::PackEncodeError(
                "parallel encode only works when window_size == 0".to_string(),
            ));
        }

        let head = encode_header(self.object_number);
        self.send_data(head.clone()).await;
        self.inner_hash.update(&head);

        if self.start_encoding {
            return Err(GitError::PackEncodeError(
                "encoding operation is already in progress".to_string(),
            ));
        }

        let batch_size = usize::max(1000, entry_rx.max_capacity() / 10);
        tracing::info!("encode with batch size: {}", batch_size);
        loop {
            let mut batch_entries = Vec::with_capacity(batch_size);
            time_it!("parallel encode: receive batch", {
                for _ in 0..batch_size {
                    match entry_rx.recv().await {
                        Some(entry) => {
                            batch_entries.push(entry);
                            self.process_index += 1;
                        }
                        None => break,
                    }
                }
            });

            if batch_entries.is_empty() {
                break;
            }

            let batch_result: Vec<Vec<u8>> = time_it!("parallel encode: encode batch", {
                batch_entries
                    .par_iter()
                    .map(|entry| encode_one_object(entry, None).unwrap())
                    .collect()
            });

            time_it!("parallel encode: write batch", {
                for obj_data in batch_result {
                    self.write_all_and_update(&obj_data).await;
                }
            });
        }

        if self.process_index != self.object_number {
            panic!(
                "not all objects were encoded: processed {}, expected {}",
                self.process_index, self.object_number
            );
        }

        let hash_result = self.inner_hash.clone().finalize();
        self.final_hash = Some(SHA1::from_bytes(&hash_result));
        self.send_data(hash_result.to_vec()).await;
        self.drop_sender();
        Ok(())
    }

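    /// Feed `data` into the running SHA-1, advance the pack offset, and send
    /// the bytes downstream.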
    async fn write_all_and_update(&mut self, data: &[u8]) {
        self.inner_hash.update(data);
        self.inner_offset += data.len();
        self.send_data(data.to_vec()).await;
    }

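    /// Spawn the encoding work onto a background task, choosing
    /// `parallel_encode` (no deltas) when `window_size == 0` and the
    /// delta-searching `encode` path otherwise.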
    pub async fn encode_async(
        mut self,
        rx: mpsc::Receiver<Entry>,
    ) -> Result<JoinHandle<()>, GitError> {
        Ok(tokio::spawn(async move {
            if self.window_size == 0 {
                self.parallel_encode(rx).await.unwrap()
            } else {
                self.encode(rx).await.unwrap()
            }
        }))
    }

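    /// Spawn the zstd-delta encoding work onto a background task.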
    pub async fn encode_async_with_zstdelta(
        mut self,
        rx: mpsc::Receiver<Entry>,
    ) -> Result<JoinHandle<()>, GitError> {
        Ok(tokio::spawn(async move {
            self.encode_with_zstdelta(rx).await.unwrap()
        }))
    }
}

#[cfg(test)]
mod tests {
    use std::env;
    use std::sync::Arc;
    use std::time::Instant;
    use std::{io::Cursor, path::PathBuf};
    use tokio::sync::Mutex;

    use crate::internal::object::blob::Blob;
    use crate::internal::pack::utils::read_offset_encoding;
    use crate::internal::pack::{Pack, tests::init_logger};
    use crate::time_it;

    use super::*;

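    /// Decode the produced pack with `Pack::decode` to verify it is well formed.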
    fn check_format(data: &Vec<u8>) {
        let mut p = Pack::new(
            None,
            Some(1024 * 1024 * 1024 * 6),
            Some(PathBuf::from("/tmp/.cache_temp")),
            true,
        );
        let mut reader = Cursor::new(data);
        tracing::debug!("start check format");
        p.decode(&mut reader, |_, _| {})
            .expect("pack file format error");
    }

    #[tokio::test]
    async fn test_pack_encoder() {
        async fn encode_once(window_size: usize) -> Vec<u8> {
            let (tx, mut rx) = mpsc::channel(100);
            let (entry_tx, entry_rx) = mpsc::channel::<Entry>(1);

            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
            encoder.encode_async(entry_rx).await.unwrap();

            for str in str_vec {
                let blob = Blob::from_content(str);
                let entry: Entry = blob.into();
                entry_tx.send(entry).await.unwrap();
            }
            drop(entry_tx);
            let mut result = Vec::new();
            while let Some(chunk) = rx.recv().await {
                result.extend(chunk);
            }
            result
        }

        let pack_without_delta = encode_once(0).await;
        let pack_without_delta_size = pack_without_delta.len();
        check_format(&pack_without_delta);

        let pack_with_delta = encode_once(4).await;
        assert!(pack_with_delta.len() <= pack_without_delta_size);
        check_format(&pack_with_delta);
    }

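    /// Decode a fixture pack from `tests/data/packs` and return its entries
    /// for the round-trip tests below.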
    async fn get_entries_for_test() -> Arc<Mutex<Vec<Entry>>> {
        let mut source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
        source.push("tests/data/packs/pack-f8bbb573cef7d851957caceb491c073ee8e8de41.pack");
        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);

        let f = std::fs::File::open(source).unwrap();
        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
        let mut reader = std::io::BufReader::new(f);
        let entries = Arc::new(Mutex::new(Vec::new()));
        let entries_clone = entries.clone();
        p.decode(&mut reader, move |entry, _| {
            let mut entries = entries_clone.blocking_lock();
            entries.push(entry);
        })
        .unwrap();
        assert_eq!(p.number, entries.lock().await.len());
        tracing::info!("total entries: {}", p.number);
        drop(p);

        entries
    }

    #[tokio::test]
    async fn test_pack_encoder_parallel_large_file() {
        init_logger();

        let start = Instant::now();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(1_000_000);
        let (entry_tx, entry_rx) = mpsc::channel::<Entry>(1_000_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test parallel encode", {
                encoder.parallel_encode(entry_rx).await.unwrap();
            });
        });

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx.send(entry.clone()).await.unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", result.len());
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        check_format(&result);
    }

    #[tokio::test]
    async fn test_pack_encoder_large_file() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<Entry>(100_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test encode no parallel", {
                encoder.encode(entry_rx).await.unwrap();
            });
        });

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx.send(entry.clone()).await.unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );
    }

    #[tokio::test]
    async fn test_pack_encoder_with_zstdelta() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<Entry>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);
        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx.send(entry.clone()).await.unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        check_format(&result);
    }

    #[test]
    fn test_encode_offset() {
        let value = 16389;

        let data = encode_offset(value);
        println!("{data:?}");
        let mut reader = Cursor::new(data);
        let (result, _) = read_offset_encoding(&mut reader).unwrap();
        println!("result: {result}");
        assert_eq!(result, value as u64);
    }

    #[tokio::test]
    async fn test_pack_encoder_large_file_with_delta() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<Entry>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);

        let start = Instant::now();
        encoder.encode_async(entry_rx).await.unwrap();

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx.send(entry.clone()).await.unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        check_format(&result);
    }
}