git_internal/internal/pack/
encode.rs

1//! Pack encoder capable of building streamed `.pack`/`.idx` pairs with optional delta compression,
2//! windowing, and asynchronous writers.
3
4use std::{
5    cmp::Ordering,
6    collections::VecDeque,
7    hash::{Hash, Hasher},
8    io::Write,
9    path::{Path, PathBuf},
10};
11
12use ahash::AHasher;
13// use libc::ungetc;
14use chrono::Utc;
15use flate2::write::ZlibEncoder;
16use natord::compare;
17use rayon::prelude::*;
18//use tokio::io::AsyncWriteExt;
19use tokio::io::AsyncWriteExt as TokioAsyncWriteExt;
20use tokio::{fs::File, sync::mpsc, task::JoinHandle};
21
22//use std::io as stdio;
23use crate::delta;
24use crate::{
25    errors::GitError,
26    hash::ObjectHash,
27    internal::{
28        metadata::{EntryMeta, MetaAttached},
29        object::types::ObjectType,
30        pack::{entry::Entry, index_entry::IndexEntry, pack_index::IdxBuilder},
31    },
32    time_it,
33    utils::HashAlgorithm,
34    zstdelta,
35};
36
/// Maximum delta-chain depth a base object may already have before it is
/// rejected as a delta base (longer chains make decoding more expensive).
const MAX_CHAIN_LEN: usize = 50;
/// Minimum estimated compression gain required to store an object as a delta
/// (see the rate computed in `try_as_offset_delta`).
const MIN_DELTA_RATE: f64 = 0.5;
40
/// An encoder for generating pack files, optionally producing delta objects
/// and a companion `.idx` index stream.
pub struct PackEncoder {
    /// Total object count promised in the 12-byte pack header.
    object_number: usize,
    /// Number of entries consumed so far (maintained by `parallel_encode`).
    process_index: usize,
    /// Delta sliding-window size; `0` disables delta compression.
    window_size: usize,
    /// Channel carrying encoded pack bytes; taken (dropped) to signal EOF to the writer.
    pack_sender: Option<mpsc::Sender<Vec<u8>>>,
    /// Optional channel carrying `.idx` bytes; `None` when no index is requested.
    idx_sender: Option<mpsc::Sender<Vec<u8>>>,
    /// Index entries collected while encoding; consumed when building the `.idx` file.
    idx_entries: Option<Vec<IndexEntry>>,
    inner_offset: usize,       // offset of the next object; starts after the 12-byte pack header
    inner_hash: HashAlgorithm, // running hash over all written bytes (pack trailer checksum)
    /// Final pack hash; set only once encoding has completed.
    final_hash: Option<ObjectHash>,
    // Guard intended to reject a second concurrent encode.
    // NOTE(review): checked in `inner_encode`/`parallel_encode` but never set to
    // true anywhere in this file — confirm whether the guard is wired up.
    start_encoding: bool,
}
57
/// Encode entries into a pack, write `.pack`/`.idx` files to `output_dir`.
/// - Spawns background writers to consume pack/idx channels to avoid back-pressure.
/// - Uses `window_size` to control delta: `0` means no delta (parallel encode), otherwise enable delta window.
///
/// The pack is first written to a timestamped `*.pack.tmp` file, then renamed
/// to `pack-<hash>.pack` once the final hash is known; the `.idx` file is
/// built afterwards from the index entries collected during encoding.
/// # Arguments
/// * `raw_entries_rx` - receiver providing entries with metadata
/// * `object_number` - expected total object count for the pack header
/// * `output_dir` - target directory to place the generated files
/// * `window_size` - delta window size; `0` disables delta
/// # Returns
/// * `Ok(())` on success, `GitError` on failure
pub async fn encode_and_output_to_files(
    raw_entries_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    object_number: usize,
    output_dir: PathBuf,
    window_size: usize,
) -> Result<(), GitError> {
    let (pack_tx, mut pack_rx) = mpsc::channel(1024);
    let (idx_tx, mut idx_rx) = mpsc::channel(1024);
    let mut pack_encoder = PackEncoder::new_with_idx(object_number, window_size, pack_tx, idx_tx);

    // Millisecond timestamp makes the temp filename unique, e.g. "20251209235959.123objects.pack.tmp".
    let now = Utc::now();
    let timestamp = now.format("%Y%m%d%H%M%S%.3f").to_string();
    let tmp_path = output_dir.join(format!("{}objects.pack.tmp", timestamp));
    let mut pack_file = File::create(&tmp_path).await?;

    // Background writer drains the pack channel so the encoder never blocks on file I/O.
    let pack_writer = tokio::spawn(async move {
        while let Some(chunk) = pack_rx.recv().await {
            TokioAsyncWriteExt::write_all(&mut pack_file, &chunk).await?;
        }
        TokioAsyncWriteExt::flush(&mut pack_file).await?;
        Ok::<(), GitError>(())
    });

    pack_encoder.encode(raw_entries_rx).await?;

    // Wait for the pack writer to finish flushing before renaming the temp file.
    let pack_write_result = pack_writer
        .await
        .map_err(|e| GitError::PackEncodeError(format!("pack writer task join error: {e}")))?;
    pack_write_result?;

    // `final_hash` is set by `encode` above, so the unwrap cannot fail here.
    // NOTE(review): if an earlier step returns `Err`, the tmp file is left
    // behind on disk — confirm whether cleanup on error is wanted.
    let final_pack_name =
        output_dir.join(format!("pack-{}.pack", pack_encoder.final_hash.unwrap()));
    let final_idx_name = output_dir.join(format!("pack-{}.idx", pack_encoder.final_hash.unwrap()));
    tokio::fs::rename(tmp_path, &final_pack_name).await?;

    // Second background writer for the idx stream, mirroring the pack writer above.
    let mut idx_file = File::create(&final_idx_name).await?;
    let idx_writer = tokio::spawn(async move {
        while let Some(chunk) = idx_rx.recv().await {
            TokioAsyncWriteExt::write_all(&mut idx_file, &chunk).await?;
        }
        TokioAsyncWriteExt::flush(&mut idx_file).await?;
        Ok::<(), GitError>(())
    });

    // Build the idx from the entries collected during pack encoding.
    pack_encoder.encode_idx_file().await?;

    let idx_write_result = idx_writer
        .await
        .map_err(|e| GitError::PackEncodeError(format!("idx writer task join error: {e}")))?;
    idx_write_result?;

    Ok(())
}
127
/// Encode the 12-byte pack file header.
///
/// Layout: the 4-byte magic `"PACK"`, the 4-byte big-endian version (this
/// encoder emits version 2 only), and the 4-byte big-endian object count.
///
/// # Panics
/// Panics if `object_number` is zero or does not fit in a `u32`.
fn encode_header(object_number: usize) -> Vec<u8> {
    assert_ne!(object_number, 0); // a pack must contain at least one object
    // `try_from` replaces `assert!(object_number < (1 << 32))`, which would
    // overflow `usize` on 32-bit targets, and also guards the cast below.
    let count = u32::try_from(object_number)
        .expect("number of objects in a pack must be less than 4G");
    let mut result = Vec::with_capacity(12);
    result.extend_from_slice(b"PACK"); // the logotype of the pack file
    result.extend_from_slice(&2u32.to_be_bytes()); // generates version 2 only
    result.extend_from_slice(&count.to_be_bytes()); // network byte order (big-endian)
    result
}
141
/// Encode a relative offset for an `OFS_DELTA` object header.
///
/// Git's scheme is a big-endian base-128 varint where every continuation
/// level is stored minus one: each byte except the last has its MSB set, and
/// the most significant group is emitted first.
///
/// # Panics
/// Panics if `value` is zero — a delta base can never be at distance 0.
fn encode_offset(mut value: usize) -> Vec<u8> {
    assert_ne!(value, 0, "offset can't be zero");

    // Collect least-significant group first, then reverse into wire order.
    let mut out = vec![(value & 0x7F) as u8]; // final byte keeps its MSB clear
    value >>= 7;
    loop {
        if value == 0 {
            break;
        }
        value -= 1; // the "minus one" bias on every continuation level
        out.push(((value & 0x7F) as u8) | 0x80); // MSB marks "more bytes follow"
        value >>= 7;
    }
    out.reverse();
    out
}
158
159/// Encode one object, and update the hash
160/// @offset: offset of this object if it's a delta object. For other object, it's None
161fn encode_one_object(entry: &Entry, offset: Option<usize>) -> Result<Vec<u8>, GitError> {
162    // try encode as delta
163    let obj_data = &entry.data;
164    let obj_data_len = obj_data.len();
165    let obj_type_number = entry.obj_type.to_u8();
166
167    let mut encoded_data = Vec::new();
168
169    // **header** encoding
170    let mut header_data = vec![(0x80 | (obj_type_number << 4)) + (obj_data_len & 0x0f) as u8];
171    let mut size = obj_data_len >> 4; // 4 bit has been used in first byte
172    if size > 0 {
173        while size > 0 {
174            if size >> 7 > 0 {
175                header_data.push((0x80 | size) as u8);
176                size >>= 7;
177            } else {
178                header_data.push(size as u8);
179                break;
180            }
181        }
182    } else {
183        header_data.push(0);
184    }
185    encoded_data.extend(header_data);
186
187    // **offset** encoding
188    if entry.obj_type == ObjectType::OffsetDelta || entry.obj_type == ObjectType::OffsetZstdelta {
189        let offset_data = encode_offset(offset.unwrap());
190        encoded_data.extend(offset_data);
191    } else if entry.obj_type == ObjectType::HashDelta {
192        unreachable!("unsupported type")
193    }
194
195    // **data** encoding, need zlib compress
196    let mut inflate = ZlibEncoder::new(Vec::new(), flate2::Compression::default());
197    inflate
198        .write_all(obj_data)
199        .expect("zlib compress should never failed");
200    inflate.flush().expect("zlib flush should never failed");
201    let compressed_data = inflate.finish().expect("zlib compress should never failed");
202    // self.write_all_and_update(&compressed_data).await;
203    encoded_data.extend(compressed_data);
204    Ok(encoded_data)
205}
206
207/// Magic sort function for entries
208fn magic_sort(a: &MetaAttached<Entry, EntryMeta>, b: &MetaAttached<Entry, EntryMeta>) -> Ordering {
209    let path_a = a.meta.file_path.as_ref();
210    let path_b = b.meta.file_path.as_ref();
211
212    // 1. Handle path existence: entries with paths sort first
213    match (path_a, path_b) {
214        (Some(pa), Some(pb)) => {
215            let pa = Path::new(pa);
216            let pb = Path::new(pb);
217
218            // 1. Compare parent directory paths
219            let dir_ord = pa.parent().cmp(&pb.parent());
220            if dir_ord != Ordering::Equal {
221                return dir_ord;
222            }
223
224            // 2. Compare filenames (natural sort)
225            let name_a = pa.file_name().unwrap_or_default().to_string_lossy();
226            let name_b = pb.file_name().unwrap_or_default().to_string_lossy();
227            let name_ord = compare(&name_a, &name_b);
228            if name_ord != Ordering::Equal {
229                return name_ord;
230            }
231        }
232        (Some(_), None) => return Ordering::Less, // entries with paths sort first
233        (None, Some(_)) => return Ordering::Greater, // entries without paths sort last
234        (None, None) => {}
235    }
236
237    let ord = b.inner.data.len().cmp(&a.inner.data.len());
238    if ord != Ordering::Equal {
239        return ord;
240    }
241
242    // fallback pointer order (newest first)
243    (a as *const MetaAttached<Entry, EntryMeta>).cmp(&(b as *const MetaAttached<Entry, EntryMeta>))
244}
245
246/// Calculate hash of data
247fn calc_hash(data: &[u8]) -> u64 {
248    let mut hasher = AHasher::default();
249    data.hash(&mut hasher);
250    hasher.finish()
251}
252
/// Cheap similarity pre-filter: two buffers are "similar" when their first
/// `min(a.len(), b.len(), 128)` bytes are identical. Empty inputs are never
/// similar.
///
/// This compares the prefixes directly instead of hashing each prefix and
/// comparing digests (the previous approach): a plain `memcmp` over at most
/// 128 bytes is both faster than two hash passes and exact — no false
/// positives on hash collisions.
fn cheap_similar(a: &[u8], b: &[u8]) -> bool {
    let k = a.len().min(b.len()).min(128);
    if k == 0 {
        return false;
    }
    a[..k] == b[..k]
}
261
impl PackEncoder {
    /// Create an encoder that emits only the pack stream (no `.idx` output).
    ///
    /// `window_size == 0` disables delta compression entirely.
    pub fn new(object_number: usize, window_size: usize, sender: mpsc::Sender<Vec<u8>>) -> Self {
        PackEncoder {
            object_number,
            window_size,
            process_index: 0,
            pack_sender: Some(sender),
            idx_sender: None,
            idx_entries: None,
            inner_offset: 12, // start after the 12-byte pack header (signature + version + object count)
            inner_hash: HashAlgorithm::new(), // abstracts over the configured hash algorithm
            final_hash: None,
            start_encoding: false,
        }
    }

