git_internal/internal/pack/
encode.rs

1use std::cmp::Ordering;
2use std::collections::VecDeque;
3use std::io::Write;
4
5use crate::delta;
6use crate::zstdelta;
7use crate::internal::object::types::ObjectType;
8use crate::time_it;
9use crate::{errors::GitError, hash::SHA1, internal::pack::entry::Entry};
10use ahash::AHasher;
11use flate2::write::ZlibEncoder;
12use rayon::prelude::*;
13use sha1::{Digest, Sha1};
14use std::hash::{Hash, Hasher};
15use tokio::sync::mpsc;
16use tokio::task::JoinHandle;
17
/// Maximum allowed delta-chain depth; caps decode cost for chained deltas.
const MAX_CHAIN_LEN: usize = 50;
/// Minimum estimated space-saving rate for a delta to be worth emitting.
const MIN_DELTA_RATE: f64 = 0.5; // minimum delta rate
//const MAX_ZSTDELTA_CHAIN_LEN: usize = 50;
21
/// An encoder for generating pack files with delta objects.
pub struct PackEncoder {
    object_number: usize, // total number of objects the pack must contain
    process_index: usize, // objects encoded so far (used by the parallel path)
    window_size: usize,   // delta search window; 0 disables deltas
    // window: VecDeque<(Entry, usize)>, // entry and offset
    sender: Option<mpsc::Sender<Vec<u8>>>, // output stream; None once closed
    inner_offset: usize, // offset of current entry
    inner_hash: Sha1,    // Not SHA1 because need update trait
    final_hash: Option<SHA1>, // trailing pack checksum, set when encoding finishes
    start_encoding: bool, // guard flag for re-entrant encode calls
}
34
/// Encode header of pack file (12 byte)<br>
/// Content: 'PACK', Version(2), number of objects
///
/// # Panics
/// Panics when `object_number` is 0 or does not fit in a `u32`
/// (the on-disk count field is 4 bytes, network byte order).
fn encode_header(object_number: usize) -> Vec<u8> {
    assert_ne!(object_number, 0); // a pack must contain at least one object
    // The count field is a u32. `try_from` replaces the previous
    // `object_number < (1 << 32)` assert, whose shift overflows on
    // 32-bit `usize` targets.
    let count = u32::try_from(object_number).expect("numbers of objects should < 4G");
    let mut result = Vec::with_capacity(12);
    result.extend_from_slice(b"PACK"); // the logotype of the Pack File
    result.extend_from_slice(&2u32.to_be_bytes()); // generates version 2 only
    result.extend_from_slice(&count.to_be_bytes()); // big-endian object count
    result
}
48
/// Encode offset of delta object.
///
/// Git's variable-length offset encoding: 7 value bits per byte, most
/// significant group first, high bit set on every byte except the last
/// ("more bytes follow"). Each continuation group is stored minus one.
fn encode_offset(mut value: usize) -> Vec<u8> {
    assert_ne!(value, 0, "offset can't be zero");

    // The least significant 7 bits come last on disk, so build backwards.
    let mut out = vec![(value & 0x7F) as u8];
    value >>= 7;
    while value > 0 {
        value -= 1; // continuation groups are biased by one
        out.push(((value & 0x7F) as u8) | 0x80); // set the continuation bit
        value >>= 7;
    }
    out.reverse();
    out
}
65
66/// Encode one object, and update the hash
67/// @offset: offset of this object if it's a delta object. For other object, it's None
68fn encode_one_object(entry: &Entry, offset: Option<usize>) -> Result<Vec<u8>, GitError> {
69    // try encode as delta
70    let obj_data = &entry.data;
71    let obj_data_len = obj_data.len();
72    let obj_type_number = entry.obj_type.to_u8();
73
74    let mut encoded_data = Vec::new();
75
76    // **header** encoding
77    let mut header_data = vec![(0x80 | (obj_type_number << 4)) + (obj_data_len & 0x0f) as u8];
78    let mut size = obj_data_len >> 4; // 4 bit has been used in first byte
79    if size > 0 {
80        while size > 0 {
81            if size >> 7 > 0 {
82                header_data.push((0x80 | size) as u8);
83                size >>= 7;
84            } else {
85                header_data.push(size as u8);
86                break;
87            }
88        }
89    } else {
90        header_data.push(0);
91    }
92    encoded_data.extend(header_data);
93
94    // **offset** encoding
95    if entry.obj_type == ObjectType::OffsetDelta || entry.obj_type == ObjectType::OffsetZstdelta {
96        let offset_data = encode_offset(offset.unwrap());
97        encoded_data.extend(offset_data);
98    } else if entry.obj_type == ObjectType::HashDelta {
99        unreachable!("unsupported type")
100    }
101
102    // **data** encoding, need zlib compress
103    let mut inflate = ZlibEncoder::new(Vec::new(), flate2::Compression::default());
104    inflate
105        .write_all(obj_data)
106        .expect("zlib compress should never failed");
107    inflate.flush().expect("zlib flush should never failed");
108    let compressed_data = inflate.finish().expect("zlib compress should never failed");
109    // self.write_all_and_update(&compressed_data).await;
110    encoded_data.extend(compressed_data);
111    Ok(encoded_data)
112}
113
114fn magic_sort(a: &Entry, b: &Entry) -> Ordering {
115    // let ord = b.obj_type.to_u8().cmp(&a.obj_type.to_u8());
116    // if ord != Ordering::Equal {
117    //     return ord;
118    // }
119
120    // the hash should be file hash not content hash
121    // todo the feature need larger refactor
122    // let ord = b.hash.cmp(&a.hash);
123    // if ord != Ordering::Equal { return ord; }
124
125    let ord = b.data.len().cmp(&a.data.len());
126    if ord != Ordering::Equal {
127        return ord;
128    }
129
130    // fallback pointer order (newest first)
131    (a as *const Entry).cmp(&(b as *const Entry))
132}
133
134fn calc_hash(data: &[u8]) -> u64 {
135    let mut hasher = AHasher::default();
136    data.hash(&mut hasher);
137    hasher.finish()
138}
139
/// Cheap pre-filter for delta candidates: are the first `min(len, 128)`
/// bytes of both buffers identical? Returns `false` if either is empty.
///
/// Compares the prefixes directly rather than hashing both sides: a slice
/// compare short-circuits on the first mismatch and can never report a
/// false match, while hashing always reads all `k` bytes of both buffers
/// and can (rarely) collide.
fn cheap_similar(a: &[u8], b: &[u8]) -> bool {
    let k = a.len().min(b.len()).min(128);
    if k == 0 {
        return false;
    }
    a[..k] == b[..k]
}
147
impl PackEncoder {
    /// Build an encoder that writes a pack of `object_number` objects to
    /// `sender` as a stream of byte chunks.
    ///
    /// `window_size == 0` selects the no-delta parallel path in
    /// [`PackEncoder::encode_async`]; a positive value enables the
    /// sliding-window delta search.
    pub fn new(object_number: usize, window_size: usize, sender: mpsc::Sender<Vec<u8>>) -> Self {
        PackEncoder {
            object_number,
            window_size,
            process_index: 0,
            // window: VecDeque::with_capacity(window_size),
            sender: Some(sender),
            inner_offset: 12, // 12 bytes header
            inner_hash: Sha1::new(),
            final_hash: None,
            start_encoding: false,
        }
    }

