git_internal/internal/pack/
encode.rs

1use std::cmp::Ordering;
2use std::collections::VecDeque;
3use std::io::Write;
4
5use crate::delta;
6use crate::internal::metadata::{EntryMeta, MetaAttached};
7use crate::internal::object::types::ObjectType;
8use crate::time_it;
9use crate::zstdelta;
10use crate::{errors::GitError, hash::SHA1, internal::pack::entry::Entry};
11use ahash::AHasher;
12use flate2::write::ZlibEncoder;
13use natord::compare;
14use rayon::prelude::*;
15use sha1::{Digest, Sha1};
16use std::hash::{Hash, Hasher};
17use std::path::Path;
18use tokio::sync::mpsc;
19use tokio::task::JoinHandle;
20
/// Maximum allowed delta-chain depth; `try_as_offset_delta` rejects base
/// candidates whose chain is already this long, bounding decode cost.
const MAX_CHAIN_LEN: usize = 50;
/// A candidate base is only accepted when its estimated encode rate is
/// strictly above this threshold (see `try_as_offset_delta`).
const MIN_DELTA_RATE: f64 = 0.5; // minimum delta rate
//const MAX_ZSTDELTA_CHAIN_LEN: usize = 50;
24
/// An encoder for generating pack files with delta objects.
pub struct PackEncoder {
    /// Total number of objects this pack will contain (written into the header).
    object_number: usize,
    /// Count of objects received so far; `parallel_encode` checks it against
    /// `object_number` at the end.
    process_index: usize,
    /// Sliding-window size for delta base selection; 0 disables delta encoding.
    window_size: usize,
    // window: VecDeque<(Entry, usize)>, // entry and offset
    /// Output channel for encoded pack bytes; set to `None` when encoding
    /// finishes so the receiver sees EOF.
    sender: Option<mpsc::Sender<Vec<u8>>>,
    inner_offset: usize, // offset of current entry
    inner_hash: Sha1,    // Not SHA1 because need update trait
    /// SHA1 trailer of the finished pack; `None` until encoding completes.
    final_hash: Option<SHA1>,
    /// Guard flag intended to prevent a second encode invocation on the same
    /// encoder. NOTE(review): only meaningful if it is actually set when
    /// encoding starts — verify the encode paths do so.
    start_encoding: bool,
}
37
/// Encode the 12-byte header of a pack file.<br>
/// Layout: `b"PACK"` magic, 4-byte version (always 2), 4-byte big-endian object count.
///
/// # Panics
/// Panics when `object_number` is 0 or does not fit in a `u32`.
fn encode_header(object_number: usize) -> Vec<u8> {
    assert_ne!(object_number, 0); // guarantee self.number_of_objects!=0
    // `u32::try_from` replaces the old `object_number < (1 << 32)` assert,
    // which overflowed the shift on 32-bit `usize` targets.
    let count = u32::try_from(object_number).expect("number of objects must fit in u32 (< 4G)");
    let mut result = Vec::with_capacity(12);
    result.extend_from_slice(b"PACK"); // the logotype of the pack file
    result.extend_from_slice(&2u32.to_be_bytes()); // generates version 2 only
    result.extend_from_slice(&count.to_be_bytes()); // network byte order (big-endian)
    result
}
51
/// Encode the back-reference offset of an offset-delta object using git's
/// variable-length scheme: 7 payload bits per byte, most significant group
/// first, continuation (0x80) bit on every byte except the last, and a
/// `-1` adjustment on each continuation step to keep the encoding minimal.
///
/// # Panics
/// Panics when `value` is 0 — a delta can never reference itself.
fn encode_offset(mut value: usize) -> Vec<u8> {
    assert_ne!(value, 0, "offset can't be zero");

    // Collect least-significant group first, then reverse into wire order.
    let mut out = vec![(value & 0x7F) as u8];
    value >>= 7;
    while value > 0 {
        value -= 1;
        out.push(((value & 0x7F) as u8) | 0x80); // mark as continuation byte
        value >>= 7;
    }
    out.reverse();
    out
}
68
/// Encode a single object into its on-disk pack representation:
/// type/size header, optional base offset (offset-delta objects only),
/// then the zlib-compressed payload.
///
/// `offset`: distance back to the base object when `entry` is an offset
/// delta; `None` for non-delta objects.
fn encode_one_object(entry: &Entry, offset: Option<usize>) -> Result<Vec<u8>, GitError> {
    let obj_data = &entry.data;
    let obj_data_len = obj_data.len();
    let obj_type_number = entry.obj_type.to_u8();

    let mut encoded_data = Vec::new();

    // **header** encoding: the first byte packs the continuation bit (0x80),
    // the 3-bit object type (bits 4-6), and the low 4 bits of the size.
    let mut header_data = vec![(0x80 | (obj_type_number << 4)) + (obj_data_len & 0x0f) as u8];
    let mut size = obj_data_len >> 4; // 4 bit has been used in first byte
    if size > 0 {
        // Emit the remaining size, 7 bits per byte, least-significant group
        // first; every byte except the last keeps the 0x80 continuation bit.
        while size > 0 {
            if size >> 7 > 0 {
                header_data.push((0x80 | size) as u8);
                size >>= 7;
            } else {
                header_data.push(size as u8);
                break;
            }
        }
    } else {
        // NOTE(review): the first byte always sets 0x80, so a trailing zero
        // byte is emitted even when the size fits in 4 bits. Decoders accept
        // this, but the encoding is not minimal — confirm before changing.
        header_data.push(0);
    }
    encoded_data.extend(header_data);

    // **offset** encoding: only offset-delta objects reference a base.
    if entry.obj_type == ObjectType::OffsetDelta || entry.obj_type == ObjectType::OffsetZstdelta {
        let offset_data = encode_offset(offset.unwrap());
        encoded_data.extend(offset_data);
    } else if entry.obj_type == ObjectType::HashDelta {
        unreachable!("unsupported type")
    }

    // **data** encoding, need zlib compress
    let mut inflate = ZlibEncoder::new(Vec::new(), flate2::Compression::default());
    inflate
        .write_all(obj_data)
        .expect("zlib compress should never failed");
    inflate.flush().expect("zlib flush should never failed");
    let compressed_data = inflate.finish().expect("zlib compress should never failed");
    encoded_data.extend(compressed_data);
    Ok(encoded_data)
}
116
117fn magic_sort(a: &MetaAttached<Entry, EntryMeta>, b: &MetaAttached<Entry, EntryMeta>) -> Ordering {
118    let path_a = a.meta.file_path.as_ref();
119    let path_b = b.meta.file_path.as_ref();
120
121    // 1. Handle path existence: entries with paths sort first
122    match (path_a, path_b) {
123        (Some(pa), Some(pb)) => {
124            let pa = Path::new(pa);
125            let pb = Path::new(pb);
126
127            // 1. Compare parent directory paths
128            let dir_ord = pa.parent().cmp(&pb.parent());
129            if dir_ord != Ordering::Equal {
130                return dir_ord;
131            }
132
133            // 2. Compare filenames (natural sort)
134            let name_a = pa.file_name().unwrap_or_default().to_string_lossy();
135            let name_b = pb.file_name().unwrap_or_default().to_string_lossy();
136            let name_ord = compare(&name_a, &name_b);
137            if name_ord != Ordering::Equal {
138                return name_ord;
139            }
140        }
141        (Some(_), None) => return Ordering::Less, // entries with paths sort first
142        (None, Some(_)) => return Ordering::Greater, // entries without paths sort last
143        (None, None) => {}
144    }
145
146    // let ord = b.obj_type.to_u8().cmp(&a.obj_type.to_u8());
147    // if ord != Ordering::Equal {
148    //     return ord;
149    // }
150
151    // the hash should be file hash not content hash
152    // todo the feature need larger refactor
153    // let ord = b.hash.cmp(&a.hash);
154    // if ord != Ordering::Equal { return ord; }
155
156    let ord = b.inner.data.len().cmp(&a.inner.data.len());
157    if ord != Ordering::Equal {
158        return ord;
159    }
160
161    // fallback pointer order (newest first)
162    (a as *const MetaAttached<Entry, EntryMeta>).cmp(&(b as *const MetaAttached<Entry, EntryMeta>))
163}
164
165fn calc_hash(data: &[u8]) -> u64 {
166    let mut hasher = AHasher::default();
167    data.hash(&mut hasher);
168    hasher.finish()
169}
170
171fn cheap_similar(a: &[u8], b: &[u8]) -> bool {
172    let k = a.len().min(b.len()).min(128);
173    if k == 0 {
174        return false;
175    }
176    calc_hash(&a[..k]) == calc_hash(&b[..k])
177}
178
179impl PackEncoder {
180    pub fn new(object_number: usize, window_size: usize, sender: mpsc::Sender<Vec<u8>>) -> Self {
181        PackEncoder {
182            object_number,
183            window_size,
184            process_index: 0,
185            // window: VecDeque::with_capacity(window_size),
186            sender: Some(sender),
187            inner_offset: 12, // 12 bytes header
188            inner_hash: Sha1::new(),
189            final_hash: None,
190            start_encoding: false,
191        }
192    }
193
194    pub fn drop_sender(&mut self) {
195        self.sender.take(); // Take the sender out, dropping it
196    }
197
198    pub async fn send_data(&mut self, data: Vec<u8>) {
199        if let Some(sender) = &self.sender {
200            sender.send(data).await.unwrap();
201        }
202    }
203
204    /// Get the hash of the pack file. if the pack file is not finished, return None
205    pub fn get_hash(&self) -> Option<SHA1> {
206        self.final_hash
207    }
208
209    /// Encodes entries into a pack file with delta objects and outputs them through the specified writer.
210    /// # Arguments
211    /// - `rx` - A receiver channel (`mpsc::Receiver<Entry>`) from which entries to be encoded are received.
212    /// # Returns
213    /// Returns `Ok(())` if encoding is successful, or a `GitError` in case of failure.
214    /// - Returns a `GitError` if there is a failure during the encoding process.
215    /// - Returns `PackEncodeError` if an encoding operation is already in progress.
216    pub async fn encode(
217        &mut self,
218        entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
219    ) -> Result<(), GitError> {
220        self.inner_encode(entry_rx, false).await
221    }
222
223    pub async fn encode_with_zstdelta(
224        &mut self,
225        entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
226    ) -> Result<(), GitError> {
227        self.inner_encode(entry_rx, true).await
228    }
229
230    /// Delta selection heuristics are based on:
231    ///   https://github.com/git/git/blob/master/Documentation/technical/pack-heuristics.adoc
232    async fn inner_encode(
233        &mut self,
234        mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
235        enable_zstdelta: bool,
236    ) -> Result<(), GitError> {
237        let head = encode_header(self.object_number);
238        self.send_data(head.clone()).await;
239        self.inner_hash.update(&head);
240
241        // ensure only one decode can only invoke once
242        if self.start_encoding {
243            return Err(GitError::PackEncodeError(
244                "encoding operation is already in progress".to_string(),
245            ));
246        }
247
248        let mut commits: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
249        let mut trees: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
250        let mut blobs: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
251        let mut tags: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
252        while let Some(entry) = entry_rx.recv().await {
253            match entry.inner.obj_type {
254                ObjectType::Commit => {
255                    commits.push(entry);
256                }
257                ObjectType::Tree => {
258                    trees.push(entry);
259                }
260                ObjectType::Blob => {
261                    blobs.push(entry);
262                }
263                ObjectType::Tag => {
264                    tags.push(entry);
265                }
266                _ => {}
267            }
268        }
269
270        commits.sort_by(magic_sort);
271        trees.sort_by(magic_sort);
272        blobs.sort_by(magic_sort);
273        tags.sort_by(magic_sort);
274        tracing::info!(
275            "numbers :  commits: {:?} trees: {:?} blobs:{:?} tag :{:?}",
276            commits.len(),
277            trees.len(),
278            blobs.len(),
279            tags.len()
280        );
281
282        // parallel encoding vec with different object_type
283        let (commit_results, tree_results, blob_results, tag_results) = tokio::try_join!(
284            tokio::task::spawn_blocking(move || {
285                Self::try_as_offset_delta(
286                    commits
287                        .into_iter()
288                        .map(|entry_with_meta| entry_with_meta.inner)
289                        .collect(),
290                    10,
291                    enable_zstdelta,
292                )
293            }),
294            tokio::task::spawn_blocking(move || {
295                Self::try_as_offset_delta(
296                    trees
297                        .into_iter()
298                        .map(|entry_with_meta| entry_with_meta.inner)
299                        .collect(),
300                    10,
301                    enable_zstdelta,
302                )
303            }),
304            tokio::task::spawn_blocking(move || {
305                Self::try_as_offset_delta(
306                    blobs
307                        .into_iter()
308                        .map(|entry_with_meta| entry_with_meta.inner)
309                        .collect(),
310                    10,
311                    enable_zstdelta,
312                )
313            }),
314            tokio::task::spawn_blocking(move || {
315                Self::try_as_offset_delta(
316                    tags.into_iter()
317                        .map(|entry_with_meta| entry_with_meta.inner)
318                        .collect(),
319                    10,
320                    enable_zstdelta,
321                )
322            }),
323        )
324        .map_err(|e| GitError::PackEncodeError(format!("Task join error: {e}")))?;
325
326        let all_encoded_data = [
327            commit_results
328                .map_err(|e| GitError::PackEncodeError(format!("Commit encoding error: {e}")))?,
329            tree_results
330                .map_err(|e| GitError::PackEncodeError(format!("Tree encoding error: {e}")))?,
331            blob_results
332                .map_err(|e| GitError::PackEncodeError(format!("Blob encoding error: {e}")))?,
333            tag_results
334                .map_err(|e| GitError::PackEncodeError(format!("Tag encoding error: {e}")))?,
335        ]
336        .concat();
337
338        for data in all_encoded_data {
339            self.write_all_and_update(&data).await;
340        }
341
342        // Hash signature
343        let hash_result = self.inner_hash.clone().finalize();
344        self.final_hash = Some(SHA1::from_bytes(&hash_result));
345        self.send_data(hash_result.to_vec()).await;
346
347        self.drop_sender();
348        Ok(())
349    }
350
351    /// Try to encode as delta using objects in window
352    /// delta & zstdelta have been gathered here
353    /// Refs: https://sapling-scm.com/docs/dev/internals/zstdelta/
354    /// the sliding window was moved here
355    /// # Returns
356    /// - Return (Vec<Vec<u8>) if success make delta
357    /// - Return (None) if didn't delta,
358    fn try_as_offset_delta(
359        mut bucket: Vec<Entry>,
360        window_size: usize,
361        enable_zstdelta: bool,
362    ) -> Result<Vec<Vec<u8>>, GitError> {
363        let mut current_offset = 0usize;
364        let mut window: VecDeque<(Entry, usize)> = VecDeque::with_capacity(window_size);
365        let mut res: Vec<Vec<u8>> = Vec::new();
366
367        for entry in bucket.iter_mut() {
368            //let entry_for_window = entry.clone();
369            // 每次循环重置最佳基对象选择
370            let mut best_base: Option<&(Entry, usize)> = None;
371            let mut best_rate: f64 = 0.0;
372            let tie_epsilon: f64 = 0.15;
373
374            let candidates: Vec<_> = window
375                .par_iter()
376                .with_min_len(3)
377                .filter_map(|try_base| {
378                    if try_base.0.obj_type != entry.obj_type {
379                        return None;
380                    }
381
382                    if try_base.0.chain_len >= MAX_CHAIN_LEN {
383                        return None;
384                    }
385
386                    if try_base.0.hash == entry.hash {
387                        return None;
388                    }
389
390                    let sym_ratio = (try_base.0.data.len().min(entry.data.len()) as f64)
391                        / (try_base.0.data.len().max(entry.data.len()) as f64);
392                    if sym_ratio < 0.5 {
393                        return None;
394                    }
395
396                    if !cheap_similar(&try_base.0.data, &entry.data) {
397                        return None;
398                    }
399
400                    let rate = if (try_base.0.data.len() + entry.data.len()) / 2 > 64 {
401                        delta::heuristic_encode_rate_parallel(&try_base.0.data, &entry.data)
402                    } else {
403                        delta::encode_rate(&try_base.0.data, &entry.data)
404                        // let try_delta_obj = zstdelta::diff(&try_base.0.data, &entry.data).unwrap();
405                        // 1.0 - try_delta_obj.len() as f64 / entry.data.len() as f64
406                    };
407
408                    if rate > MIN_DELTA_RATE {
409                        Some((rate, try_base))
410                    } else {
411                        None
412                    }
413                })
414                .collect();
415
416            for (rate, try_base) in candidates {
417                match best_base {
418                    None => {
419                        best_rate = rate;
420                        //best_base_offset = current_offset - try_base.1;
421                        best_base = Some(try_base);
422                    }
423                    Some(best_base_ref) => {
424                        let is_better = if rate > best_rate + tie_epsilon {
425                            true
426                        } else if (rate - best_rate).abs() <= tie_epsilon {
427                            try_base.0.chain_len > best_base_ref.0.chain_len
428                        } else {
429                            false
430                        };
431
432                        if is_better {
433                            best_rate = rate;
434                            best_base = Some(try_base);
435                        }
436                    }
437                }
438            }
439
440            let mut entry_for_window = entry.clone();
441
442            let offset = best_base.map(|best_base| {
443                let delta = if enable_zstdelta {
444                    entry.obj_type = ObjectType::OffsetZstdelta;
445                    zstdelta::diff(&best_base.0.data, &entry.data)
446                        .map_err(|e| {
447                            GitError::DeltaObjectError(format!("zstdelta diff failed: {e}"))
448                        })
449                        .unwrap()
450                } else {
451                    entry.obj_type = ObjectType::OffsetDelta;
452                    delta::encode(&best_base.0.data, &entry.data)
453                };
454                //entry.obj_type = ObjectType::OffsetDelta;
455                entry.data = delta;
456                entry.chain_len = best_base.0.chain_len + 1;
457                current_offset - best_base.1
458            });
459
460            entry_for_window.chain_len = entry.chain_len;
461            let obj_data = encode_one_object(entry, offset)?;
462            window.push_back((entry_for_window, current_offset));
463            if window.len() > window_size {
464                window.pop_front();
465            }
466            current_offset += obj_data.len();
467            res.push(obj_data);
468        }
469        Ok(res)
470    }
471
472    /// Parallel encode with rayon, only works when window_size == 0 (no delta)
473    pub async fn parallel_encode(
474        &mut self,
475        mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
476    ) -> Result<(), GitError> {
477        if self.window_size != 0 {
478            return Err(GitError::PackEncodeError(
479                "parallel encode only works when window_size == 0".to_string(),
480            ));
481        }
482
483        let head = encode_header(self.object_number);
484        self.send_data(head.clone()).await;
485        self.inner_hash.update(&head);
486
487        // ensure only one decode can only invoke once
488        if self.start_encoding {
489            return Err(GitError::PackEncodeError(
490                "encoding operation is already in progress".to_string(),
491            ));
492        }
493
494        let batch_size = usize::max(1000, entry_rx.max_capacity() / 10); // A temporary value, not optimized
495        tracing::info!("encode with batch size: {}", batch_size);
496        loop {
497            let mut batch_entries = Vec::with_capacity(batch_size);
498            time_it!("parallel encode: receive batch", {
499                for _ in 0..batch_size {
500                    match entry_rx.recv().await {
501                        Some(entry) => {
502                            batch_entries.push(entry.inner);
503                            self.process_index += 1;
504                        }
505                        None => break,
506                    }
507                }
508            });
509
510            if batch_entries.is_empty() {
511                break;
512            }
513
514            // use `collect` will return result in order, refs: https://github.com/rayon-rs/rayon/issues/551#issuecomment-371657900
515            let batch_result: Vec<Vec<u8>> = time_it!("parallel encode: encode batch", {
516                batch_entries
517                    .par_iter()
518                    .map(|entry| encode_one_object(entry, None).unwrap())
519                    .collect()
520            });
521
522            time_it!("parallel encode: write batch", {
523                for obj_data in batch_result {
524                    self.write_all_and_update(&obj_data).await;
525                }
526            });
527        }
528
529        if self.process_index != self.object_number {
530            panic!(
531                "not all objects are encoded, process:{}, total:{}",
532                self.process_index, self.object_number
533            );
534        }
535
536        // hash signature
537        let hash_result = self.inner_hash.clone().finalize();
538        self.final_hash = Some(SHA1::from_bytes(&hash_result));
539        self.send_data(hash_result.to_vec()).await;
540        self.drop_sender();
541        Ok(())
542    }
543
544    /// Write data to writer and update hash & offset
545    async fn write_all_and_update(&mut self, data: &[u8]) {
546        self.inner_hash.update(data);
547        self.inner_offset += data.len();
548        self.send_data(data.to_vec()).await;
549    }
550
551    /// async version of encode, result data will be returned by JoinHandle.
552    /// It will consume PackEncoder, so you can't use it after calling this function.
553    /// when window_size = 0, it executes parallel_encode which retains stream transmission
554    /// when window_size = 0,it executes encode which uses magic sort and delta.
555    /// It seems that all other modules rely on this api
556    pub async fn encode_async(
557        mut self,
558        rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
559    ) -> Result<JoinHandle<()>, GitError> {
560        Ok(tokio::spawn(async move {
561            if self.window_size == 0 {
562                self.parallel_encode(rx).await.unwrap()
563            } else {
564                self.encode(rx).await.unwrap()
565            }
566        }))
567    }
568
569    pub async fn encode_async_with_zstdelta(
570        mut self,
571        rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
572    ) -> Result<JoinHandle<()>, GitError> {
573        Ok(tokio::spawn(async move {
574            // Do not use parallel encode with zstdelta because it make no sense.
575            self.encode_with_zstdelta(rx).await.unwrap()
576        }))
577    }
578}
579
#[cfg(test)]
mod tests {
    use std::env;
    use std::sync::Arc;
    use std::time::Instant;
    use std::{io::Cursor, path::PathBuf};
    use tokio::sync::Mutex;