    /// Like [`PackEncoder::new`], but also wires up a channel that will carry
    /// the generated `.idx` bytes (see `encode_idx_file`).
    pub fn new_with_idx(
        object_number: usize,
        window_size: usize,
        pack_sender: mpsc::Sender<Vec<u8>>,
        idx_sender: mpsc::Sender<Vec<u8>>,
    ) -> Self {
        PackEncoder {
            object_number,
            window_size,
            process_index: 0,
            pack_sender: Some(pack_sender),
            idx_sender: Some(idx_sender),
            idx_entries: None,
            inner_offset: 12, // start after the 12-byte pack header (signature + version + object count)
            inner_hash: HashAlgorithm::new(), // abstracts over the configured hash algorithm
            final_hash: None,
            start_encoding: false,
        }
    }

    /// Drop the pack sender, closing the channel so the consumer sees EOF.
    pub fn drop_sender(&mut self) {
        self.pack_sender.take(); // Take the sender out, dropping it
    }

    /// Send a chunk of encoded pack bytes to the consumer, if one is attached.
    ///
    /// NOTE(review): `unwrap` panics if the receiver has been dropped — confirm
    /// that is the intended failure mode.
    pub async fn send_data(&mut self, data: Vec<u8>) {
        if let Some(sender) = &self.pack_sender {
            sender.send(data).await.unwrap();
        }
    }