    /// Close the output channel so the receiver sees end-of-stream.
    pub fn drop_sender(&mut self) {
        self.sender.take(); // Take the sender out, dropping it
    }

    /// Send one chunk of encoded bytes downstream; a no-op once the sender
    /// has been dropped.
    ///
    /// NOTE(review): `unwrap` panics if the receiving side has been dropped
    /// mid-encode — confirm that is intended.
    pub async fn send_data(&mut self, data: Vec<u8>) {
        if let Some(sender) = &self.sender {
            sender.send(data).await.unwrap();
        }
    }

    /// Get the hash of the pack file. if the pack file is not finished, return None
    pub fn get_hash(&self) -> Option<SHA1> {
        self.final_hash
    }

    /// Encodes entries into a pack file with delta objects and outputs them through the specified writer.
    /// # Arguments
    /// - `rx` - A receiver channel (`mpsc::Receiver<Entry>`) from which entries to be encoded are received.
    /// # Returns
    /// Returns `Ok(())` if encoding is successful, or a `GitError` in case of failure.
    /// - Returns a `GitError` if there is a failure during the encoding process.
    /// - Returns `PackEncodeError` if an encoding operation is already in progress.
    pub async fn encode(&mut self, entry_rx: mpsc::Receiver<Entry>) -> Result<(), GitError> {
        self.inner_encode(entry_rx, false).await
    }

