git_internal/internal/pack/encode.rs

use std::cmp::Ordering;
use std::collections::VecDeque;
use std::io::Write;

use crate::delta;
use crate::zstdelta;
use crate::internal::object::types::ObjectType;
use crate::time_it;
use crate::{errors::GitError, hash::SHA1, internal::pack::entry::Entry};
use ahash::AHasher;
use flate2::write::ZlibEncoder;
use rayon::prelude::*;
use sha1::{Digest, Sha1};
use std::hash::{Hash, Hasher};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

const MAX_CHAIN_LEN: usize = 50;
const MIN_DELTA_RATE: f64 = 0.5; // a delta must save at least this fraction of the object's size to be used
//const MAX_ZSTDELTA_CHAIN_LEN: usize = 50;

/// An encoder for generating pack files with delta objects.
pub struct PackEncoder {
    object_number: usize,
    process_index: usize,
    window_size: usize,
    sender: Option<mpsc::Sender<Vec<u8>>>,
    inner_offset: usize, // offset of current entry
    inner_hash: Sha1,    // sha1::Sha1 rather than our SHA1 wrapper: we need the incremental `update` API
    final_hash: Option<SHA1>,
    start_encoding: bool,
}

/// Encode the header of a pack file (12 bytes).<br>
/// Content: `PACK` magic, version (always 2), number of objects as a big-endian `u32`
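/// # Example (illustrative)
/// ```ignore
/// let header = encode_header(3);
/// assert_eq!(&header[..4], b"PACK");
/// assert_eq!(&header[4..8], &[0, 0, 0, 2]); // version 2
/// assert_eq!(&header[8..12], &[0, 0, 0, 3]); // object count, big-endian
/// ```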
fn encode_header(object_number: usize) -> Vec<u8> {
    let mut result: Vec<u8> = vec![
        b'P', b'A', b'C', b'K', // pack file magic
        0, 0, 0, 2, // this encoder generates version 2 only
    ];
    assert_ne!(object_number, 0); // a pack must contain at least one object
    // TODO: return a GitError instead of asserting; the object count must fit in a u32
    assert!(object_number < (1 << 32));
    result.extend_from_slice(&(object_number as u32).to_be_bytes()); // 4 bytes, network byte order (big-endian)
    result
}

/// Encode the offset of a delta object using git's variable-length offset encoding.
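/// # Example (illustrative)
/// ```ignore
/// assert_eq!(encode_offset(5), vec![0x05]);          // small offsets fit in one byte
/// assert_eq!(encode_offset(128), vec![0x80, 0x00]);  // continuation byte carries a +1 bias
/// ```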
fn encode_offset(mut value: usize) -> Vec<u8> {
    assert_ne!(value, 0, "offset can't be zero");
    let mut bytes = Vec::new();

    bytes.push((value & 0x7F) as u8);
    value >>= 7;
    while value != 0 {
        value -= 1; // each continuation byte carries an implicit +1 bias
        let byte = (value & 0x7F) as u8 | 0x80; // set the continuation (high) bit
        value >>= 7;
        bytes.push(byte);
    }
    bytes.reverse();
    bytes
}

/// Encode one object into pack format: header, optional base offset, then zlib-compressed data.
/// `offset`: distance back to the delta base if this is an offset-delta object; `None` otherwise.
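/// Note: this encoder always sets the size-continuation bit in the first header
/// byte, emitting an explicit trailing `0` when the size fits in 4 bits; decoders
/// read that byte as "no further size bits".
/// # Header layout (illustrative)
/// ```ignore
/// // blob (type 3) with 10 bytes of data:
/// // first byte = 0x80 | (3 << 4) | (10 & 0x0f) = 0xBA
/// // size >> 4 == 0, so a terminating 0 follows: header = [0xBA, 0x00]
/// ```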
fn encode_one_object(entry: &Entry, offset: Option<usize>) -> Result<Vec<u8>, GitError> {
    let obj_data = &entry.data;
    let obj_data_len = obj_data.len();
    let obj_type_number = entry.obj_type.to_u8();

    let mut encoded_data = Vec::new();

    // **header** encoding: 3-bit type and low 4 size bits in the first byte,
    // then a little-endian base-128 varint for the remaining size bits
    let mut header_data = vec![(0x80 | (obj_type_number << 4)) | (obj_data_len & 0x0f) as u8];
    let mut size = obj_data_len >> 4; // low 4 bits are already in the first byte
    if size > 0 {
        while size > 0 {
            if size >> 7 > 0 {
                header_data.push((0x80 | (size & 0x7f)) as u8); // continuation bit + next 7 size bits
                size >>= 7;
            } else {
                header_data.push(size as u8);
                break;
            }
        }
    } else {
        // the first byte's continuation bit is always set, so terminate with a zero byte
        header_data.push(0);
    }
    encoded_data.extend(header_data);

    // **offset** encoding
    if entry.obj_type == ObjectType::OffsetDelta || entry.obj_type == ObjectType::OffsetZstdelta {
        let offset_data = encode_offset(offset.unwrap());
        encoded_data.extend(offset_data);
    } else if entry.obj_type == ObjectType::HashDelta {
        unreachable!("unsupported type")
    }

    // **data** encoding: zlib-compress the (possibly delta) payload
    let mut encoder = ZlibEncoder::new(Vec::new(), flate2::Compression::default());
    encoder
        .write_all(obj_data)
        .expect("zlib compress should never fail");
    encoder.flush().expect("zlib flush should never fail");
    let compressed_data = encoder.finish().expect("zlib finish should never fail");
    encoded_data.extend(compressed_data);
    Ok(encoded_data)
}
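/// Sort order used before the delta search: larger objects first, so big
/// entries enter the window early and can serve as delta bases; ties fall
/// back to address order, which preserves arrival order within a bucket.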
fn magic_sort(a: &Entry, b: &Entry) -> Ordering {
    // TODO: sort by object type first, then by file (path) hash rather than
    // content hash, as git does; that needs a larger refactor since `Entry`
    // only carries the content hash today.

    let ord = b.data.len().cmp(&a.data.len());
    if ord != Ordering::Equal {
        return ord;
    }

    // fallback: pointer order (within one bucket this matches arrival order)
    (a as *const Entry).cmp(&(b as *const Entry))
}

fn calc_hash(data: &[u8]) -> u64 {
    let mut hasher = AHasher::default();
    data.hash(&mut hasher);
    hasher.finish()
}
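/// Cheap pre-filter for delta candidates: hash the first `min(len, 128)` bytes
/// of each object and compare. Only pairs with an identical prefix pass, so
/// objects whose differences begin inside the first 128 bytes are skipped.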
fn cheap_similar(a: &[u8], b: &[u8]) -> bool {
    let k = a.len().min(b.len()).min(128);
    if k == 0 {
        return false;
    }
    calc_hash(&a[..k]) == calc_hash(&b[..k])
}

impl PackEncoder {
    pub fn new(object_number: usize, window_size: usize, sender: mpsc::Sender<Vec<u8>>) -> Self {
        PackEncoder {
            object_number,
            window_size,
            process_index: 0,
            sender: Some(sender),
            inner_offset: 12, // 12 bytes header
            inner_hash: Sha1::new(),
            final_hash: None,
            start_encoding: false,
        }
    }

    pub fn drop_sender(&mut self) {
        self.sender.take(); // Take the sender out, dropping it
    }

    pub async fn send_data(&mut self, data: Vec<u8>) {
        if let Some(sender) = &self.sender {
            sender.send(data).await.unwrap();
        }
    }

    /// Get the hash of the pack file. If the pack file is not finished, returns `None`.
    pub fn get_hash(&self) -> Option<SHA1> {
        self.final_hash
    }

    /// Encodes entries into a pack file with delta objects, streaming the output through the encoder's sender channel.
    /// # Arguments
    /// - `entry_rx` - A receiver channel (`mpsc::Receiver<Entry>`) from which entries to be encoded are received.
    /// # Returns
    /// Returns `Ok(())` if encoding is successful, or a `GitError` on failure.
    /// - Returns `PackEncodeError` if an encoding operation is already in progress.
    pub async fn encode(&mut self, entry_rx: mpsc::Receiver<Entry>) -> Result<(), GitError> {
        self.inner_encode(entry_rx, false).await
    }

    pub async fn encode_with_zstdelta(
        &mut self,
        entry_rx: mpsc::Receiver<Entry>,
    ) -> Result<(), GitError> {
        self.inner_encode(entry_rx, true).await
    }

    /// Delta selection heuristics are based on:
    ///   https://github.com/git/git/blob/master/Documentation/technical/pack-heuristics.adoc
    async fn inner_encode(
        &mut self,
        mut entry_rx: mpsc::Receiver<Entry>,
        enable_zstdelta: bool,
    ) -> Result<(), GitError> {
        // ensure encode can only be invoked once per encoder
        if self.start_encoding {
            return Err(GitError::PackEncodeError(
                "encoding operation is already in progress".to_string(),
            ));
        }
        self.start_encoding = true;

        let head = encode_header(self.object_number);
        self.send_data(head.clone()).await;
        self.inner_hash.update(&head);

        let mut commits: Vec<Entry> = Vec::new();
        let mut trees: Vec<Entry> = Vec::new();
        let mut blobs: Vec<Entry> = Vec::new();
        let mut tags: Vec<Entry> = Vec::new();
        while let Some(entry) = entry_rx.recv().await {
            match entry.obj_type {
                ObjectType::Commit => {
                    commits.push(entry);
                }
                ObjectType::Tree => {
                    trees.push(entry);
                }
                ObjectType::Blob => {
                    blobs.push(entry);
                }
                ObjectType::Tag => {
                    tags.push(entry);
                }
                _ => {}
            }
        }

        commits.sort_by(magic_sort);
        trees.sort_by(magic_sort);
        blobs.sort_by(magic_sort);
        tags.sort_by(magic_sort);
        tracing::info!(
            "object counts: commits: {} trees: {} blobs: {} tags: {}",
            commits.len(),
            trees.len(),
            blobs.len(),
            tags.len()
        );

        // encode each object-type bucket in parallel
        // (note: a fixed window of 10 is used here, independent of `self.window_size`)
        let (commit_results, tree_results, blob_results, tag_results) = tokio::try_join!(
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(commits, 10, enable_zstdelta)
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(trees, 10, enable_zstdelta)
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(blobs, 10, enable_zstdelta)
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(tags, 10, enable_zstdelta)
            }),
        )
        .map_err(|e| GitError::PackEncodeError(format!("Task join error: {e}")))?;

        let all_encoded_data = [
            commit_results
                .map_err(|e| GitError::PackEncodeError(format!("Commit encoding error: {e}")))?,
            tree_results
                .map_err(|e| GitError::PackEncodeError(format!("Tree encoding error: {e}")))?,
            blob_results
                .map_err(|e| GitError::PackEncodeError(format!("Blob encoding error: {e}")))?,
            tag_results
                .map_err(|e| GitError::PackEncodeError(format!("Tag encoding error: {e}")))?,
        ]
        .concat();

        for data in all_encoded_data {
            self.write_all_and_update(&data).await;
        }
        // Hash signature
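        // clone so finalize() doesn't consume the hasher we've been updating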
        let hash_result = self.inner_hash.clone().finalize();
        self.final_hash = Some(SHA1::from_bytes(&hash_result));
        self.send_data(hash_result.to_vec()).await;

        self.drop_sender();
        Ok(())
    }

    /// Try to encode entries as offset deltas using a sliding window of recently
    /// seen objects; both classic delta and zstdelta are handled here.
    /// Refs: https://sapling-scm.com/docs/dev/internals/zstdelta/
    /// The sliding window lives here (moved out of the struct).
    /// # Returns
    /// - `Ok(Vec<Vec<u8>>)`: the encoded form of every entry, whether or not it became a delta
    /// - `Err(GitError)`: if encoding an object fails
    fn try_as_offset_delta(
        mut bucket: Vec<Entry>,
        window_size: usize,
        enable_zstdelta: bool,
    ) -> Result<Vec<Vec<u8>>, GitError> {
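        // Offsets are tracked relative to the start of this bucket; only the
        // distance `current_offset - base_offset` is ever encoded, so the
        // bucket's absolute position in the final pack doesn't matter.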
        let mut current_offset = 0usize;
        let mut window: VecDeque<(Entry, usize)> = VecDeque::with_capacity(window_size);
        let mut res: Vec<Vec<u8>> = Vec::new();

        for entry in bucket.iter_mut() {
            // reset the best-base selection for each entry
            let mut best_base: Option<&(Entry, usize)> = None;
            let mut best_rate: f64 = 0.0;
            let tie_epsilon: f64 = 0.15;

            let candidates: Vec<_> = window
                .par_iter()
                .with_min_len(3)
                .filter_map(|try_base| {
                    if try_base.0.obj_type != entry.obj_type {
                        return None;
                    }

                    if try_base.0.chain_len >= MAX_CHAIN_LEN {
                        return None;
                    }

                    if try_base.0.hash == entry.hash {
                        return None;
                    }

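                    // require sizes within 2x of each other; wildly mismatched
                    // sizes rarely produce a worthwhile delta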
                    let sym_ratio = (try_base.0.data.len().min(entry.data.len()) as f64)
                        / (try_base.0.data.len().max(entry.data.len()) as f64);
                    if sym_ratio < 0.5 {
                        return None;
                    }

                    if !cheap_similar(&try_base.0.data, &entry.data) {
                        return None;
                    }

                    let rate = if (try_base.0.data.len() + entry.data.len()) / 2 > 64 {
                        delta::heuristic_encode_rate_parallel(&try_base.0.data, &entry.data)
                    } else {
                        delta::encode_rate(&try_base.0.data, &entry.data)
                        // alternative: run zstdelta::diff and use 1.0 - delta_len / data_len
                    };

                    if rate > MIN_DELTA_RATE {
                        Some((rate, try_base))
                    } else {
                        None
                    }
                })
                .collect();

            // pick the best candidate: a clearly higher rate wins; on a near-tie
            // (within tie_epsilon) prefer the base with the deeper chain
            for (rate, try_base) in candidates {
                match best_base {
                    None => {
                        best_rate = rate;
                        best_base = Some(try_base);
                    }
                    Some(best_base_ref) => {
                        let is_better = if rate > best_rate + tie_epsilon {
                            true
                        } else if (rate - best_rate).abs() <= tie_epsilon {
                            try_base.0.chain_len > best_base_ref.0.chain_len
                        } else {
                            false
                        };

                        if is_better {
                            best_rate = rate;
                            best_base = Some(try_base);
                        }
                    }
                }
            }
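            // clone before `entry` is rewritten as a delta below, so the window
            // keeps the full (un-delta'd) data available as a future base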
            let mut entry_for_window = entry.clone();

            let offset = best_base.map(|best_base| {
                let delta = if enable_zstdelta {
                    entry.obj_type = ObjectType::OffsetZstdelta;
                    zstdelta::diff(&best_base.0.data, &entry.data)
                        .unwrap_or_else(|e| panic!("zstdelta diff failed: {e}"))
                } else {
                    entry.obj_type = ObjectType::OffsetDelta;
                    delta::encode(&best_base.0.data, &entry.data)
                };
                entry.data = delta;
                entry.chain_len = best_base.0.chain_len + 1;
                current_offset - best_base.1
            });

            entry_for_window.chain_len = entry.chain_len;
            let obj_data = encode_one_object(entry, offset)?;
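            // slide the window: push the (pre-delta) entry, evict the oldest once full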
            window.push_back((entry_for_window, current_offset));
            if window.len() > window_size {
                window.pop_front();
            }
            current_offset += obj_data.len();
            res.push(obj_data);
        }
        Ok(res)
    }

    /// Parallel encode with rayon; only works when window_size == 0 (no delta)
    pub async fn parallel_encode(
        &mut self,
        mut entry_rx: mpsc::Receiver<Entry>,
    ) -> Result<(), GitError> {
        if self.window_size != 0 {
            return Err(GitError::PackEncodeError(
                "parallel encode only works when window_size == 0".to_string(),
            ));
        }

        // ensure encode can only be invoked once per encoder
        if self.start_encoding {
            return Err(GitError::PackEncodeError(
                "encoding operation is already in progress".to_string(),
            ));
        }
        self.start_encoding = true;

        let head = encode_header(self.object_number);
        self.send_data(head.clone()).await;
        self.inner_hash.update(&head);

        let batch_size = usize::max(1000, entry_rx.max_capacity() / 10); // a temporary value, not tuned
        tracing::info!("encode with batch size: {}", batch_size);
        loop {
            let mut batch_entries = Vec::with_capacity(batch_size);
            time_it!("parallel encode: receive batch", {
                for _ in 0..batch_size {
                    match entry_rx.recv().await {
                        Some(entry) => {
                            batch_entries.push(entry);
                            self.process_index += 1;
                        }
                        None => break,
                    }
                }
            });

            if batch_entries.is_empty() {
                break;
            }

            // `collect` on a parallel iterator preserves input order,
            // refs: https://github.com/rayon-rs/rayon/issues/551#issuecomment-371657900
            let batch_result: Vec<Vec<u8>> = time_it!("parallel encode: encode batch", {
                batch_entries
                    .par_iter()
                    .map(|entry| encode_one_object(entry, None).unwrap())
                    .collect()
            });

            time_it!("parallel encode: write batch", {
                for obj_data in batch_result {
                    self.write_all_and_update(&obj_data).await;
                }
            });
        }

        if self.process_index != self.object_number {
            panic!(
                "not all objects are encoded, process:{}, total:{}",
                self.process_index, self.object_number
            );
        }
        // hash signature
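        // clone so finalize() doesn't consume the hasher we've been updating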
        let hash_result = self.inner_hash.clone().finalize();
        self.final_hash = Some(SHA1::from_bytes(&hash_result));
        self.send_data(hash_result.to_vec()).await;
        self.drop_sender();
        Ok(())
    }

    /// Send data through the sender channel and update the running hash & offset
    async fn write_all_and_update(&mut self, data: &[u8]) {
        self.inner_hash.update(data);
        self.inner_offset += data.len();
        self.send_data(data.to_vec()).await;
    }

    /// Async version of encode; returns a JoinHandle to the spawned task.
    /// Consumes the PackEncoder, so it can't be used after calling this function.
    /// - window_size == 0: runs `parallel_encode`, which keeps streaming transmission but produces no deltas.
    /// - window_size != 0: runs `encode`, which buckets, magic-sorts, and delta-compresses entries.
    /// Most other modules go through this API.
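    /// # Example (sketch; `n` stands for the known object count)
    /// ```ignore
    /// let (pack_tx, mut pack_rx) = mpsc::channel(100);
    /// let (entry_tx, entry_rx) = mpsc::channel::<Entry>(100);
    /// let handle = PackEncoder::new(n, 10, pack_tx).encode_async(entry_rx).await?;
    /// // feed entries through entry_tx, then drop it; read pack bytes from pack_rx
    /// ```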
    pub async fn encode_async(
        mut self,
        rx: mpsc::Receiver<Entry>,
    ) -> Result<JoinHandle<()>, GitError> {
        Ok(tokio::spawn(async move {
            if self.window_size == 0 {
                self.parallel_encode(rx).await.unwrap()
            } else {
                self.encode(rx).await.unwrap()
            }
        }))
    }

    pub async fn encode_async_with_zstdelta(
        mut self,
        rx: mpsc::Receiver<Entry>,
    ) -> Result<JoinHandle<()>, GitError> {
        Ok(tokio::spawn(async move {
            // No parallel path here: parallel encode never produces deltas,
            // so enabling zstdelta with it would have no effect.
            self.encode_with_zstdelta(rx).await.unwrap()
        }))
    }
}

#[cfg(test)]
mod tests {
    use std::env;
    use std::sync::Arc;
    use std::time::Instant;
    use std::{io::Cursor, path::PathBuf};
    use tokio::sync::Mutex;

    use crate::internal::object::blob::Blob;
    use crate::internal::pack::utils::read_offset_encoding;
    use crate::internal::pack::{Pack, tests::init_logger};
    use crate::time_it;

    use super::*;

    fn check_format(data: &[u8]) {
        let mut p = Pack::new(
            None,
            Some(1024 * 1024 * 1024 * 6), // 6GB
            Some(PathBuf::from("/tmp/.cache_temp")),
            true,
        );
        let mut reader = Cursor::new(data);
        tracing::debug!("start check format");
        p.decode(&mut reader, |_, _| {})
            .expect("pack file format error");
    }

    #[tokio::test]
    async fn test_pack_encoder() {
        async fn encode_once(window_size: usize) -> Vec<u8> {
            let (tx, mut rx) = mpsc::channel(100);
            let (entry_tx, entry_rx) = mpsc::channel::<Entry>(1);

            // make the objects distinct, or decoding will fail
            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
            encoder.encode_async(entry_rx).await.unwrap();

            for str in str_vec {
                let blob = Blob::from_content(str);
                let entry: Entry = blob.into();
                entry_tx.send(entry).await.unwrap();
            }
            drop(entry_tx);
            let mut result = Vec::new();
            while let Some(chunk) = rx.recv().await {
                result.extend(chunk);
            }
            result
        }

        // without delta
        let pack_without_delta = encode_once(0).await;
        let pack_without_delta_size = pack_without_delta.len();
        check_format(&pack_without_delta);

        // with delta
        let pack_with_delta = encode_once(4).await;
        assert!(pack_with_delta.len() <= pack_without_delta_size);
        check_format(&pack_with_delta);
    }

    async fn get_entries_for_test() -> Arc<Mutex<Vec<Entry>>> {
        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/data/packs/pack-f8bbb573cef7d851957caceb491c073ee8e8de41.pack");

        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);

        let f = std::fs::File::open(&source).unwrap();
        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
        let mut reader = std::io::BufReader::new(f);
        let entries = Arc::new(Mutex::new(Vec::new()));
        let entries_clone = entries.clone();
        p.decode(&mut reader, move |entry, _| {
            let mut entries = entries_clone.blocking_lock();
            entries.push(entry);
        })
        .unwrap();
        assert_eq!(p.number, entries.lock().await.len());
        tracing::info!("total entries: {}", p.number);
        drop(p);

        entries
    }

    #[tokio::test]
    async fn test_pack_encoder_parallel_large_file() {
        init_logger();

        let start = Instant::now();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        // encode entries in parallel
        let (tx, mut rx) = mpsc::channel(1_000_000);
        let (entry_tx, entry_rx) = mpsc::channel::<Entry>(1_000_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test parallel encode", {
                encoder.parallel_encode(entry_rx).await.unwrap();
            });
        });

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx.send(entry.clone()).await.unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        // check format
        check_format(&result);
    }

    #[tokio::test]
    async fn test_pack_encoder_large_file() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        // encode entries
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<Entry>(100_000);

        let mut encoder = PackEncoder::new(entries_number, 0, tx);
        tokio::spawn(async move {
            time_it!("test encode no parallel", {
                encoder.encode(entry_rx).await.unwrap();
            });
        });

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx.send(entry.clone()).await.unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );
    }

    #[tokio::test]
    async fn test_pack_encoder_with_zstdelta() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let start = Instant::now();
        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<Entry>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);
        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx.send(entry.clone()).await.unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        // check format
        check_format(&result);
    }

    #[test]
    fn test_encode_offset() {
        // let value = 11013;
        let value = 16389;

        let data = encode_offset(value);
        println!("{data:?}");
        let mut reader = Cursor::new(data);
        let (result, _) = read_offset_encoding(&mut reader).unwrap();
        println!("result: {result}");
        assert_eq!(result, value as u64);
    }

    #[tokio::test]
    async fn test_pack_encoder_large_file_with_delta() {
        init_logger();
        let entries = get_entries_for_test().await;
        let entries_number = entries.lock().await.len();

        let total_original_size: usize = entries
            .lock()
            .await
            .iter()
            .map(|entry| entry.data.len())
            .sum();

        let (tx, mut rx) = mpsc::channel(100_000);
        let (entry_tx, entry_rx) = mpsc::channel::<Entry>(100_000);

        let encoder = PackEncoder::new(entries_number, 10, tx);

        let start = Instant::now(); // start timing
        encoder.encode_async(entry_rx).await.unwrap();

        // spawn a task to send entries
        tokio::spawn(async move {
            let entries = entries.lock().await;
            for entry in entries.iter() {
                entry_tx.send(entry.clone()).await.unwrap();
            }
            drop(entry_tx);
            tracing::info!("all entries sent");
        });

        let mut result = Vec::new();
        while let Some(chunk) = rx.recv().await {
            result.extend(chunk);
        }

        let pack_size = result.len();
        let compression_rate = if total_original_size > 0 {
            1.0 - (pack_size as f64 / total_original_size as f64)
        } else {
            0.0
        };

        let duration = start.elapsed();
        tracing::info!("test executed in: {:.2?}", duration);
        tracing::info!("new pack file size: {}", pack_size);
        tracing::info!("original total size: {}", total_original_size);
        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
        tracing::info!(
            "space saved: {} bytes",
            total_original_size.saturating_sub(pack_size)
        );

        // check format
        check_format(&result);
    }
}