    /// Get the hash of the pack file. If the pack file is not finished, returns `None`.
    pub fn get_hash(&self) -> Option<ObjectHash> {
        self.final_hash
    }

    /// Encodes entries into a pack file and streams the result to the pack sender.
    ///
    /// Dispatches on `window_size`: `0` selects the streaming parallel encoder
    /// (no delta); otherwise the windowed delta encoder is used.
    /// # Arguments
    /// - `entry_rx` - receiver from which entries to be encoded are drained.
    /// # Returns
    /// `Ok(())` on success, or a `GitError` on failure.
    pub async fn encode(
        &mut self,
        entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<(), GitError> {
        if self.window_size == 0 {
            self.parallel_encode(entry_rx).await
        } else {
            self.inner_encode(entry_rx, false).await
        }
    }

    /// Same as the windowed encoder, but emits zstdelta objects instead of
    /// classic git deltas.
    pub async fn encode_with_zstdelta(
        &mut self,
        entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<(), GitError> {
        self.inner_encode(entry_rx, true).await
    }

    /// Windowed delta encoding: drain all entries, bucket them by object type,
    /// "magic sort" each bucket, delta-encode the four buckets on parallel
    /// blocking tasks, then stream the results out in order while fixing up
    /// index-entry offsets.
    ///
    /// Delta selection heuristics are based on:
    ///   https://github.com/git/git/blob/master/Documentation/technical/pack-heuristics.adoc
    async fn inner_encode(
        &mut self,
        mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
        enable_zstdelta: bool,
    ) -> Result<(), GitError> {
        let head = encode_header(self.object_number);
        self.send_data(head.clone()).await;
        self.inner_hash.update(&head);

        // Reject a second concurrent encode.
        // NOTE(review): this check runs *after* the header was already sent,
        // and `start_encoding` is never set to true anywhere — the guard is
        // currently inert. Confirm whether it should be set here (before the
        // header send) and reset on completion.
        if self.start_encoding {
            return Err(GitError::PackEncodeError(
                "encoding operation is already in progress".to_string(),
            ));
        }

        // Bucket entries by object type so each type gets its own delta window;
        // deltas are only ever made between objects of the same type.
        let mut commits: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
        let mut trees: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
        let mut blobs: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
        let mut tags: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
        while let Some(entry) = entry_rx.recv().await {
            match entry.inner.obj_type {
                ObjectType::Commit => {
                    commits.push(entry);
                }
                ObjectType::Tree => {
                    trees.push(entry);
                }
                ObjectType::Blob => {
                    blobs.push(entry);
                }
                ObjectType::Tag => {
                    tags.push(entry);
                }
                // Delta types are not expected on the input side; silently skipped.
                _ => {}
            }
        }

        // Sort each bucket for delta-window locality (see `magic_sort`).
        commits.sort_by(magic_sort);
        trees.sort_by(magic_sort);
        blobs.sort_by(magic_sort);
        tags.sort_by(magic_sort);
        tracing::info!(
            "numbers :  commits: {:?} trees: {:?} blobs:{:?} tag :{:?}",
            commits.len(),
            trees.len(),
            blobs.len(),
            tags.len()
        );

        // Delta-encode the four buckets in parallel on blocking tasks
        // (window size 10 per bucket).
        let (commit_results, tree_results, blob_results, tag_results) = tokio::try_join!(
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(
                    commits
                        .into_iter()
                        .map(|entry_with_meta| entry_with_meta.inner)
                        .collect(),
                    10,
                    enable_zstdelta,
                )
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(
                    trees
                        .into_iter()
                        .map(|entry_with_meta| entry_with_meta.inner)
                        .collect(),
                    10,
                    enable_zstdelta,
                )
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(
                    blobs
                        .into_iter()
                        .map(|entry_with_meta| entry_with_meta.inner)
                        .collect(),
                    10,
                    enable_zstdelta,
                )
            }),
            tokio::task::spawn_blocking(move || {
                Self::try_as_offset_delta(
                    tags.into_iter()
                        .map(|entry_with_meta| entry_with_meta.inner)
                        .collect(),
                    10,
                    enable_zstdelta,
                )
            }),
        )
        .map_err(|e| GitError::PackEncodeError(format!("Task join error: {e}")))?;

        let commit_res = commit_results?;
        let tree_res = tree_results?;
        let blob_res = blob_results?;
        let tag_res = tag_results?;

        let mut all_res = vec![commit_res, tree_res, blob_res, tag_res];

        // Stream the encoded objects in bucket order, assigning each index
        // entry its absolute offset in the pack as we go.
        let mut idx_entries = Vec::new();
        for res in &mut all_res {
            for data in res {
                data.1.offset = self.inner_offset as u64;
                self.write_all_and_update(&data.0).await;
                idx_entries.push(data.1.clone());
            }
        }

        self.idx_entries = Some(idx_entries);

        // Trailer: hash of everything written so far, appended to the pack.
        let hash_result = self.inner_hash.clone().finalize();
        self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
        self.send_data(hash_result.to_vec()).await;

        self.drop_sender();
        Ok(())
    }