    /// Same as [`PackEncoder::encode`] but produces zstdelta deltas instead
    /// of classic git deltas.
    pub async fn encode_with_zstdelta(
        &mut self,
        entry_rx: mpsc::Receiver<Entry>,
    ) -> Result<(), GitError> {
        self.inner_encode(entry_rx, true).await
    }

    /// Delta selection heuristics are based on:
    ///   https://github.com/git/git/blob/master/Documentation/technical/pack-heuristics.adoc
    async fn inner_encode(
        &mut self,
        mut entry_rx: mpsc::Receiver<Entry>,
        enable_zstdelta: bool,
    ) -> Result<(), GitError> {
        // Write & hash the 12-byte pack header first.
        let head = encode_header(self.object_number);
        self.send_data(head.clone()).await;
        self.inner_hash.update(&head);

        // ensure only one decode can only invoke once
        // NOTE(review): `start_encoding` is read here but never set to `true`
        // anywhere, so this guard is currently ineffective; the header above
        // is also sent before the check runs — confirm intent.
        if self.start_encoding {
            return Err(GitError::PackEncodeError(
                "encoding operation is already in progress".to_string(),
            ));
        }

        // Drain the channel, bucketing entries by object type so deltas are
        // only searched between objects of the same type.
        let mut commits: Vec<Entry> = Vec::new();
        let mut trees: Vec<Entry> = Vec::new();
        let mut blobs: Vec<Entry> = Vec::new();
        let mut tags: Vec<Entry> = Vec::new();
        while let Some(entry) = entry_rx.recv().await {
            match entry.obj_type {
                ObjectType::Commit => {
                    commits.push(entry);
                }
                ObjectType::Tree => {
                    trees.push(entry);
                }
                ObjectType::Blob => {
                    blobs.push(entry);
                }
                ObjectType::Tag => {
                    tags.push(entry);
                }
                _ => {} // delta types are not expected on the input channel
            }
        }

        // Sort each bucket (larger objects first) so promising delta bases
        // enter the sliding window before the smaller objects that may
        // delta against them.
        commits.sort_by(magic_sort);
        trees.sort_by(magic_sort);
        blobs.sort_by(magic_sort);
        tags.sort_by(magic_sort);
        tracing::info!(
            "numbers :  commits: {:?} trees: {:?} blobs:{:?} tag :{:?}",
            commits.len(),
            trees.len(),
            blobs.len(),
            tags.len()
        );

        // parallel encoding vec with different object_type
        // NOTE(review): the per-bucket window size is hard-coded to 10 here;
        // `self.window_size` is not consulted on this path — confirm intent.
        // Offset deltas remain valid after concatenation because each delta's
        // offset is a relative distance within its own contiguous bucket.
        let (commit_results, tree_results, blob_results, tag_results) = tokio::try_join!(
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(commits, 10, enable_zstdelta)
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(trees, 10, enable_zstdelta)
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(blobs, 10, enable_zstdelta)
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(tags, 10, enable_zstdelta)
            }),
        )
        .map_err(|e| GitError::PackEncodeError(format!("Task join error: {e}")))?;

        let all_encoded_data = [
            commit_results
                .map_err(|e| GitError::PackEncodeError(format!("Commit encoding error: {e}")))?,
            tree_results
                .map_err(|e| GitError::PackEncodeError(format!("Tree encoding error: {e}")))?,
            blob_results
                .map_err(|e| GitError::PackEncodeError(format!("Blob encoding error: {e}")))?,
            tag_results
                .map_err(|e| GitError::PackEncodeError(format!("Tag encoding error: {e}")))?,
        ]
        .concat();

        // Send the merged per-bucket results downstream in order.
        for data in all_encoded_data {
            self.write_all_and_update(&data).await;
        }

        // Hash signature: trailing SHA-1 of everything written so far.
        let hash_result = self.inner_hash.clone().finalize();
        self.final_hash = Some(SHA1::from_bytes(&hash_result));
        self.send_data(hash_result.to_vec()).await;

        self.drop_sender();
        Ok(())
    }

    /// Encode one bucket of same-typed entries, trying to delta each entry
    /// against recent entries kept in a sliding window of `window_size`.
    ///
    /// Both classic git delta and zstdelta are produced here, selected by
    /// `enable_zstdelta`.
    /// Refs: https://sapling-scm.com/docs/dev/internals/zstdelta/
    ///
    /// # Returns
    /// The fully encoded on-disk representation of every entry, in input order.
    fn try_as_offset_delta(
        mut bucket: Vec<Entry>,
        window_size: usize,
        enable_zstdelta: bool,
    ) -> Result<Vec<Vec<u8>>, GitError> {
        let mut current_offset = 0usize;
        let mut window: VecDeque<(Entry, usize)> = VecDeque::with_capacity(window_size);
        let mut res: Vec<Vec<u8>> = Vec::new();

        for entry in bucket.iter_mut() {
            //let entry_for_window = entry.clone();
            // Reset the best-base selection for each entry.
            let mut best_base: Option<&(Entry, usize)> = None;
            let mut best_rate: f64 = 0.0;
            let tie_epsilon: f64 = 0.15; // rates within this range count as ties

            // Score every window entry in parallel; the filters below reject
            // obviously bad bases before the expensive rate estimation.
            let candidates: Vec<_> = window
                .par_iter()
                .with_min_len(3)
                .filter_map(|try_base| {
                    // only delta against the same object type
                    if try_base.0.obj_type != entry.obj_type {
                        return None;
                    }

                    // cap chain depth so decode cost stays bounded
                    if try_base.0.chain_len >= MAX_CHAIN_LEN {
                        return None;
                    }

                    // never delta an object against itself
                    if try_base.0.hash == entry.hash {
                        return None;
                    }

                    // size-symmetry filter: very different sizes rarely delta well
                    let sym_ratio = (try_base.0.data.len().min(entry.data.len()) as f64)
                        / (try_base.0.data.len().max(entry.data.len()) as f64);
                    if sym_ratio < 0.5 {
                        return None;
                    }

                    // cheap prefix check before estimating the delta rate
                    if !cheap_similar(&try_base.0.data, &entry.data) {
                        return None;
                    }

                    // estimate the space saving; use the parallel heuristic
                    // only when the objects are reasonably large
                    let rate = if (try_base.0.data.len() + entry.data.len()) / 2 > 64 {
                        delta::heuristic_encode_rate_parallel(&try_base.0.data, &entry.data)
                    } else {
                        delta::encode_rate(&try_base.0.data, &entry.data)
                        // let try_delta_obj = zstdelta::diff(&try_base.0.data, &entry.data).unwrap();
                        // 1.0 - try_delta_obj.len() as f64 / entry.data.len() as f64
                    };

                    if rate > MIN_DELTA_RATE {
                        Some((rate, try_base))
                    } else {
                        None
                    }
                })
                .collect();

            // Pick the best candidate: a clearly better rate wins; near-ties
            // go to the candidate with the longer delta chain.
            for (rate, try_base) in candidates {
                match best_base {
                    None => {
                        best_rate = rate;
                        //best_base_offset = current_offset - try_base.1;
                        best_base = Some(try_base);
                    }
                    Some(best_base_ref) => {
                        let is_better = if rate > best_rate + tie_epsilon {
                            true
                        } else if (rate - best_rate).abs() <= tie_epsilon {
                            try_base.0.chain_len > best_base_ref.0.chain_len
                        } else {
                            false
                        };

                        if is_better {
                            best_rate = rate;
                            best_base = Some(try_base);
                        }
                    }
                }
            }

            // Keep an undeltified copy for the window so later entries delta
            // against the full content, not against this entry's delta.
            let mut entry_for_window = entry.clone();

            // If a base was chosen, replace the entry's data with the delta
            // and record the backward distance to the base.
            let offset = best_base.map(|best_base| {
                let delta = if enable_zstdelta {
                    entry.obj_type = ObjectType::OffsetZstdelta;
                    zstdelta::diff(&best_base.0.data, &entry.data)
                        .map_err(|e| {
                            GitError::DeltaObjectError(format!("zstdelta diff failed: {e}"))
                        })
                        .unwrap()
                } else {
                    entry.obj_type = ObjectType::OffsetDelta;
                    delta::encode(&best_base.0.data, &entry.data)
                };
                //entry.obj_type = ObjectType::OffsetDelta;
                entry.data = delta;
                entry.chain_len = best_base.0.chain_len + 1;
                current_offset - best_base.1 // relative distance back to the base
            });

            entry_for_window.chain_len = entry.chain_len;
            let obj_data = encode_one_object(entry, offset)?;
            window.push_back((entry_for_window, current_offset));
            if window.len() > window_size {
                window.pop_front(); // evict the oldest window entry
            }
            current_offset += obj_data.len();
            res.push(obj_data);
        }
        Ok(res)
    }

    /// Parallel encode with rayon, only works when window_size == 0 (no delta)
    pub async fn parallel_encode(
        &mut self,
        mut entry_rx: mpsc::Receiver<Entry>,
    ) -> Result<(), GitError> {
        if self.window_size != 0 {
            return Err(GitError::PackEncodeError(
                "parallel encode only works when window_size == 0".to_string(),
            ));
        }

        // Write & hash the 12-byte pack header first.
        let head = encode_header(self.object_number);
        self.send_data(head.clone()).await;
        self.inner_hash.update(&head);

        // ensure only one decode can only invoke once
        // NOTE(review): `start_encoding` is never set to `true`, so this
        // guard is currently ineffective — confirm intent.
        if self.start_encoding {
            return Err(GitError::PackEncodeError(
                "encoding operation is already in progress".to_string(),
            ));
        }

        let batch_size = usize::max(1000, entry_rx.max_capacity() / 10); // A temporary value, not optimized
        tracing::info!("encode with batch size: {}", batch_size);
        loop {
            // Collect up to one batch of entries from the channel.
            let mut batch_entries = Vec::with_capacity(batch_size);
            time_it!("parallel encode: receive batch", {
                for _ in 0..batch_size {
                    match entry_rx.recv().await {
                        Some(entry) => {
                            batch_entries.push(entry);
                            self.process_index += 1;
                        }
                        None => break, // channel closed: finish this (partial) batch
                    }
                }
            });

            if batch_entries.is_empty() {
                break;
            }

            // use `collect` will return result in order, refs: https://github.com/rayon-rs/rayon/issues/551#issuecomment-371657900
            let batch_result: Vec<Vec<u8>> = time_it!("parallel encode: encode batch", {
                batch_entries
                    .par_iter()
                    .map(|entry| encode_one_object(entry, None).unwrap())
                    .collect()
            });

            // Write the batch sequentially to keep the hash/offset consistent.
            time_it!("parallel encode: write batch", {
                for obj_data in batch_result {
                    self.write_all_and_update(&obj_data).await;
                }
            });
        }

        // Sanity check: the header promised `object_number` objects.
        if self.process_index != self.object_number {
            panic!(
                "not all objects are encoded, process:{}, total:{}",
                self.process_index, self.object_number
            );
        }

        // hash signature
        let hash_result = self.inner_hash.clone().finalize();
        self.final_hash = Some(SHA1::from_bytes(&hash_result));
        self.send_data(hash_result.to_vec()).await;
        self.drop_sender();
        Ok(())
    }

    /// Write data to writer and update hash & offset
    async fn write_all_and_update(&mut self, data: &[u8]) {
        self.inner_hash.update(data);
        self.inner_offset += data.len();
        self.send_data(data.to_vec()).await;
    }

    /// async version of encode, result data will be returned by JoinHandle.
    /// It will consume PackEncoder, so you can't use it after calling this function.
    /// when window_size == 0, it executes parallel_encode which retains stream transmission;
    /// when window_size != 0, it executes encode which uses magic sort and delta.
    /// It seems that all other modules rely on this api
    pub async fn encode_async(
        mut self,
        rx: mpsc::Receiver<Entry>,
    ) -> Result<JoinHandle<()>, GitError> {
        Ok(tokio::spawn(async move {
            if self.window_size == 0 {
                self.parallel_encode(rx).await.unwrap()
            } else {
                self.encode(rx).await.unwrap()
            }
        }))
    }

    /// Async variant of [`PackEncoder::encode_with_zstdelta`]; consumes the encoder.
    pub async fn encode_async_with_zstdelta(
        mut self,
        rx: mpsc::Receiver<Entry>,
    ) -> Result<JoinHandle<()>, GitError> {
        Ok(tokio::spawn(async move {
            // Do not use parallel encode with zstdelta because it make no sense.
            self.encode_with_zstdelta(rx).await.unwrap()
        }))
    }
}
519
#[cfg(test)]
mod tests {
    use std::env;
    use std::sync::Arc;
    use std::time::Instant;
    use std::{io::Cursor, path::PathBuf};
    use tokio::sync::Mutex;

