git_internal/internal/pack/
encode.rs

use std::cmp::Ordering;
use std::collections::VecDeque;
use std::io::Write;

use crate::delta;
use crate::internal::metadata::{EntryMeta, MetaAttached};
use crate::internal::object::types::ObjectType;
use crate::time_it;
use crate::zstdelta;
use crate::{
    errors::GitError, hash::ObjectHash, internal::pack::entry::Entry, utils::HashAlgorithm,
};
use ahash::AHasher;
use flate2::write::ZlibEncoder;
use natord::compare;
use rayon::prelude::*;

use std::hash::{Hash, Hasher};
use std::path::Path;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

const MAX_CHAIN_LEN: usize = 50;
const MIN_DELTA_RATE: f64 = 0.5; // minimum delta compression rate required to accept a delta base
//const MAX_ZSTDELTA_CHAIN_LEN: usize = 50;
/// An encoder for generating pack files with delta objects.
pub struct PackEncoder {
    object_number: usize,
    process_index: usize,
    window_size: usize,
    // window: VecDeque<(Entry, usize)>, // entry and offset
    sender: Option<mpsc::Sender<Vec<u8>>>,
    inner_offset: usize,       // offset of current entry
    inner_hash: HashAlgorithm, // allows different hash algorithms
    final_hash: Option<ObjectHash>,
    start_encoding: bool,
}

/// Encode the pack file header (12 bytes).
/// Content: 'PACK', version (2), number of objects.
fn encode_header(object_number: usize) -> Vec<u8> {
    let mut result: Vec<u8> = vec![
        b'P', b'A', b'C', b'K', // the signature of the pack file
        0, 0, 0, 2, // generates version 2 only.
    ];
    assert_ne!(object_number, 0); // the object count must not be zero
    assert!(object_number < (1 << 32));
    // TODO: return a GitError instead; the number of objects must fit in 32 bits
    result.append((object_number as u32).to_be_bytes().to_vec().as_mut()); // 4 bytes, network byte order (big-endian)
    result
}
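
// Worked example (illustrative): for a pack containing 3 objects, encode_header(3)
// yields b"PACK" followed by [0, 0, 0, 2] (version) and [0, 0, 0, 3] (object count),
// 12 bytes in total.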

/// Encode the offset of a delta object (variable-length big-endian, as used by git).
fn encode_offset(mut value: usize) -> Vec<u8> {
    assert_ne!(value, 0, "offset can't be zero");
    let mut bytes = Vec::new();

    bytes.push((value & 0x7F) as u8);
    value >>= 7;
    while value != 0 {
        value -= 1; // each continuation byte carries an implicit +1 bias
        let byte = (value & 0x7F) as u8 | 0x80; // set the continuation bit (MSB)
        value >>= 7;
        bytes.push(byte);
    }
    bytes.reverse();
    bytes
}
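
// Worked example (illustrative): encode_offset(128) first emits the low 7 bits
// (128 & 0x7F == 0), then shifts to value == 1; the loop subtracts the +1 bias and
// emits 0x80 (continuation bit set, payload 0). After reversal the bytes are
// [0x80, 0x00], which a reader decodes as ((0 + 1) << 7) | 0 == 128.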

/// Encode one object and update the hash.
/// `offset`: offset of this object if it's a delta object; `None` otherwise.
fn encode_one_object(entry: &Entry, offset: Option<usize>) -> Result<Vec<u8>, GitError> {
    let obj_data = &entry.data;
    let obj_data_len = obj_data.len();
    let obj_type_number = entry.obj_type.to_u8();

    let mut encoded_data = Vec::new();

    // **header** encoding
    let mut header_data = vec![(0x80 | (obj_type_number << 4)) + (obj_data_len & 0x0f) as u8];
    let mut size = obj_data_len >> 4; // 4 bits have been used in the first byte
    if size > 0 {
        while size > 0 {
            if size >> 7 > 0 {
                header_data.push((0x80 | size) as u8);
                size >>= 7;
            } else {
                header_data.push(size as u8);
                break;
            }
        }
    } else {
        header_data.push(0);
    }
    encoded_data.extend(header_data);

    // **offset** encoding
    if entry.obj_type == ObjectType::OffsetDelta || entry.obj_type == ObjectType::OffsetZstdelta {
        let offset_data = encode_offset(offset.unwrap());
        encoded_data.extend(offset_data);
    } else if entry.obj_type == ObjectType::HashDelta {
        unreachable!("unsupported type")
    }

    // **data** encoding, zlib-compressed
    let mut encoder = ZlibEncoder::new(Vec::new(), flate2::Compression::default());
    encoder
        .write_all(obj_data)
        .expect("zlib compress should never fail");
    encoder.flush().expect("zlib flush should never fail");
    let compressed_data = encoder.finish().expect("zlib compress should never fail");
    // self.write_all_and_update(&compressed_data).await;
    encoded_data.extend(compressed_data);
    Ok(encoded_data)
}
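
// Worked example (illustrative): a blob (type 3) of 300 bytes. The first header
// byte packs the type and the low 4 bits of the size: 0x80 | (3 << 4) | (300 & 0x0f)
// == 0xBC. The remaining size bits (300 >> 4 == 18) fit in one byte, so the header
// is [0xBC, 0x12]; a decoder reconstructs 0x0C | (0x12 << 4) == 300.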

fn magic_sort(a: &MetaAttached<Entry, EntryMeta>, b: &MetaAttached<Entry, EntryMeta>) -> Ordering {
    let path_a = a.meta.file_path.as_ref();
    let path_b = b.meta.file_path.as_ref();

    // 1. Handle path existence: entries with paths sort first
    match (path_a, path_b) {
        (Some(pa), Some(pb)) => {
            let pa = Path::new(pa);
            let pb = Path::new(pb);

            // 1a. Compare parent directory paths
            let dir_ord = pa.parent().cmp(&pb.parent());
            if dir_ord != Ordering::Equal {
                return dir_ord;
            }

            // 1b. Compare file names (natural sort)
            let name_a = pa.file_name().unwrap_or_default().to_string_lossy();
            let name_b = pb.file_name().unwrap_or_default().to_string_lossy();
            let name_ord = compare(&name_a, &name_b);
            if name_ord != Ordering::Equal {
                return name_ord;
            }
        }
        (Some(_), None) => return Ordering::Less, // entries with paths sort first
        (None, Some(_)) => return Ordering::Greater, // entries without paths sort last
        (None, None) => {}
    }

    // let ord = b.obj_type.to_u8().cmp(&a.obj_type.to_u8());
    // if ord != Ordering::Equal {
    //     return ord;
    // }

    // The hash should be the file hash, not the content hash.
    // TODO: this feature needs a larger refactor.
    // let ord = b.hash.cmp(&a.hash);
    // if ord != Ordering::Equal { return ord; }

    // 2. Larger objects sort first
    let ord = b.inner.data.len().cmp(&a.inner.data.len());
    if ord != Ordering::Equal {
        return ord;
    }

    // 3. Fallback: pointer order (newest first)
    (a as *const MetaAttached<Entry, EntryMeta>).cmp(&(b as *const MetaAttached<Entry, EntryMeta>))
}

fn calc_hash(data: &[u8]) -> u64 {
    let mut hasher = AHasher::default();
    data.hash(&mut hasher);
    hasher.finish()
}

fn cheap_similar(a: &[u8], b: &[u8]) -> bool {
    let k = a.len().min(b.len()).min(128);
    if k == 0 {
        return false;
    }
    calc_hash(&a[..k]) == calc_hash(&b[..k])
}

impl PackEncoder {
    pub fn new(object_number: usize, window_size: usize, sender: mpsc::Sender<Vec<u8>>) -> Self {
        PackEncoder {
            object_number,
            window_size,
            process_index: 0,
            // window: VecDeque::with_capacity(window_size),
            sender: Some(sender),
            inner_offset: 12, // start after the 12-byte pack header (signature + version + object count)
            inner_hash: HashAlgorithm::new(), // allows different hash algorithms
            final_hash: None,
            start_encoding: false,
        }
    }

    pub fn drop_sender(&mut self) {
        self.sender.take(); // take the sender out, dropping it
    }

    pub async fn send_data(&mut self, data: Vec<u8>) {
        if let Some(sender) = &self.sender {
            sender.send(data).await.unwrap();
        }
    }

    /// Get the hash of the pack file. If the pack file is not finished, returns `None`.
    pub fn get_hash(&self) -> Option<ObjectHash> {
        self.final_hash
    }

    /// Encodes entries into a pack file with delta objects and outputs them through the specified writer.
    /// # Arguments
    /// - `entry_rx` - A receiver channel (`mpsc::Receiver<MetaAttached<Entry, EntryMeta>>`) from which entries to be encoded are received.
    /// # Returns
    /// Returns `Ok(())` if encoding is successful, or a `GitError` in case of failure.
    /// - Returns a `GitError` if there is a failure during the encoding process.
    /// - Returns `PackEncodeError` if an encoding operation is already in progress.
    pub async fn encode(
        &mut self,
        entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<(), GitError> {
        self.inner_encode(entry_rx, false).await
    }

    pub async fn encode_with_zstdelta(
        &mut self,
        entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<(), GitError> {
        self.inner_encode(entry_rx, true).await
    }

    /// Delta selection heuristics are based on:
    ///   https://github.com/git/git/blob/master/Documentation/technical/pack-heuristics.adoc
    async fn inner_encode(
        &mut self,
        mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
        enable_zstdelta: bool,
    ) -> Result<(), GitError> {
        // ensure encoding can only be invoked once
        if self.start_encoding {
            return Err(GitError::PackEncodeError(
                "encoding operation is already in progress".to_string(),
            ));
        }
        self.start_encoding = true;

        let head = encode_header(self.object_number);
        self.send_data(head.clone()).await;
        self.inner_hash.update(&head);

        let mut commits: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
        let mut trees: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
        let mut blobs: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
        let mut tags: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
        while let Some(entry) = entry_rx.recv().await {
            match entry.inner.obj_type {
                ObjectType::Commit => {
                    commits.push(entry);
                }
                ObjectType::Tree => {
                    trees.push(entry);
                }
                ObjectType::Blob => {
                    blobs.push(entry);
                }
                ObjectType::Tag => {
                    tags.push(entry);
                }
                _ => {}
            }
        }

        commits.sort_by(magic_sort);
        trees.sort_by(magic_sort);
        blobs.sort_by(magic_sort);
        tags.sort_by(magic_sort);
        tracing::info!(
            "object counts: commits: {} trees: {} blobs: {} tags: {}",
            commits.len(),
            trees.len(),
            blobs.len(),
            tags.len()
        );

        // parallel encoding: one blocking task per object type
        let (commit_results, tree_results, blob_results, tag_results) = tokio::try_join!(
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(
                    commits
                        .into_iter()
                        .map(|entry_with_meta| entry_with_meta.inner)
                        .collect(),
                    10,
                    enable_zstdelta,
                )
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(
                    trees
                        .into_iter()
                        .map(|entry_with_meta| entry_with_meta.inner)
                        .collect(),
                    10,
                    enable_zstdelta,
                )
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(
                    blobs
                        .into_iter()
                        .map(|entry_with_meta| entry_with_meta.inner)
                        .collect(),
                    10,
                    enable_zstdelta,
                )
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(
                    tags.into_iter()
                        .map(|entry_with_meta| entry_with_meta.inner)
                        .collect(),
                    10,
                    enable_zstdelta,
                )
            }),
        )
        .map_err(|e| GitError::PackEncodeError(format!("Task join error: {e}")))?;

        let all_encoded_data = [
            commit_results
                .map_err(|e| GitError::PackEncodeError(format!("Commit encoding error: {e}")))?,
            tree_results
                .map_err(|e| GitError::PackEncodeError(format!("Tree encoding error: {e}")))?,
            blob_results
                .map_err(|e| GitError::PackEncodeError(format!("Blob encoding error: {e}")))?,
            tag_results
                .map_err(|e| GitError::PackEncodeError(format!("Tag encoding error: {e}")))?,
        ]
        .concat();

        for data in all_encoded_data {
            self.write_all_and_update(&data).await;
        }

        // hash signature
        let hash_result = self.inner_hash.clone().finalize();
        self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
        self.send_data(hash_result.to_vec()).await;

        self.drop_sender();
        Ok(())
    }

    /// Try to encode entries as deltas using objects in a sliding window.
    /// Both delta and zstdelta are handled here.
    /// Refs: https://sapling-scm.com/docs/dev/internals/zstdelta/
    /// The sliding window logic lives here.
    /// # Returns
    /// - `Ok(Vec<Vec<u8>>)` containing one encoded buffer per entry; entries that
    ///   found a suitable base are encoded as offset deltas, the rest as full objects.
    fn try_as_offset_delta(
        mut bucket: Vec<Entry>,
        window_size: usize,
        enable_zstdelta: bool,
    ) -> Result<Vec<Vec<u8>>, GitError> {
        let mut current_offset = 0usize;
        let mut window: VecDeque<(Entry, usize)> = VecDeque::with_capacity(window_size);
        let mut res: Vec<Vec<u8>> = Vec::new();

        for entry in bucket.iter_mut() {
            // reset the best base-object selection on each iteration
            let mut best_base: Option<&(Entry, usize)> = None;
            let mut best_rate: f64 = 0.0;
            let tie_epsilon: f64 = 0.15;

            let candidates: Vec<_> = window
                .par_iter()
                .with_min_len(3)
                .filter_map(|try_base| {
                    if try_base.0.obj_type != entry.obj_type {
                        return None;
                    }

                    if try_base.0.chain_len >= MAX_CHAIN_LEN {
                        return None;
                    }

                    if try_base.0.hash == entry.hash {
                        return None;
                    }

                    let sym_ratio = (try_base.0.data.len().min(entry.data.len()) as f64)
                        / (try_base.0.data.len().max(entry.data.len()) as f64);
                    if sym_ratio < 0.5 {
                        return None;
                    }

                    if !cheap_similar(&try_base.0.data, &entry.data) {
                        return None;
                    }

                    let rate = if (try_base.0.data.len() + entry.data.len()) / 2 > 64 {
                        delta::heuristic_encode_rate_parallel(&try_base.0.data, &entry.data)
                    } else {
                        delta::encode_rate(&try_base.0.data, &entry.data)
                        // let try_delta_obj = zstdelta::diff(&try_base.0.data, &entry.data).unwrap();
                        // 1.0 - try_delta_obj.len() as f64 / entry.data.len() as f64
                    };

                    if rate > MIN_DELTA_RATE {
                        Some((rate, try_base))
                    } else {
                        None
                    }
                })
                .collect();

            for (rate, try_base) in candidates {
                match best_base {
                    None => {
                        best_rate = rate;
                        best_base = Some(try_base);
                    }
                    Some(best_base_ref) => {
                        let is_better = if rate > best_rate + tie_epsilon {
                            true
                        } else if (rate - best_rate).abs() <= tie_epsilon {
                            try_base.0.chain_len > best_base_ref.0.chain_len
                        } else {
                            false
                        };

                        if is_better {
                            best_rate = rate;
                            best_base = Some(try_base);
                        }
                    }
                }
            }

            let mut entry_for_window = entry.clone();

            let offset = best_base.map(|best_base| {
                let delta = if enable_zstdelta {
                    entry.obj_type = ObjectType::OffsetZstdelta;
                    zstdelta::diff(&best_base.0.data, &entry.data)
                        .map_err(|e| {
                            GitError::DeltaObjectError(format!("zstdelta diff failed: {e}"))
                        })
                        .unwrap()
                } else {
                    entry.obj_type = ObjectType::OffsetDelta;
                    delta::encode(&best_base.0.data, &entry.data)
                };
                entry.data = delta;
                entry.chain_len = best_base.0.chain_len + 1;
                current_offset - best_base.1
            });

            entry_for_window.chain_len = entry.chain_len;
            let obj_data = encode_one_object(entry, offset)?;
            window.push_back((entry_for_window, current_offset));
            if window.len() > window_size {
                window.pop_front();
            }
            current_offset += obj_data.len();
            res.push(obj_data);
        }
        Ok(res)
    }

    /// Parallel encode with rayon; only works when window_size == 0 (no delta).
    pub async fn parallel_encode(
        &mut self,
        mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<(), GitError> {
        if self.window_size != 0 {
            return Err(GitError::PackEncodeError(
                "parallel encode only works when window_size == 0".to_string(),
            ));
        }

        // ensure encoding can only be invoked once
        if self.start_encoding {
            return Err(GitError::PackEncodeError(
                "encoding operation is already in progress".to_string(),
            ));
        }
        self.start_encoding = true;

        let head = encode_header(self.object_number);
        self.send_data(head.clone()).await;
        self.inner_hash.update(&head);

        let batch_size = usize::max(1000, entry_rx.max_capacity() / 10); // a temporary value, not optimized
        tracing::info!("encode with batch size: {}", batch_size);
        loop {
            let mut batch_entries = Vec::with_capacity(batch_size);
            time_it!("parallel encode: receive batch", {
                for _ in 0..batch_size {
                    match entry_rx.recv().await {
                        Some(entry) => {
                            batch_entries.push(entry.inner);
                            self.process_index += 1;
                        }
                        None => break,
                    }
                }
            });

            if batch_entries.is_empty() {
                break;
            }

            // `collect` returns results in order; refs: https://github.com/rayon-rs/rayon/issues/551#issuecomment-371657900
            let batch_result: Vec<Vec<u8>> = time_it!("parallel encode: encode batch", {
                batch_entries
                    .par_iter()
                    .map(|entry| encode_one_object(entry, None).unwrap())
                    .collect()
            });

            time_it!("parallel encode: write batch", {
                for obj_data in batch_result {
                    self.write_all_and_update(&obj_data).await;
                }
            });
        }

        if self.process_index != self.object_number {
            panic!(
                "not all objects are encoded, processed: {}, total: {}",
                self.process_index, self.object_number
            );
        }

        // hash signature
        let hash_result = self.inner_hash.clone().finalize();
        self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
        self.send_data(hash_result.to_vec()).await;
        self.drop_sender();
        Ok(())
    }

    /// Write data to the writer and update hash & offset.
    async fn write_all_and_update(&mut self, data: &[u8]) {
        self.inner_hash.update(data);
        self.inner_offset += data.len();
        self.send_data(data.to_vec()).await;
    }

    /// Async version of encode; the resulting data is delivered via the returned JoinHandle.
    /// It consumes the PackEncoder, so you can't use it after calling this function.
    /// When window_size == 0, it executes parallel_encode, which keeps streaming transmission;
    /// when window_size != 0, it executes encode, which uses magic sort and delta compression.
    /// Most other modules rely on this API.
    pub async fn encode_async(
        mut self,
        rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<JoinHandle<()>, GitError> {
        Ok(tokio::spawn(async move {
            if self.window_size == 0 {
                self.parallel_encode(rx).await.unwrap()
            } else {
                self.encode(rx).await.unwrap()
            }
        }))
    }

    pub async fn encode_async_with_zstdelta(
        mut self,
        rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<JoinHandle<()>, GitError> {
        Ok(tokio::spawn(async move {
            // Parallel encode is not used with zstdelta because it provides no benefit here.
            self.encode_with_zstdelta(rx).await.unwrap()
        }))
    }
}

#[cfg(test)]
mod tests {
    use std::env;
    use std::sync::Arc;
    use std::time::Instant;
    use std::{io::Cursor, path::PathBuf};
    use tokio::sync::Mutex;

    use super::*;
    use crate::hash::{HashKind, ObjectHash, set_hash_kind_for_test};
    use crate::internal::object::blob::Blob;
    use crate::internal::pack::utils::read_offset_encoding;
    use crate::internal::pack::{Pack, tests::init_logger};
    use crate::time_it;

    fn check_format(data: &Vec<u8>) {
        let mut p = Pack::new(
            None,
            Some(1024 * 1024 * 1024 * 6), // 6GB
            Some(PathBuf::from("/tmp/.cache_temp")),
            true,
        );
        let mut reader = Cursor::new(data);
        tracing::debug!("start check format");
        p.decode(&mut reader, |_| {}, None::<fn(ObjectHash)>)
            .expect("pack file format error");
    }

    #[tokio::test]
    async fn test_pack_encoder() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        async fn encode_once(window_size: usize) -> Vec<u8> {
            let (tx, mut rx) = mpsc::channel(100);
            let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);

            // make some different objects, or decode will fail
            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
            encoder.encode_async(entry_rx).await.unwrap();

            for str in str_vec {
                let blob = Blob::from_content(str);
                let entry: Entry = blob.into();
                entry_tx
                    .send(MetaAttached {
                        inner: entry,
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            // assert!(encoder.get_hash().is_some());
            let mut result = Vec::new();
            while let Some(chunk) = rx.recv().await {
                result.extend(chunk);
            }
            result
        }

        // without delta
        let pack_without_delta = encode_once(0).await;
        let pack_without_delta_size = pack_without_delta.len();
        check_format(&pack_without_delta);

        // with delta
        let pack_with_delta = encode_once(4).await;
        assert!(pack_with_delta.len() <= pack_without_delta_size);
        check_format(&pack_with_delta);
    }
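
    // A small sketch (not from the original suite) exercising the cheap_similar
    // prefix heuristic: buffers that agree on their first min(len, 128) bytes are
    // considered similar, while a difference inside that prefix (or an empty input)
    // is rejected.
    #[test]
    fn test_cheap_similar_prefix_heuristic() {
        let a = vec![7u8; 200];
        let mut b = vec![7u8; 150];
        // identical 128-byte prefixes compare as similar
        assert!(cheap_similar(&a, &b));
        // a difference inside the compared prefix breaks similarity
        b[10] = 0;
        assert!(!cheap_similar(&a, &b));
        // empty input is never similar
        assert!(!cheap_similar(&a, &[]));
    }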
    #[tokio::test]
    async fn test_pack_encoder_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);

        async fn encode_once(window_size: usize) -> Vec<u8> {
            let (tx, mut rx) = mpsc::channel(100);
            let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);

            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
            encoder.encode_async(entry_rx).await.unwrap();

            for s in str_vec {
                let blob = Blob::from_content(s);
                let entry: Entry = blob.into();
                entry_tx
                    .send(MetaAttached {
                        inner: entry,
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);

            let mut result = Vec::new();
            while let Some(chunk) = rx.recv().await {
                result.extend(chunk);
            }
            result
        }

        // without delta
        let pack_without_delta = encode_once(0).await;
        let pack_without_delta_size = pack_without_delta.len();
        check_format(&pack_without_delta);

        // with delta
        let pack_with_delta = encode_once(4).await;
        assert!(pack_with_delta.len() <= pack_without_delta_size);
        check_format(&pack_with_delta);
    }
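
    // A small sketch (not from the original suite) of magic_sort's size fallback.
    // Assumption: EntryMeta::new() leaves file_path unset, so ordering falls through
    // to data length, largest first.
    #[test]
    fn test_magic_sort_orders_larger_data_first() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        let small: Entry = Blob::from_content("a").into();
        let large: Entry = Blob::from_content("a much longer piece of content").into();
        let mut v = vec![
            MetaAttached {
                inner: small,
                meta: EntryMeta::new(),
            },
            MetaAttached {
                inner: large,
                meta: EntryMeta::new(),
            },
        ];
        v.sort_by(magic_sort);
        assert!(v[0].inner.data.len() >= v[1].inner.data.len());
    }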

    async fn get_entries_for_test() -> Arc<Mutex<Vec<Entry>>> {
        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/data/packs/pack-f8bbb573cef7d851957caceb491c073ee8e8de41.pack");

        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);

        let f = std::fs::File::open(&source).unwrap();
        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
        let mut reader = std::io::BufReader::new(f);
        let entries = Arc::new(Mutex::new(Vec::new()));
        let entries_clone = entries.clone();
        p.decode(
            &mut reader,
            move |entry| {
                let mut entries = entries_clone.blocking_lock();
                entries.push(entry.inner);
            },
            None::<fn(ObjectHash)>,
        )
        .unwrap();
        assert_eq!(p.number, entries.lock().await.len());
        tracing::info!("total entries: {}", p.number);
        drop(p);

        entries
    }
    async fn get_entries_for_test_sha256() -> Arc<Mutex<Vec<Entry>>> {
        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/data/packs/pack-78047853c60a1a3bb587f59598bdeb773fefc821f6f60f4f4797644ad43dad3d.pack");

        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);

        let f = std::fs::File::open(&source).unwrap();
        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
        let mut reader = std::io::BufReader::new(f);
        let entries = Arc::new(Mutex::new(Vec::new()));
        let entries_clone = entries.clone();
        p.decode(
            &mut reader,
            move |entry| {
                let mut entries = entries_clone.blocking_lock();
                entries.push(entry.inner);
            },
            None::<fn(ObjectHash)>,
        )
        .unwrap();
        assert_eq!(p.number, entries.lock().await.len());
        tracing::info!("total entries: {}", p.number);
        drop(p);

        entries
    }

    #[tokio::test]
    async fn test_pack_encoder_parallel_large_file() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        init_logger();

        let start = Instant::now();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        // encode entries in parallel
        let (tx, mut rx) = mpsc::channel(1_000_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test parallel encode", {
                encoder.parallel_encode(entry_rx).await.unwrap();
            });
        });

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", result.len());
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        // check format
        check_format(&result);
    }
    #[tokio::test]
    async fn test_pack_encoder_parallel_large_file_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        init_logger();

        let start = Instant::now();
        // use the sha256 pack file for testing
        let entries = get_entries_for_test_sha256().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(1_000_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test parallel encode sha256", {
                encoder.parallel_encode(entry_rx).await.unwrap();
            });
        });

        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("sha256 test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", result.len());
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        check_format(&result);
    }

    #[tokio::test]
    async fn test_pack_encoder_large_file() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        // encode entries
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test encode no parallel", {
                encoder.encode(entry_rx).await.unwrap();
            });
        });

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        // // only receive data
        // while (rx.recv().await).is_some() {
        //     // do nothing
        // }

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );
    }
    #[tokio::test]
    async fn test_pack_encoder_large_file_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        init_logger();
        let entries = get_entries_for_test_sha256().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        // encode entries
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test encode no parallel sha256", {
                encoder.encode(entry_rx).await.unwrap();
            });
        });

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        // // only receive data
        // while (rx.recv().await).is_some() {
        //     // do nothing
        // }

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );
    }

    #[tokio::test]
    async fn test_pack_encoder_with_zstdelta() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);
        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        // check format
        check_format(&result);
    }
    #[tokio::test]
    async fn test_pack_encoder_with_zstdelta_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        init_logger();
        let entries = get_entries_for_test_sha256().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);
        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        // check format
        check_format(&result);
    }

    #[test]
    fn test_encode_offset() {
        // let value = 11013;
        let value = 16389;

        let data = encode_offset(value);
        println!("{data:?}");
        let mut reader = Cursor::new(data);
        let (result, _) = read_offset_encoding(&mut reader).unwrap();
        println!("result: {result}");
        assert_eq!(result, value as u64);
    }
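
    // A small sketch (not from the original suite) pinning down the 12-byte header
    // layout produced by encode_header: signature, version 2, then the big-endian
    // object count.
    #[test]
    fn test_encode_header_layout() {
        let header = encode_header(3);
        assert_eq!(header.len(), 12);
        assert_eq!(&header[..4], b"PACK");
        assert_eq!(&header[4..8], &[0, 0, 0, 2]); // version 2 only
        assert_eq!(&header[8..12], &[0, 0, 0, 3]); // object count, big-endian
    }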

    #[tokio::test]
    async fn test_pack_encoder_large_file_with_delta() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);

        let start = Instant::now(); // start timing
        encoder.encode_async(entry_rx).await.unwrap();

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        // check format
        check_format(&result);
    }
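
    // A small sketch (not from the original suite) calling try_as_offset_delta
    // directly: it must return one encoded buffer per input entry, whether or not
    // any delta base was found.
    #[test]
    fn test_try_as_offset_delta_returns_one_buffer_per_entry() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        let entries: Vec<Entry> = vec!["hello, world", "hello, world!", "unrelated"]
            .into_iter()
            .map(|s| Blob::from_content(s).into())
            .collect();
        let n = entries.len();
        let encoded = PackEncoder::try_as_offset_delta(entries, 4, false).unwrap();
        assert_eq!(encoded.len(), n);
    }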
    #[tokio::test]
    async fn test_pack_encoder_large_file_with_delta_sha256() {
        let _guard = set_hash_kind_for_test(HashKind::Sha256);
        init_logger();
        let entries = get_entries_for_test_sha256().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);

        let start = Instant::now(); // start timing
        encoder.encode_async(entry_rx).await.unwrap();

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        // check format
        check_format(&result);
    }
}