    /// Try to encode each entry in `bucket` as an offset delta against a base
    /// found in a sliding window of the most recently encoded entries.
    ///
    /// Classic delta and zstdelta are both handled here; the window lives in
    /// this function so each type bucket can run on its own blocking task.
    /// Refs: https://sapling-scm.com/docs/dev/internals/zstdelta/
    /// # Returns
    /// `(encoded_object_bytes, index_entry)` pairs in input order. Index-entry
    /// offsets are left at 0 here; the caller assigns absolute pack offsets.
    fn try_as_offset_delta(
        mut bucket: Vec<Entry>,
        window_size: usize,
        enable_zstdelta: bool,
    ) -> Result<Vec<(Vec<u8>, IndexEntry)>, GitError> {
        let mut current_offset = 0usize;
        let mut window: VecDeque<(Entry, usize)> = VecDeque::with_capacity(window_size);
        let mut res: Vec<(Vec<u8>, IndexEntry)> = Vec::new();

        for entry in bucket.iter_mut() {
            // Reset the best-base selection for each entry.
            let mut best_base: Option<&(Entry, usize)> = None;
            let mut best_rate: f64 = 0.0;
            // Rates within this epsilon are ties; ties prefer the longer chain
            // (see the selection loop below).
            let tie_epsilon: f64 = 0.15;

            // Score every window member in parallel; cheap rejections first.
            let candidates: Vec<_> = window
                .par_iter()
                .with_min_len(3)
                .filter_map(|try_base| {
                    // Deltas only between objects of the same type.
                    if try_base.0.obj_type != entry.obj_type {
                        return None;
                    }

                    // Don't grow chains past the depth cap.
                    if try_base.0.chain_len >= MAX_CHAIN_LEN {
                        return None;
                    }

                    // Never delta an object against itself.
                    if try_base.0.hash == entry.hash {
                        return None;
                    }

                    // Sizes must be within 2x of each other.
                    let sym_ratio = (try_base.0.data.len().min(entry.data.len()) as f64)
                        / (try_base.0.data.len().max(entry.data.len()) as f64);
                    if sym_ratio < 0.5 {
                        return None;
                    }

                    // Cheap prefix check before the expensive rate estimate.
                    if !cheap_similar(&try_base.0.data, &entry.data) {
                        return None;
                    }

                    // Estimate the compression gain; the heuristic (sampled)
                    // estimator is used for larger payloads.
                    let rate = if (try_base.0.data.len() + entry.data.len()) / 2 > 64 {
                        delta::heuristic_encode_rate_parallel(&try_base.0.data, &entry.data)
                    } else {
                        delta::encode_rate(&try_base.0.data, &entry.data)
                    };

                    if rate > MIN_DELTA_RATE {
                        Some((rate, try_base))
                    } else {
                        None
                    }
                })
                .collect();

            // Pick the best candidate: highest rate, but within `tie_epsilon`
            // prefer the deeper chain (keeps bases clustered, matching git's
            // heuristics doc referenced above).
            for (rate, try_base) in candidates {
                match best_base {
                    None => {
                        best_rate = rate;
                        best_base = Some(try_base);
                    }
                    Some(best_base_ref) => {
                        let is_better = if rate > best_rate + tie_epsilon {
                            true
                        } else if (rate - best_rate).abs() <= tie_epsilon {
                            try_base.0.chain_len > best_base_ref.0.chain_len
                        } else {
                            false
                        };

                        if is_better {
                            best_rate = rate;
                            best_base = Some(try_base);
                        }
                    }
                }
            }

            // Keep the *undeltified* entry for the window so later entries can
            // delta against full data, not against a delta payload.
            let mut entry_for_window = entry.clone();

            // If a base was chosen, rewrite `entry` in place as a delta and
            // compute the backward distance to the base.
            let offset = best_base.map(|best_base| {
                let delta = if enable_zstdelta {
                    entry.obj_type = ObjectType::OffsetZstdelta;
                    zstdelta::diff(&best_base.0.data, &entry.data)
                        .map_err(|e| {
                            GitError::DeltaObjectError(format!("zstdelta diff failed: {e}"))
                        })
                        .unwrap()
                } else {
                    entry.obj_type = ObjectType::OffsetDelta;
                    delta::encode(&best_base.0.data, &entry.data)
                };
                entry.data = delta;
                entry.chain_len = best_base.0.chain_len + 1;
                current_offset - best_base.1
            });

            entry_for_window.chain_len = entry.chain_len;
            let obj_data = encode_one_object(entry, offset)?;
            // Slide the window forward.
            window.push_back((entry_for_window, current_offset));
            if window.len() > window_size {
                window.pop_front();
            }
            // NOTE(review): `obj_data.clone()` looks avoidable — the length
            // could be captured before pushing. Left as-is (doc-only review).
            res.push((obj_data.clone(), IndexEntry::new(entry, 0)));
            current_offset += obj_data.len();
        }
        Ok(res)
    }