    use crate::internal::object::blob::Blob;
    use crate::internal::pack::utils::read_offset_encoding;
    use crate::internal::pack::{Pack, tests::init_logger};
    use crate::time_it;

    use super::*;

    /// Round-trip check: decode the generated pack and panic if it is malformed.
    fn check_format(data: &Vec<u8>) {
        let mut p = Pack::new(
            None,
            Some(1024 * 1024 * 1024 * 6), // 6GB
            Some(PathBuf::from("/tmp/.cache_temp")),
            true,
        );
        let mut reader = Cursor::new(data);
        tracing::debug!("start check format");
        p.decode(&mut reader, |_, _| {})
            .expect("pack file format error");
    }

    /// Encode a handful of blobs both with and without delta search and
    /// verify both packs decode; the delta pack must not be larger.
    #[tokio::test]
    async fn test_pack_encoder() {
        async fn encode_once(window_size: usize) -> Vec<u8> {
            let (tx, mut rx) = mpsc::channel(100);
            let (entry_tx, entry_rx) = mpsc::channel::<Entry>(1);

            // make some different objects, or decode will fail
            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
            encoder.encode_async(entry_rx).await.unwrap();

            for str in str_vec {
                let blob = Blob::from_content(str);
                let entry: Entry = blob.into();
                entry_tx.send(entry).await.unwrap();
            }
            drop(entry_tx); // close the input channel so encoding can finish
            // assert!(encoder.get_hash().is_some());
            let mut result = Vec::new();
            while let Some(chunk) = rx.recv().await {
                result.extend(chunk);
            }
            result
        }

        // without delta
        let pack_without_delta = encode_once(0).await;
        let pack_without_delta_size = pack_without_delta.len();
        check_format(&pack_without_delta);

        // with delta
        let pack_with_delta = encode_once(4).await;
        assert!(pack_with_delta.len() <= pack_without_delta_size);
        check_format(&pack_with_delta);
    }