    use crate::internal::object::blob::Blob;
    use crate::internal::pack::utils::read_offset_encoding;
    use crate::internal::pack::{Pack, tests::init_logger};
    use crate::time_it;

    use super::*;

    /// Round-trip check: decode `data` with the pack decoder and panic if the
    /// generated pack is malformed.
    fn check_format(data: &Vec<u8>) {
        let mut p = Pack::new(
            None,
            Some(1024 * 1024 * 1024 * 6), // 6GB
            Some(PathBuf::from("/tmp/.cache_temp")),
            true,
        );
        let mut reader = Cursor::new(data);
        tracing::debug!("start check format");
        p.decode(&mut reader, |_| {}, None::<fn(SHA1)>)
            .expect("pack file format error");
    }

    #[tokio::test]
    async fn test_pack_encoder() {
        // Encode a fixed set of blobs with the given window size and return the pack bytes.
        async fn encode_once(window_size: usize) -> Vec<u8> {
            let (tx, mut rx) = mpsc::channel(100);
            let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);

            // make some different objects, or decode will fail
            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
            encoder.encode_async(entry_rx).await.unwrap();

            for str in str_vec {
                let blob = Blob::from_content(str);
                let entry: Entry = blob.into();
                entry_tx
                    .send(MetaAttached {
                        inner: entry,
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            // assert!(encoder.get_hash().is_some());
            let mut result = Vec::new();
            while let Some(chunk) = rx.recv().await {
                result.extend(chunk);
            }
            result
        }

        // without delta
        let pack_without_delta = encode_once(0).await;
        let pack_without_delta_size = pack_without_delta.len();
        check_format(&pack_without_delta);

        // with delta: the pack must never be larger than the non-delta one
        let pack_with_delta = encode_once(4).await;
        assert!(pack_with_delta.len() <= pack_without_delta_size);
        check_format(&pack_with_delta);
    }

    /// Decode a fixture pack file and return its entries for re-encoding tests.
    async fn get_entries_for_test() -> Arc<Mutex<Vec<Entry>>> {
        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/data/packs/pack-f8bbb573cef7d851957caceb491c073ee8e8de41.pack");

        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);

        let f = std::fs::File::open(&source).unwrap();
        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
        let mut reader = std::io::BufReader::new(f);
        let entries = Arc::new(Mutex::new(Vec::new()));
        let entries_clone = entries.clone();
        p.decode(
            &mut reader,
            move |entry| {
                let mut entries = entries_clone.blocking_lock();
                entries.push(entry.inner);
            },
            None::<fn(SHA1)>,
        )
        .unwrap();
        assert_eq!(p.number, entries.lock().await.len());
        tracing::info!("total entries: {}", p.number);
        drop(p);

        entries
    }

    #[tokio::test]
    async fn test_pack_encoder_parallel_large_file() {
        init_logger();

        let start = Instant::now();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        // encode entries with parallel
        let (tx, mut rx) = mpsc::channel(1_000_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test parallel encode", {
                encoder.parallel_encode(entry_rx).await.unwrap();
            });
        });

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", result.len());
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        // check format
        check_format(&result);
    }

    #[tokio::test]
    async fn test_pack_encoder_large_file() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        // encode entries
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test encode no parallel", {
                encoder.encode(entry_rx).await.unwrap();
            });
        });

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        // // only receive data
        // while (rx.recv().await).is_some() {
        //     // do nothing
        // }

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );
    }

    #[tokio::test]
    async fn test_pack_encoder_with_zstdelta() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);
        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        // check format
        check_format(&result);
    }

    #[test]
    fn test_encode_offset() {
        // Round-trip an offset through encode_offset / read_offset_encoding.
        // let value = 11013;
        let value = 16389;

        let data = encode_offset(value);
        println!("{data:?}");
        let mut reader = Cursor::new(data);
        let (result, _) = read_offset_encoding(&mut reader).unwrap();
        println!("result: {result}");
        assert_eq!(result, value as u64);
    }

    #[tokio::test]
    async fn test_pack_encoder_large_file_with_delta() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);

        let start = Instant::now(); // start timing
        encoder.encode_async(entry_rx).await.unwrap();

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx
                    .send(MetaAttached {
                        inner: entry.clone(),
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        // check format
        check_format(&result);
    }
}