    /// Parallel (rayon) streaming encode without deltas; only valid when
    /// `window_size == 0`. Entries are consumed in batches: receive a batch,
    /// encode it in parallel (order-preserving `collect`), then write it out.
    pub async fn parallel_encode(
        &mut self,
        mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<(), GitError> {
        if self.window_size != 0 {
            return Err(GitError::PackEncodeError(
                "parallel encode only works when window_size == 0".to_string(),
            ));
        }

        let head = encode_header(self.object_number);
        self.send_data(head.clone()).await;
        self.inner_hash.update(&head);

        // NOTE(review): same inert guard as in `inner_encode` — the flag is
        // never set and the check runs after the header send. Confirm intent.
        if self.start_encoding {
            return Err(GitError::PackEncodeError(
                "encoding operation is already in progress".to_string(),
            ));
        }

        let mut idx_entries = Vec::new();
        let batch_size = usize::max(1000, entry_rx.max_capacity() / 10); // A temporary value, not optimized
        tracing::info!("encode with batch size: {}", batch_size);
        loop {
            // Phase 1: pull up to `batch_size` entries off the channel.
            let mut batch_entries = Vec::with_capacity(batch_size);
            time_it!("parallel encode: receive batch", {
                for _ in 0..batch_size {
                    match entry_rx.recv().await {
                        Some(entry) => {
                            batch_entries.push(entry.inner);
                            self.process_index += 1;
                        }
                        None => break,
                    }
                }
            });

            if batch_entries.is_empty() {
                break;
            }

            // Phase 2: encode the batch in parallel. `collect` preserves input
            // order, refs: https://github.com/rayon-rs/rayon/issues/551#issuecomment-371657900
            let batch_result: Vec<(Vec<u8>, IndexEntry)> =
                time_it!("parallel encode: encode batch", {
                    batch_entries
                        .par_iter()
                        .map(|entry| {
                            (
                                encode_one_object(entry, None).unwrap(),
                                IndexEntry::new(entry, 0),
                            )
                        })
                        .collect()
                });

            // Phase 3: write sequentially, assigning absolute offsets.
            time_it!("parallel encode: write batch", {
                for mut obj_data in batch_result {
                    obj_data.1.offset = self.inner_offset as u64;
                    self.write_all_and_update(&obj_data.0).await;
                    idx_entries.push(obj_data.1);
                }
            });
        }

        tracing::debug!("parallel encode idx entries: {:?}", idx_entries.len());
        // The header promised `object_number` objects; a mismatch would yield
        // a corrupt pack, so fail loudly.
        if self.process_index != self.object_number {
            panic!(
                "not all objects are encoded, process:{}, total:{}",
                self.process_index, self.object_number
            );
        }

        // Trailer: hash of everything written, appended to the pack.
        let hash_result = self.inner_hash.clone().finalize();
        self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
        self.send_data(hash_result.to_vec()).await;
        self.drop_sender();

        self.idx_entries = Some(idx_entries);
        Ok(())
    }

    /// Write data to the pack channel and update the running hash & offset.
    async fn write_all_and_update(&mut self, data: &[u8]) {
        self.inner_hash.update(data);
        self.inner_offset += data.len();
        self.send_data(data.to_vec()).await;
    }

    /// Build the `.idx` stream from the entries collected during encoding.
    /// Requires that the pack (and thus `final_hash`) was produced first.
    async fn generate_idx_file(&mut self) -> Result<(), GitError> {
        let final_hash = self.final_hash
            .ok_or(GitError::PackEncodeError("final_hash is missing,The pack file must be generated before the index file is produced.".into()))?;
        let idx_entries = self.idx_entries.clone().ok_or(GitError::PackEncodeError(
            "The pack file must be generated before the index file is produced.".into(),
        ))?;
        // `idx_sender` presence is checked by `encode_idx_file` before calling
        // here, so the unwrap is safe on that path.
        let mut idx_builder = IdxBuilder::new(
            self.object_number,
            self.idx_sender.clone().unwrap(),
            final_hash,
        );
        idx_builder.write_idx(idx_entries).await?;
        Ok(())
    }

    /// Async version of `encode`; runs on a spawned task and consumes the
    /// encoder, so it cannot be used after calling this function.
    /// When `window_size == 0` it executes `parallel_encode` (streaming, no delta);
    /// when `window_size != 0` it executes `encode`, which uses magic sort and delta.
    /// Most other modules rely on this API.
    pub async fn encode_async(
        mut self,
        rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<JoinHandle<()>, GitError> {
        Ok(tokio::spawn(async move {
            if self.window_size == 0 {
                self.parallel_encode(rx).await.unwrap()
            } else {
                self.encode(rx).await.unwrap()
            }
        }))
    }

    /// Async version of `encode_with_zstdelta`; runs on a spawned task.
    pub async fn encode_async_with_zstdelta(
        mut self,
        rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<JoinHandle<()>, GitError> {
        Ok(tokio::spawn(async move {
            // Parallel encode is never combined with zstdelta: delta requires
            // the sequential window.
            self.encode_with_zstdelta(rx).await.unwrap()
        }))
    }

    /// Generate the idx file after the pack file has been generated, then drop
    /// the idx sender so the downstream consumer sees EOF.
    pub async fn encode_idx_file(&mut self) -> Result<(), GitError> {
        if self.idx_sender.is_none() {
            return Err(GitError::PackEncodeError(String::from(
                "idx sender is none",
            )));
        }
        self.generate_idx_file().await?;
        // Drop the sender so the downstream consumer can finish.
        self.idx_sender.take();
        Ok(())
    }
}
736
737#[cfg(test)]
738mod tests {
739    use std::{env, io::Cursor, path::PathBuf, sync::Arc, time::Instant};
740
741    use tempfile::tempdir;
742    use tokio::sync::Mutex;
743
744    use super::*;
745    use crate::{
746        hash::{HashKind, ObjectHash, set_hash_kind_for_test},
747        internal::{
748            object::blob::Blob,
749            pack::{Pack, tests::init_logger, utils::read_offset_encoding},
750        },
751        time_it,
752    };
753
754    /// Check if the given data is a valid pack file format by attempting to decode it.
755    fn check_format(data: &Vec<u8>) {
756        let mut p = Pack::new(
757            None,
758            Some(1024 * 1024 * 1024 * 6), // 6GB
759            Some(PathBuf::from("/tmp/.cache_temp")),
760            true,
761        );
762        let mut reader = Cursor::new(data);
763        tracing::debug!("start check format");
764        p.decode(&mut reader, |_| {}, None::<fn(ObjectHash)>)
765            .expect("pack file format error");
766    }
767
768    #[tokio::test]
769    async fn test_pack_encoder() {
770        let _guard = set_hash_kind_for_test(HashKind::Sha1);
771        async fn encode_once(window_size: usize) -> Vec<u8> {
772            let (tx, mut rx) = mpsc::channel(100);
773            let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);
774
775            // make some different objects, or decode will fail
776            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
777            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
778            encoder.encode_async(entry_rx).await.unwrap();
779
780            for str in str_vec {
781                let blob = Blob::from_content(str);
782                let entry: Entry = blob.into();
783                entry_tx
784                    .send(MetaAttached {
785                        inner: entry,
786                        meta: EntryMeta::new(),
787                    })
788                    .await
789                    .unwrap();
790            }
791            drop(entry_tx);
792            // assert!(encoder.get_hash().is_some());
793            let mut result = Vec::new();
794            while let Some(chunk) = rx.recv().await {
795                result.extend(chunk);
796            }
797            result
798        }
799
800        // without delta
801        let pack_without_delta = encode_once(0).await;
802        let pack_without_delta_size = pack_without_delta.len();
803        check_format(&pack_without_delta);
804
805        // with delta
806        let pack_with_delta = encode_once(4).await;
807        assert!(pack_with_delta.len() <= pack_without_delta_size);
808        check_format(&pack_with_delta);
809    }
810    #[tokio::test]
811    async fn test_pack_encoder_sha256() {
812        let _guard = set_hash_kind_for_test(HashKind::Sha256);
813
814        async fn encode_once(window_size: usize) -> Vec<u8> {
815            let (tx, mut rx) = mpsc::channel(100);
816            let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);
817
818            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
819            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
820            encoder.encode_async(entry_rx).await.unwrap();
821
822            for s in str_vec {
823                let blob = Blob::from_content(s);
824                let entry: Entry = blob.into();
825                entry_tx
826                    .send(MetaAttached {
827                        inner: entry,
828                        meta: EntryMeta::new(),
829                    })
830                    .await
831                    .unwrap();
832            }
833            drop(entry_tx);
834
835            let mut result = Vec::new();
836            while let Some(chunk) = rx.recv().await {
837                result.extend(chunk);
838            }
839            result
840        }
841
842        // without delta
843        let pack_without_delta = encode_once(0).await;
844        let pack_without_delta_size = pack_without_delta.len();
845        check_format(&pack_without_delta);
846
847        // with delta
848        let pack_with_delta = encode_once(4).await;
849        assert!(pack_with_delta.len() <= pack_without_delta_size);
850        check_format(&pack_with_delta);
851    }
852
    /// Decodes the bundled SHA-1 test pack and collects every entry.
    ///
    /// Returns the entries behind `Arc<Mutex<..>>` because the decode
    /// callback needs shared ownership of the accumulator.
    async fn get_entries_for_test() -> Arc<Mutex<Vec<Entry>>> {
        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/data/packs/encode-test-sha1.pack");

        // Decoder backed by an on-disk cache directory under /tmp.
        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);

        let f = std::fs::File::open(&source).unwrap();
        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
        let mut reader = std::io::BufReader::new(f);
        let entries = Arc::new(Mutex::new(Vec::new()));
        let entries_clone = entries.clone();
        // NOTE(review): `blocking_lock` panics when called from an async
        // runtime worker thread; presumably `Pack::decode` invokes this
        // callback from a dedicated (non-runtime) thread — confirm before
        // refactoring.
        p.decode(
            &mut reader,
            move |entry| {
                let mut entries = entries_clone.blocking_lock();
                entries.push(entry.inner);
            },
            None::<fn(ObjectHash)>,
        )
        .unwrap();
        // Every object announced in the pack header must have been decoded.
        assert_eq!(p.number, entries.lock().await.len());
        tracing::info!("total entries: {}", p.number);
        drop(p);

        entries
    }
    /// Decodes the bundled SHA-256 test pack and collects every entry.
    ///
    /// SHA-256 counterpart of `get_entries_for_test`; same shape, different
    /// fixture file.
    async fn get_entries_for_test_sha256() -> Arc<Mutex<Vec<Entry>>> {
        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/data/packs/encode-test-sha256.pack");

        // Decoder backed by an on-disk cache directory under /tmp.
        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);

        let f = std::fs::File::open(&source).unwrap();
        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
        let mut reader = std::io::BufReader::new(f);
        let entries = Arc::new(Mutex::new(Vec::new()));
        let entries_clone = entries.clone();
        // NOTE(review): `blocking_lock` panics when called from an async
        // runtime worker thread; presumably `Pack::decode` invokes this
        // callback from a dedicated (non-runtime) thread — confirm before
        // refactoring.
        p.decode(
            &mut reader,
            move |entry| {
                let mut entries = entries_clone.blocking_lock();
                entries.push(entry.inner);
            },
            None::<fn(ObjectHash)>,
        )
        .unwrap();
        // Every object announced in the pack header must have been decoded.
        assert_eq!(p.number, entries.lock().await.len());
        tracing::info!("total entries: {}", p.number);
        drop(p);