    /// Decode a fixture pack from `tests/data/packs` to get realistic entries.
    async fn get_entries_for_test() -> Arc<Mutex<Vec<Entry>>> {
        let mut source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
        source.push("tests/data/packs/pack-f8bbb573cef7d851957caceb491c073ee8e8de41.pack");
        // let file_map = crate::test_utils::setup_lfs_file().await;
        // let source = file_map
        //     .get("git-2d187177923cd618a75da6c6db45bb89d92bd504.pack")
        //     .unwrap();
        // decode pack file to get entries
        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);

        let f = std::fs::File::open(source).unwrap();
        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
        let mut reader = std::io::BufReader::new(f);
        let entries = Arc::new(Mutex::new(Vec::new()));
        let entries_clone = entries.clone();
        p.decode(&mut reader, move |entry, _| {
            // NOTE(review): `blocking_lock` panics when invoked on an async
            // runtime thread — this relies on `decode` running the callback
            // off the runtime (e.g. on worker threads); confirm.
            let mut entries = entries_clone.blocking_lock();
            entries.push(entry);
        })
        .unwrap();
        assert_eq!(p.number, entries.lock().await.len());
        tracing::info!("total entries: {}", p.number);
        drop(p);

        entries
    }

    /// Re-encode the fixture entries with the parallel (no-delta) path and
    /// verify the result decodes.
    #[tokio::test]
    async fn test_pack_encoder_parallel_large_file() {
        init_logger();

        let start = Instant::now();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        // encode entries with parallel
        let (tx, mut rx) = mpsc::channel(1_000_000);
        let (entry_tx, entry_rx) = mpsc::channel::<Entry>(1_000_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test parallel encode", {
                encoder.parallel_encode(entry_rx).await.unwrap();
            });
        });

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx.send(entry.clone()).await.unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        // drain the output stream into one buffer
        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", result.len());
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        // check format
        check_format(&result);
    }

    /// Re-encode the fixture entries with the sequential no-delta path and
    /// log size/compression stats (no round-trip check here).
    #[tokio::test]
    async fn test_pack_encoder_large_file() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        // encode entries
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<Entry>(100_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test encode no parallel", {
                encoder.encode(entry_rx).await.unwrap();
            });
        });

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx.send(entry.clone()).await.unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        // // only receive data
        // while (rx.recv().await).is_some() {
        //     // do nothing
        // }

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );
    }

    /// Re-encode the fixture entries with zstdelta deltas and verify the
    /// result decodes.
    #[tokio::test]
    async fn test_pack_encoder_with_zstdelta() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<Entry>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);
        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx.send(entry.clone()).await.unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        // check format
        check_format(&result);
    }

    /// Round-trip an offset through `encode_offset` and the decoder helper.
    #[test]
    fn test_encode_offset() {
        // let value = 11013;
        let value = 16389;

        let data = encode_offset(value);
        println!("{data:?}");
        let mut reader = Cursor::new(data);
        let (result, _) = read_offset_encoding(&mut reader).unwrap();
        println!("result: {result}");
        assert_eq!(result, value as u64);
    }

    /// Re-encode the fixture entries with delta search (window size 10) and
    /// verify the result decodes.
    #[tokio::test]
    async fn test_pack_encoder_large_file_with_delta() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<Entry>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);

        let start = Instant::now(); // start timing
        encoder.encode_async(entry_rx).await.unwrap();

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx.send(entry.clone()).await.unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        // check format
        check_format(&result);
    }
}