        entries
    }
905
906    #[tokio::test]
907    async fn test_pack_encoder_parallel_large_file() {
908        let _guard = set_hash_kind_for_test(HashKind::Sha1);
909        init_logger();
910
911        let start = Instant::now();
912        let entries = get_entries_for_test().await;
913        let entries_number = entries.lock().await.len();
914
915        let total_original_size: usize = entries
916            .lock()
917            .await
918            .iter()
919            .map(|entry| entry.data.len())
920            .sum();
921
922        // encode entries with parallel
923        let (tx, mut rx) = mpsc::channel(1_000_000);
924        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);
925
926        let mut encoder = PackEncoder::new(entries_number, 0, tx);
927        tokio::spawn(async move {
928            time_it!("test parallel encode", {
929                encoder.parallel_encode(entry_rx).await.unwrap();
930            });
931        });
932
933        // spawn a task to send entries
934        tokio::spawn(async move {
935            let entries = entries.lock().await;
936            for entry in entries.iter() {
937                entry_tx
938                    .send(MetaAttached {
939                        inner: entry.clone(),
940                        meta: EntryMeta::new(),
941                    })
942                    .await
943                    .unwrap();
944            }
945            drop(entry_tx);
946            tracing::info!("all entries sent");
947        });
948
949        let mut result = Vec::new();
950        while let Some(chunk) = rx.recv().await {
951            result.extend(chunk);
952        }
953
954        let pack_size = result.len();
955        let compression_rate = if total_original_size > 0 {
956            1.0 - (pack_size as f64 / total_original_size as f64)
957        } else {
958            0.0
959        };
960
961        let duration = start.elapsed();
962        tracing::info!("test executed in: {:.2?}", duration);
963        tracing::info!("new pack file size: {}", result.len());
964        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
965        // check format
966        check_format(&result);
967    }
968    #[tokio::test]
969    async fn test_pack_encoder_parallel_large_file_sha256() {
970        let _guard = set_hash_kind_for_test(HashKind::Sha256);
971        init_logger();
972
973        let start = Instant::now();
974        // use sha256 pack file for testing
975        let entries = get_entries_for_test_sha256().await;
976        let entries_number = entries.lock().await.len();
977
978        let total_original_size: usize = entries
979            .lock()
980            .await
981            .iter()
982            .map(|entry| entry.data.len())
983            .sum();
984
985        let (tx, mut rx) = mpsc::channel(1_000_000);
986        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);
987
988        let mut encoder = PackEncoder::new(entries_number, 0, tx);
989        tokio::spawn(async move {
990            time_it!("test parallel encode sha256", {
991                encoder.parallel_encode(entry_rx).await.unwrap();
992            });
993        });
994
995        tokio::spawn(async move {
996            let entries = entries.lock().await;
997            for entry in entries.iter() {
998                entry_tx
999                    .send(MetaAttached {
1000                        inner: entry.clone(),
1001                        meta: EntryMeta::new(),
1002                    })
1003                    .await
1004                    .unwrap();
1005            }
1006            drop(entry_tx);
1007            tracing::info!("all entries sent");
1008        });
1009
1010        let mut result = Vec::new();
1011        while let Some(chunk) = rx.recv().await {
1012            result.extend(chunk);
1013        }
1014
1015        let pack_size = result.len();
1016        let compression_rate = if total_original_size > 0 {
1017            1.0 - (pack_size as f64 / total_original_size as f64)
1018        } else {
1019            0.0
1020        };
1021
1022        let duration = start.elapsed();
1023        tracing::info!("sha256 test executed in: {:.2?}", duration);
1024        tracing::info!("new pack file size: {}", result.len());
1025        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1026        check_format(&result);
1027    }
1028
1029    #[tokio::test]
1030    async fn test_pack_encoder_large_file() {
1031        let _guard = set_hash_kind_for_test(HashKind::Sha1);
1032        init_logger();
1033        let entries = get_entries_for_test().await;
1034        let entries_number = entries.lock().await.len();
1035
1036        let total_original_size: usize = entries
1037            .lock()
1038            .await
1039            .iter()
1040            .map(|entry| entry.data.len())
1041            .sum();
1042
1043        let start = Instant::now();
1044        // encode entries
1045        let (tx, mut rx) = mpsc::channel(100_000);
1046        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1047
1048        let mut encoder = PackEncoder::new(entries_number, 0, tx);
1049        tokio::spawn(async move {
1050            time_it!("test encode no parallel", {
1051                encoder.encode(entry_rx).await.unwrap();
1052            });
1053        });
1054
1055        // spawn a task to send entries
1056        tokio::spawn(async move {
1057            let entries = entries.lock().await;
1058            for entry in entries.iter() {
1059                entry_tx
1060                    .send(MetaAttached {
1061                        inner: entry.clone(),
1062                        meta: EntryMeta::new(),
1063                    })
1064                    .await
1065                    .unwrap();
1066            }
1067            drop(entry_tx);
1068            tracing::info!("all entries sent");
1069        });
1070
1071        // // only receive data
1072        // while (rx.recv().await).is_some() {
1073        //     // do nothing
1074        // }
1075
1076        let mut result = Vec::new();
1077        while let Some(chunk) = rx.recv().await {
1078            result.extend(chunk);
1079        }
1080
1081        let pack_size = result.len();
1082        let compression_rate = if total_original_size > 0 {
1083            1.0 - (pack_size as f64 / total_original_size as f64)
1084        } else {
1085            0.0
1086        };
1087
1088        let duration = start.elapsed();
1089        tracing::info!("test executed in: {:.2?}", duration);
1090        tracing::info!("new pack file size: {}", pack_size);
1091        tracing::info!("original total size: {}", total_original_size);
1092        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1093        tracing::info!(
1094            "space saved: {} bytes",
1095            total_original_size.saturating_sub(pack_size)
1096        );
1097    }
1098    #[tokio::test]
1099    async fn test_pack_encoder_large_file_sha256() {
1100        let _guard = set_hash_kind_for_test(HashKind::Sha256);
1101        init_logger();
1102        let entries = get_entries_for_test_sha256().await;
1103        let entries_number = entries.lock().await.len();
1104
1105        let total_original_size: usize = entries
1106            .lock()
1107            .await
1108            .iter()
1109            .map(|entry| entry.data.len())
1110            .sum();
1111
1112        let start = Instant::now();
1113        // encode entries
1114        let (tx, mut rx) = mpsc::channel(100_000);
1115        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1116
1117        let mut encoder = PackEncoder::new(entries_number, 0, tx);
1118        tokio::spawn(async move {
1119            time_it!("test encode no parallel sha256", {
1120                encoder.encode(entry_rx).await.unwrap();
1121            });
1122        });
1123
1124        // spawn a task to send entries
1125        tokio::spawn(async move {
1126            let entries = entries.lock().await;
1127            for entry in entries.iter() {
1128                entry_tx
1129                    .send(MetaAttached {
1130                        inner: entry.clone(),
1131                        meta: EntryMeta::new(),
1132                    })
1133                    .await
1134                    .unwrap();
1135            }
1136            drop(entry_tx);
1137            tracing::info!("all entries sent");
1138        });
1139
1140        // // only receive data
1141        // while (rx.recv().await).is_some() {
1142        //     // do nothing
1143        // }
1144
1145        let mut result = Vec::new();
1146        while let Some(chunk) = rx.recv().await {
1147            result.extend(chunk);
1148        }
1149
1150        let pack_size = result.len();
1151        let compression_rate = if total_original_size > 0 {
1152            1.0 - (pack_size as f64 / total_original_size as f64)
1153        } else {
1154            0.0
1155        };
1156
1157        let duration = start.elapsed();
1158        tracing::info!("test executed in: {:.2?}", duration);
1159        tracing::info!("new pack file size: {}", pack_size);
1160        tracing::info!("original total size: {}", total_original_size);
1161        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1162        tracing::info!(
1163            "space saved: {} bytes",
1164            total_original_size.saturating_sub(pack_size)
1165        );
1166    }
1167
1168    #[tokio::test]
1169    async fn test_pack_encoder_with_zstdelta() {
1170        let _guard = set_hash_kind_for_test(HashKind::Sha1);
1171        init_logger();
1172        let entries = get_entries_for_test().await;
1173        let entries_number = entries.lock().await.len();
1174
1175        let total_original_size: usize = entries
1176            .lock()
1177            .await
1178            .iter()
1179            .map(|entry| entry.data.len())
1180            .sum();
1181
1182        let start = Instant::now();
1183        let (tx, mut rx) = mpsc::channel(100_000);
1184        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1185
1186        let encoder = PackEncoder::new(entries_number, 10, tx);
1187        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();
1188
1189        // spawn a task to send entries
1190        tokio::spawn(async move {
1191            let entries = entries.lock().await;
1192            for entry in entries.iter() {
1193                entry_tx
1194                    .send(MetaAttached {
1195                        inner: entry.clone(),
1196                        meta: EntryMeta::new(),
1197                    })
1198                    .await
1199                    .unwrap();
1200            }
1201            drop(entry_tx);
1202            tracing::info!("all entries sent");
1203        });
1204
1205        let mut result = Vec::new();
1206        while let Some(chunk) = rx.recv().await {
1207            result.extend(chunk);
1208        }
1209
1210        let pack_size = result.len();
1211        let compression_rate = if total_original_size > 0 {
1212            1.0 - (pack_size as f64 / total_original_size as f64)
1213        } else {
1214            0.0
1215        };
1216
1217        let duration = start.elapsed();
1218        tracing::info!("test executed in: {:.2?}", duration);
1219        tracing::info!("new pack file size: {}", pack_size);
1220        tracing::info!("original total size: {}", total_original_size);
1221        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1222        tracing::info!(
1223            "space saved: {} bytes",
1224            total_original_size.saturating_sub(pack_size)
1225        );
1226
1227        // check format
1228        check_format(&result);
1229    }
1230    #[tokio::test]
1231    async fn test_pack_encoder_with_zstdelta_sha256() {
1232        let _guard = set_hash_kind_for_test(HashKind::Sha256);
1233        init_logger();
1234        let entries = get_entries_for_test_sha256().await;
1235        let entries_number = entries.lock().await.len();
1236
1237        let total_original_size: usize = entries
1238            .lock()
1239            .await
1240            .iter()
1241            .map(|entry| entry.data.len())
1242            .sum();
1243
1244        let start = Instant::now();
1245        let (tx, mut rx) = mpsc::channel(100_000);
1246        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1247
1248        let encoder = PackEncoder::new(entries_number, 10, tx);
1249        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();
1250
1251        // spawn a task to send entries
1252        tokio::spawn(async move {
1253            let entries = entries.lock().await;
1254            for entry in entries.iter() {
1255                entry_tx
1256                    .send(MetaAttached {
1257                        inner: entry.clone(),
1258                        meta: EntryMeta::new(),
1259                    })
1260                    .await
1261                    .unwrap();
1262            }
1263            drop(entry_tx);
1264            tracing::info!("all entries sent");
1265        });
1266
1267        let mut result = Vec::new();
1268        while let Some(chunk) = rx.recv().await {
1269            result.extend(chunk);
1270        }
1271
1272        let pack_size = result.len();
1273        let compression_rate = if total_original_size > 0 {
1274            1.0 - (pack_size as f64 / total_original_size as f64)
1275        } else {
1276            0.0
1277        };
1278
1279        let duration = start.elapsed();
1280        tracing::info!("test executed in: {:.2?}", duration);
1281        tracing::info!("new pack file size: {}", pack_size);
1282        tracing::info!("original total size: {}", total_original_size);
1283        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1284        tracing::info!(
1285            "space saved: {} bytes",
1286            total_original_size.saturating_sub(pack_size)
1287        );
1288
1289        // check format
1290        check_format(&result);
1291    }
1292
1293    #[test]
1294    fn test_encode_offset() {
1295        // let value = 11013;
1296        let value = 16389;
1297
1298        let data = encode_offset(value);
1299        println!("{data:?}");
1300        let mut reader = Cursor::new(data);
1301        let (result, _) = read_offset_encoding(&mut reader).unwrap();
1302        println!("result: {result}");
1303        assert_eq!(result, value as u64);
1304    }
1305
1306    #[tokio::test]
1307    async fn test_pack_encoder_large_file_with_delta() {
1308        let _guard = set_hash_kind_for_test(HashKind::Sha1);
1309        init_logger();
1310        let entries = get_entries_for_test().await;
1311        let entries_number = entries.lock().await.len();
1312
1313        let total_original_size: usize = entries
1314            .lock()
1315            .await
1316            .iter()
1317            .map(|entry| entry.data.len())
1318            .sum();
1319
1320        let (tx, mut rx) = mpsc::channel(100_000);
1321        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1322
1323        let encoder = PackEncoder::new(entries_number, 10, tx);
1324
1325        let start = Instant::now(); // 开始时间
1326        encoder.encode_async(entry_rx).await.unwrap();
1327
1328        // spawn a task to send entries
1329        tokio::spawn(async move {
1330            let entries = entries.lock().await;
1331            for entry in entries.iter() {
1332                entry_tx
1333                    .send(MetaAttached {
1334                        inner: entry.clone(),
1335                        meta: EntryMeta::new(),
1336                    })
1337                    .await
1338                    .unwrap();
1339            }
1340            drop(entry_tx);
1341            tracing::info!("all entries sent");
1342        });
1343
1344        let mut result = Vec::new();
1345        while let Some(chunk) = rx.recv().await {
1346            result.extend(chunk);
1347        }
1348
1349        let pack_size = result.len();
1350        let compression_rate = if total_original_size > 0 {
1351            1.0 - (pack_size as f64 / total_original_size as f64)
1352        } else {
1353            0.0
1354        };
1355
1356        let duration = start.elapsed();
1357        tracing::info!("test executed in: {:.2?}", duration);
1358        tracing::info!("new pack file size: {}", pack_size);
1359        tracing::info!("original total size: {}", total_original_size);
1360        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1361        tracing::info!(
1362            "space saved: {} bytes",
1363            total_original_size.saturating_sub(pack_size)
1364        );
1365
1366        // check format
1367        check_format(&result);
1368    }
1369    #[tokio::test]
1370    async fn test_pack_encoder_large_file_with_delta_sha256() {
1371        let _guard = set_hash_kind_for_test(HashKind::Sha256);
1372        init_logger();
1373        let entries = get_entries_for_test_sha256().await;
1374        let entries_number = entries.lock().await.len();
1375
1376        let total_original_size: usize = entries
1377            .lock()
1378            .await
1379            .iter()
1380            .map(|entry| entry.data.len())
1381            .sum();
1382
1383        let (tx, mut rx) = mpsc::channel(100_000);
1384        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1385
1386        let encoder = PackEncoder::new(entries_number, 10, tx);
1387
1388        let start = Instant::now(); // 开始时间
1389        encoder.encode_async(entry_rx).await.unwrap();
1390
1391        // spawn a task to send entries
1392        tokio::spawn(async move {
1393            let entries = entries.lock().await;
1394            for entry in entries.iter() {
1395                entry_tx
1396                    .send(MetaAttached {
1397                        inner: entry.clone(),
1398                        meta: EntryMeta::new(),
1399                    })
1400                    .await
1401                    .unwrap();
1402            }
1403            drop(entry_tx);
1404            tracing::info!("all entries sent");
1405        });
1406
1407        let mut result = Vec::new();
1408        while let Some(chunk) = rx.recv().await {
1409            result.extend(chunk);
1410        }
1411
1412        let pack_size = result.len();
1413        let compression_rate = if total_original_size > 0 {
1414            1.0 - (pack_size as f64 / total_original_size as f64)
1415        } else {
1416            0.0
1417        };
1418
1419        let duration = start.elapsed();
1420        tracing::info!("test executed in: {:.2?}", duration);
1421        tracing::info!("new pack file size: {}", pack_size);
1422        tracing::info!("original total size: {}", total_original_size);
1423        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1424        tracing::info!(
1425            "space saved: {} bytes",
1426            total_original_size.saturating_sub(pack_size)
1427        );
1428
1429        // check format
1430        check_format(&result);
1431    }
1432
1433    #[tokio::test]
1434    async fn test_pack_encoder_output_to_files() {
1435        let _guard = set_hash_kind_for_test(HashKind::Sha1);
1436        init_logger();
1437        let entries = get_entries_for_test().await;
1438        let entries_number = entries.lock().await.len();
1439
1440        let total_original_size: usize = entries
1441            .lock()
1442            .await
1443            .iter()
1444            .map(|entry| entry.data.len())
1445            .sum();
1446
1447        let start = Instant::now();
1448
1449        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1450        // 自动创建临时目录,生命周期结束自动删除
1451        let dir = tempdir().unwrap();
1452        let path = dir.path();
1453
1454        // spawn a task to send entries
1455        tokio::spawn(async move {
1456            let entries = entries.lock().await;
1457            for entry in entries.iter() {
1458                entry_tx
1459                    .send(MetaAttached {
1460                        inner: entry.clone(),
1461                        meta: EntryMeta::new(),
1462                    })
1463                    .await
1464                    .unwrap();
1465            }
1466            drop(entry_tx);
1467            tracing::info!("all entries sent");
1468        });
1469
1470        encode_and_output_to_files(entry_rx, entries_number, path.to_path_buf(), 0)
1471            .await
1472            .unwrap();
1473
1474        // 验证临时目录下生成的 pack/idx 文件
1475        let mut pack_file = None;
1476        let mut idx_file = None;
1477        for entry in std::fs::read_dir(path).unwrap() {
1478            let entry = entry.unwrap();
1479            let file_name = entry.file_name();
1480            tracing::info!("file name: {:?}", file_name);
1481            let file_name = file_name.to_string_lossy();
1482            if file_name.ends_with(".pack") {
1483                pack_file = Some(entry.path());
1484            } else if file_name.ends_with(".idx") {
1485                idx_file = Some(entry.path());
1486            }
1487        }
1488        let pack_file = pack_file.expect("pack file not generated");
1489        let idx_file = idx_file.expect("idx file not generated");
1490        assert!(
1491            pack_file.metadata().unwrap().len() > 0,
1492            "pack file is empty"
1493        );
1494        assert!(idx_file.metadata().unwrap().len() > 0, "idx file is empty");
1495
1496        let duration = start.elapsed();
1497        tracing::info!("test executed in: {:.2?}", duration);
1498        tracing::info!("original total size: {}", total_original_size);
1499    }
1500
1501    #[tokio::test]
1502    async fn test_pack_encoder_output_to_files_with_delta() {
1503        let _guard = set_hash_kind_for_test(HashKind::Sha1);
1504        init_logger();
1505        let entries = get_entries_for_test().await;
1506        let entries_number = entries.lock().await.len();
1507
1508        let total_original_size: usize = entries
1509            .lock()
1510            .await
1511            .iter()
1512            .map(|entry| entry.data.len())
1513            .sum();
1514
1515        let start = Instant::now();
1516
1517        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1518        // 自动创建临时目录,生命周期结束自动删除
1519        let dir = tempdir().unwrap();
1520        let path = dir.path();
1521
1522        // spawn a task to send entries
1523        tokio::spawn(async move {
1524            let entries = entries.lock().await;
1525            for entry in entries.iter() {
1526                entry_tx
1527                    .send(MetaAttached {
1528                        inner: entry.clone(),
1529                        meta: EntryMeta::new(),
1530                    })
1531                    .await
1532                    .unwrap();
1533            }
1534            drop(entry_tx);
1535            tracing::info!("all entries sent");
1536        });
1537
1538        encode_and_output_to_files(entry_rx, entries_number, path.to_path_buf(), 10)
1539            .await
1540            .unwrap();
1541
1542        // 验证临时目录下生成的 pack/idx 文件
1543        let mut pack_file = None;
1544        let mut idx_file = None;
1545        for entry in std::fs::read_dir(path).unwrap() {
1546            let entry = entry.unwrap();
1547            let file_name = entry.file_name();
1548            tracing::info!("file name: {:?}", file_name);
1549            let file_name = file_name.to_string_lossy();
1550            if file_name.ends_with(".pack") {
1551                pack_file = Some(entry.path());
1552            } else if file_name.ends_with(".idx") {
1553                idx_file = Some(entry.path());
1554            }
1555        }
1556        let pack_file = pack_file.expect("pack file not generated");
1557        let idx_file = idx_file.expect("idx file not generated");
1558        assert!(
1559            pack_file.metadata().unwrap().len() > 0,
1560            "pack file is empty"
1561        );
1562        assert!(idx_file.metadata().unwrap().len() > 0, "idx file is empty");
1563
1564        let duration = start.elapsed();
1565        tracing::info!("test executed in: {:.2?}", duration);
1566        tracing::info!("original total size: {}", total_original_size);
1567    }
1568}