Skip to main content

git_internal/internal/pack/
encode.rs

1//! Pack encoder capable of building streamed `.pack`/`.idx` pairs with optional delta compression,
2//! windowing, and asynchronous writers.
3
4use std::{
5    cmp::Ordering,
6    collections::VecDeque,
7    hash::{Hash, Hasher},
8    io::Write,
9    path::{Path, PathBuf},
10};
11
12use ahash::AHasher;
13// use libc::ungetc;
14use chrono::Utc;
15use flate2::write::ZlibEncoder;
16use natord::compare;
17use rayon::prelude::*;
18//use tokio::io::AsyncWriteExt;
19use tokio::io::AsyncWriteExt as TokioAsyncWriteExt;
20use tokio::{fs::File, sync::mpsc, task::JoinHandle};
21
22//use std::io as stdio;
23use crate::delta;
24use crate::{
25    errors::GitError,
26    hash::ObjectHash,
27    internal::{
28        metadata::{EntryMeta, MetaAttached},
29        object::types::ObjectType,
30        pack::{entry::Entry, index_entry::IndexEntry, pack_index::IdxBuilder},
31    },
32    time_it,
33    utils::HashAlgorithm,
34    zstdelta,
35};
36
/// Longest delta chain allowed: a candidate base whose own chain already has this many
/// links is never used as a delta base.
const MAX_CHAIN_LEN: usize = 50;

/// A candidate base is only accepted when its estimated delta rate is strictly above this value.
const MIN_DELTA_RATE: f64 = 0.5;
40
41/// A encoder for generating pack files with delta objects.
42pub struct PackEncoder {
43    //path: Option<PathBuf>,
44    object_number: usize,
45    process_index: usize,
46    window_size: usize,
47    // window: VecDeque<(Entry, usize)>, // entry and offset
48    pack_sender: Option<mpsc::Sender<Vec<u8>>>,
49    idx_sender: Option<mpsc::Sender<Vec<u8>>>,
50    //idx_sender: Option<mpsc::Sender<Vec<u8>>>,
51    idx_entries: Option<Vec<IndexEntry>>,
52    inner_offset: usize,       // offset of current entry
53    inner_hash: HashAlgorithm, // introduce different hash algorithm
54    final_hash: Option<ObjectHash>,
55    start_encoding: bool,
56}
57
58/// Encode entries into a pack, write `.pack`/`.idx` files to `output_dir`.
59/// - Spawns background writers to consume pack/idx channels to avoid back-pressure.
60/// - Uses `window_size` to control delta: `0` means no delta (parallel encode), otherwise enable delta window.
61/// # Arguments
62/// * `raw_entries_rx` - receiver providing entries with metadata
63/// * `object_number` - expected total object count for the pack header
64/// * `output_dir` - target directory to place the generated files
65/// * `window_size` - delta window size; `0` disables delta
66/// # Returns
67/// * `Ok(())` on success, `GitError` on failure
68pub async fn encode_and_output_to_files(
69    raw_entries_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
70    object_number: usize,
71    output_dir: PathBuf,
72    window_size: usize,
73) -> Result<(), GitError> {
74    let (pack_tx, mut pack_rx) = mpsc::channel(1024);
75    let (idx_tx, mut idx_rx) = mpsc::channel(1024);
76    let mut pack_encoder = PackEncoder::new_with_idx(object_number, window_size, pack_tx, idx_tx);
77
78    // timestamp for temp filename
79    let now = Utc::now();
80    let timestamp = now.format("%Y%m%d%H%M%S%.3f").to_string(); // 例如 20251209235959.123
81    let tmp_path = output_dir.join(format!("{}objects.pack.tmp", timestamp));
82    let mut pack_file = File::create(&tmp_path).await?;
83
84    let pack_writer = tokio::spawn(async move {
85        while let Some(chunk) = pack_rx.recv().await {
86            TokioAsyncWriteExt::write_all(&mut pack_file, &chunk).await?;
87        }
88        //pack_file.flush().await?;
89        TokioAsyncWriteExt::flush(&mut pack_file).await?;
90        Ok::<(), GitError>(())
91    });
92
93    pack_encoder.encode(raw_entries_rx).await?;
94
95    // 等待 pack 写入完成
96    let pack_write_result = pack_writer
97        .await
98        .map_err(|e| GitError::PackEncodeError(format!("pack writer task join error: {e}")))?;
99    pack_write_result?;
100
101    let final_pack_name =
102        output_dir.join(format!("pack-{}.pack", pack_encoder.final_hash.unwrap()));
103    let final_idx_name = output_dir.join(format!("pack-{}.idx", pack_encoder.final_hash.unwrap()));
104    tokio::fs::rename(tmp_path, &final_pack_name).await?;
105
106    let mut idx_file = File::create(&final_idx_name).await?;
107    let idx_writer = tokio::spawn(async move {
108        while let Some(chunk) = idx_rx.recv().await {
109            //idx_file.write_all(&chunk).await?;
110            TokioAsyncWriteExt::write_all(&mut idx_file, &chunk).await?;
111        }
112        //idx_file.flush().await?;
113        TokioAsyncWriteExt::flush(&mut idx_file).await?;
114        Ok::<(), GitError>(())
115    });
116
117    //build idx
118    pack_encoder.encode_idx_file().await?;
119
120    let idx_write_result = idx_writer
121        .await
122        .map_err(|e| GitError::PackEncodeError(format!("idx writer task join error: {e}")))?;
123    idx_write_result?;
124
125    Ok(())
126}
127
/// Build the fixed 12-byte pack header: the `PACK` signature, format version 2 and the object
/// count, each 4-byte field in network byte order (big-endian).
///
/// # Panics
/// Panics if `object_number` is zero or does not fit in a `u32`.
fn encode_header(object_number: usize) -> Vec<u8> {
    assert_ne!(object_number, 0);
    assert!(object_number <= u32::MAX as usize);

    let mut header = Vec::with_capacity(12);
    header.extend_from_slice(b"PACK");
    header.extend_from_slice(&2u32.to_be_bytes()); // only version 2 is generated
    header.extend_from_slice(&(object_number as u32).to_be_bytes());
    header
}
141
/// Encode the distance from an `OFS_DELTA` entry back to its base object.
///
/// The format is big-endian base-128. Every group except the least-significant one is stored
/// minus one, so each value has exactly one encoding. All bytes but the last carry the 0x80
/// continuation flag. Groups are produced least-significant first, then emitted in reverse.
///
/// # Panics
/// Panics if `value` is zero, since a delta can never use itself as its base.
fn encode_offset(mut value: usize) -> Vec<u8> {
    assert_ne!(value, 0, "offset can't be zero");

    let mut groups = vec![(value & 0x7F) as u8];
    loop {
        value >>= 7;
        if value == 0 {
            break;
        }
        value -= 1;
        groups.push(0x80 | (value & 0x7F) as u8);
    }
    groups.into_iter().rev().collect()
}
158
159/// Encode one object, and update the hash
160/// @offset: offset of this object if it's a delta object. For other object, it's None
161fn encode_one_object(entry: &Entry, offset: Option<usize>) -> Result<Vec<u8>, GitError> {
162    // try encode as delta
163    let obj_data = &entry.data;
164    let obj_data_len = obj_data.len();
165    let obj_type_number = entry.obj_type.to_pack_type_u8()?;
166
167    let mut encoded_data = Vec::new();
168
169    // **header** encoding
170    let mut header_data = vec![(0x80 | (obj_type_number << 4)) + (obj_data_len & 0x0f) as u8];
171    let mut size = obj_data_len >> 4; // 4 bit has been used in first byte
172    if size > 0 {
173        while size > 0 {
174            if size >> 7 > 0 {
175                header_data.push((0x80 | size) as u8);
176                size >>= 7;
177            } else {
178                header_data.push(size as u8);
179                break;
180            }
181        }
182    } else {
183        header_data.push(0);
184    }
185    encoded_data.extend(header_data);
186
187    // **offset** encoding
188    if entry.obj_type == ObjectType::OffsetDelta || entry.obj_type == ObjectType::OffsetZstdelta {
189        let offset_data = encode_offset(offset.unwrap());
190        encoded_data.extend(offset_data);
191    } else if entry.obj_type == ObjectType::HashDelta {
192        unreachable!("unsupported type")
193    }
194
195    // **data** encoding, need zlib compress
196    let mut inflate = ZlibEncoder::new(Vec::new(), flate2::Compression::default());
197    inflate
198        .write_all(obj_data)
199        .expect("zlib compress should never failed");
200    inflate.flush().expect("zlib flush should never failed");
201    let compressed_data = inflate.finish().expect("zlib compress should never failed");
202    // self.write_all_and_update(&compressed_data).await;
203    encoded_data.extend(compressed_data);
204    Ok(encoded_data)
205}
206
207/// Magic sort function for entries
208fn magic_sort(a: &MetaAttached<Entry, EntryMeta>, b: &MetaAttached<Entry, EntryMeta>) -> Ordering {
209    let path_a = a.meta.file_path.as_ref();
210    let path_b = b.meta.file_path.as_ref();
211
212    // 1. Handle path existence: entries with paths sort first
213    match (path_a, path_b) {
214        (Some(pa), Some(pb)) => {
215            let pa = Path::new(pa);
216            let pb = Path::new(pb);
217
218            // 1. Compare parent directory paths
219            let dir_ord = pa.parent().cmp(&pb.parent());
220            if dir_ord != Ordering::Equal {
221                return dir_ord;
222            }
223
224            // 2. Compare filenames (natural sort)
225            let name_a = pa.file_name().unwrap_or_default().to_string_lossy();
226            let name_b = pb.file_name().unwrap_or_default().to_string_lossy();
227            let name_ord = compare(&name_a, &name_b);
228            if name_ord != Ordering::Equal {
229                return name_ord;
230            }
231        }
232        (Some(_), None) => return Ordering::Less, // entries with paths sort first
233        (None, Some(_)) => return Ordering::Greater, // entries without paths sort last
234        (None, None) => {}
235    }
236
237    let ord = b.inner.data.len().cmp(&a.inner.data.len());
238    if ord != Ordering::Equal {
239        return ord;
240    }
241
242    // fallback pointer order (newest first)
243    (a as *const MetaAttached<Entry, EntryMeta>).cmp(&(b as *const MetaAttached<Entry, EntryMeta>))
244}
245
246/// Calculate hash of data
247fn calc_hash(data: &[u8]) -> u64 {
248    let mut hasher = AHasher::default();
249    data.hash(&mut hasher);
250    hasher.finish()
251}
252
253/// Cheap check if two byte slices are similar by comparing their hashes of the first 128 bytes.
254fn cheap_similar(a: &[u8], b: &[u8]) -> bool {
255    let k = a.len().min(b.len()).min(128);
256    if k == 0 {
257        return false;
258    }
259    calc_hash(&a[..k]) == calc_hash(&b[..k])
260}
261
262impl PackEncoder {
263    pub fn new(object_number: usize, window_size: usize, sender: mpsc::Sender<Vec<u8>>) -> Self {
264        PackEncoder {
265            object_number,
266            window_size,
267            process_index: 0,
268            // window: VecDeque::with_capacity(window_size),
269            pack_sender: Some(sender),
270            idx_sender: None,
271            idx_entries: None,
272            inner_offset: 12, // start  after 12 bytes pack header(signature + version + object count).
273            inner_hash: HashAlgorithm::new(), // introduce different hash algorithm
274            final_hash: None,
275            start_encoding: false,
276        }
277    }
278
279    pub fn new_with_idx(
280        object_number: usize,
281        window_size: usize,
282        pack_sender: mpsc::Sender<Vec<u8>>,
283        idx_sender: mpsc::Sender<Vec<u8>>,
284    ) -> Self {
285        PackEncoder {
286            //path: Some(path),
287            object_number,
288            window_size,
289            process_index: 0,
290            // window: VecDeque::with_capacity(window_size),
291            pack_sender: Some(pack_sender),
292            idx_sender: Some(idx_sender),
293            idx_entries: None,
294            inner_offset: 12, // start  after 12 bytes pack header(signature + version + object count).
295            inner_hash: HashAlgorithm::new(), // introduce different hash algorithm
296            final_hash: None,
297            start_encoding: false,
298        }
299    }
300
301    pub fn drop_sender(&mut self) {
302        self.pack_sender.take(); // Take the sender out, dropping it
303    }
304
305    pub async fn send_data(&mut self, data: Vec<u8>) {
306        if let Some(sender) = &self.pack_sender {
307            sender.send(data).await.unwrap();
308        }
309    }
310
311    /// Get the hash of the pack file. if the pack file is not finished, return None
312    pub fn get_hash(&self) -> Option<ObjectHash> {
313        self.final_hash
314    }
315
316    /// Encodes entries into a pack file with delta objects and outputs them through the specified writer.
317    /// # Arguments
318    /// - `rx` - A receiver channel (`mpsc::Receiver<Entry>`) from which entries to be encoded are received.
319    /// # Returns
320    /// Returns `Ok(())` if encoding is successful, or a `GitError` in case of failure.
321    /// - Returns a `GitError` if there is a failure during the encoding process.
322    /// - Returns `PackEncodeError` if an encoding operation is already in progress.
323    pub async fn encode(
324        &mut self,
325        entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
326    ) -> Result<(), GitError> {
327        //self.inner_encode(entry_rx, false).await
328        if self.window_size == 0 {
329            self.parallel_encode(entry_rx).await
330        } else {
331            self.inner_encode(entry_rx, false).await
332        }
333    }
334
335    /// Encode with zstdelta
336    pub async fn encode_with_zstdelta(
337        &mut self,
338        entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
339    ) -> Result<(), GitError> {
340        self.inner_encode(entry_rx, true).await
341    }
342
343    /// Delta selection heuristics are based on:
344    ///   https://github.com/git/git/blob/master/Documentation/technical/pack-heuristics.adoc
345    async fn inner_encode(
346        &mut self,
347        mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
348        enable_zstdelta: bool,
349    ) -> Result<(), GitError> {
350        let head = encode_header(self.object_number);
351        self.send_data(head.clone()).await;
352        self.inner_hash.update(&head);
353
354        // ensure only one decode can only invoke once
355        if self.start_encoding {
356            return Err(GitError::PackEncodeError(
357                "encoding operation is already in progress".to_string(),
358            ));
359        }
360
361        let mut commits: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
362        let mut trees: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
363        let mut blobs: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
364        let mut tags: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
365        while let Some(entry) = entry_rx.recv().await {
366            match entry.inner.obj_type {
367                ObjectType::Commit => {
368                    commits.push(entry);
369                }
370                ObjectType::Tree => {
371                    trees.push(entry);
372                }
373                ObjectType::Blob => {
374                    blobs.push(entry);
375                }
376                ObjectType::Tag => {
377                    tags.push(entry);
378                }
379                _ => {
380                    return Err(GitError::PackEncodeError(format!(
381                        "object type `{}` is not supported by delta-window pack encoding",
382                        entry.inner.obj_type
383                    )));
384                }
385            }
386        }
387
388        commits.sort_by(magic_sort);
389        trees.sort_by(magic_sort);
390        blobs.sort_by(magic_sort);
391        tags.sort_by(magic_sort);
392        tracing::info!(
393            "numbers :  commits: {:?} trees: {:?} blobs:{:?} tag :{:?}",
394            commits.len(),
395            trees.len(),
396            blobs.len(),
397            tags.len()
398        );
399
400        // parallel encoding vec with different object_type
401        let (commit_results, tree_results, blob_results, tag_results) = tokio::try_join!(
402            tokio::task::spawn_blocking(move || {
403                Self::try_as_offset_delta(
404                    commits
405                        .into_iter()
406                        .map(|entry_with_meta| entry_with_meta.inner)
407                        .collect(),
408                    10,
409                    enable_zstdelta,
410                )
411            }),
412            tokio::task::spawn_blocking(move || {
413                Self::try_as_offset_delta(
414                    trees
415                        .into_iter()
416                        .map(|entry_with_meta| entry_with_meta.inner)
417                        .collect(),
418                    10,
419                    enable_zstdelta,
420                )
421            }),
422            tokio::task::spawn_blocking(move || {
423                Self::try_as_offset_delta(
424                    blobs
425                        .into_iter()
426                        .map(|entry_with_meta| entry_with_meta.inner)
427                        .collect(),
428                    10,
429                    enable_zstdelta,
430                )
431            }),
432            tokio::task::spawn_blocking(move || {
433                Self::try_as_offset_delta(
434                    tags.into_iter()
435                        .map(|entry_with_meta| entry_with_meta.inner)
436                        .collect(),
437                    10,
438                    enable_zstdelta,
439                )
440            }),
441        )
442        .map_err(|e| GitError::PackEncodeError(format!("Task join error: {e}")))?;
443
444        let commit_res = commit_results?;
445        let tree_res = tree_results?;
446        let blob_res = blob_results?;
447        let tag_res = tag_results?;
448
449        let mut all_res = vec![commit_res, tree_res, blob_res, tag_res];
450
451        let mut idx_entries = Vec::new();
452        for res in &mut all_res {
453            for data in res {
454                data.1.offset = self.inner_offset as u64;
455                self.write_all_and_update(&data.0).await;
456                idx_entries.push(data.1.clone());
457            }
458        }
459
460        self.idx_entries = Some(idx_entries);
461
462        // Hash signature
463        let hash_result = self.inner_hash.clone().finalize();
464        self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
465        self.send_data(hash_result.to_vec()).await;
466
467        self.drop_sender();
468        Ok(())
469    }
470
471    /// Try to encode as delta using objects in window
472    /// delta & zstdelta have been gathered here
473    /// Refs: https://sapling-scm.com/docs/dev/internals/zstdelta/
474    /// the sliding window was moved here
475    /// # Returns
476    /// - Return (Vec<Vec<u8>) if success make delta
477    /// - Return (None) if didn't delta,
478    fn try_as_offset_delta(
479        mut bucket: Vec<Entry>,
480        window_size: usize,
481        enable_zstdelta: bool,
482    ) -> Result<Vec<(Vec<u8>, IndexEntry)>, GitError> {
483        let mut current_offset = 0usize;
484        let mut window: VecDeque<(Entry, usize)> = VecDeque::with_capacity(window_size);
485        let mut res: Vec<(Vec<u8>, IndexEntry)> = Vec::new();
486        //let mut idx_entries: Vec<IndexEntry> = Vec::new();
487
488        for entry in bucket.iter_mut() {
489            //let entry_for_window = entry.clone();
490            // 每次循环重置最佳基对象选择
491            let mut best_base: Option<&(Entry, usize)> = None;
492            let mut best_rate: f64 = 0.0;
493            let tie_epsilon: f64 = 0.15;
494
495            let candidates: Vec<_> = window
496                .par_iter()
497                .with_min_len(3)
498                .filter_map(|try_base| {
499                    if try_base.0.obj_type != entry.obj_type {
500                        return None;
501                    }
502
503                    if try_base.0.chain_len >= MAX_CHAIN_LEN {
504                        return None;
505                    }
506
507                    if try_base.0.hash == entry.hash {
508                        return None;
509                    }
510
511                    let sym_ratio = (try_base.0.data.len().min(entry.data.len()) as f64)
512                        / (try_base.0.data.len().max(entry.data.len()) as f64);
513                    if sym_ratio < 0.5 {
514                        return None;
515                    }
516
517                    if !cheap_similar(&try_base.0.data, &entry.data) {
518                        return None;
519                    }
520
521                    let rate = if (try_base.0.data.len() + entry.data.len()) / 2 > 64 {
522                        delta::heuristic_encode_rate_parallel(&try_base.0.data, &entry.data)
523                    } else {
524                        delta::encode_rate(&try_base.0.data, &entry.data)
525                        // let try_delta_obj = zstdelta::diff(&try_base.0.data, &entry.data).unwrap();
526                        // 1.0 - try_delta_obj.len() as f64 / entry.data.len() as f64
527                    };
528
529                    if rate > MIN_DELTA_RATE {
530                        Some((rate, try_base))
531                    } else {
532                        None
533                    }
534                })
535                .collect();
536
537            for (rate, try_base) in candidates {
538                match best_base {
539                    None => {
540                        best_rate = rate;
541                        //best_base_offset = current_offset - try_base.1;
542                        best_base = Some(try_base);
543                    }
544                    Some(best_base_ref) => {
545                        let is_better = if rate > best_rate + tie_epsilon {
546                            true
547                        } else if (rate - best_rate).abs() <= tie_epsilon {
548                            try_base.0.chain_len > best_base_ref.0.chain_len
549                        } else {
550                            false
551                        };
552
553                        if is_better {
554                            best_rate = rate;
555                            best_base = Some(try_base);
556                        }
557                    }
558                }
559            }
560
561            let mut entry_for_window = entry.clone();
562
563            let offset = best_base.map(|best_base| {
564                let delta = if enable_zstdelta {
565                    entry.obj_type = ObjectType::OffsetZstdelta;
566                    zstdelta::diff(&best_base.0.data, &entry.data)
567                        .map_err(|e| {
568                            GitError::DeltaObjectError(format!("zstdelta diff failed: {e}"))
569                        })
570                        .unwrap()
571                } else {
572                    entry.obj_type = ObjectType::OffsetDelta;
573                    delta::encode(&best_base.0.data, &entry.data)
574                };
575                //entry.obj_type = ObjectType::OffsetDelta;
576                entry.data = delta;
577                entry.chain_len = best_base.0.chain_len + 1;
578                current_offset - best_base.1
579            });
580
581            entry_for_window.chain_len = entry.chain_len;
582            let obj_data = encode_one_object(entry, offset)?;
583            window.push_back((entry_for_window, current_offset));
584            if window.len() > window_size {
585                window.pop_front();
586            }
587            res.push((obj_data.clone(), IndexEntry::new(entry, 0)));
588            current_offset += obj_data.len();
589        }
590        Ok(res)
591    }
592
593    /// Parallel encode with rayon, only works when window_size == 0 (no delta)
594    pub async fn parallel_encode(
595        &mut self,
596        mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
597    ) -> Result<(), GitError> {
598        if self.window_size != 0 {
599            return Err(GitError::PackEncodeError(
600                "parallel encode only works when window_size == 0".to_string(),
601            ));
602        }
603
604        let head = encode_header(self.object_number);
605        self.send_data(head.clone()).await;
606        self.inner_hash.update(&head);
607
608        // ensure only one decode can only invoke once
609        if self.start_encoding {
610            return Err(GitError::PackEncodeError(
611                "encoding operation is already in progress".to_string(),
612            ));
613        }
614
615        let mut idx_entries = Vec::new();
616        let batch_size = usize::max(1000, entry_rx.max_capacity() / 10); // A temporary value, not optimized
617        tracing::info!("encode with batch size: {}", batch_size);
618        loop {
619            let mut batch_entries = Vec::with_capacity(batch_size);
620            time_it!("parallel encode: receive batch", {
621                for _ in 0..batch_size {
622                    match entry_rx.recv().await {
623                        Some(entry) => {
624                            batch_entries.push(entry.inner);
625                            self.process_index += 1;
626                        }
627                        None => break,
628                    }
629                }
630            });
631
632            if batch_entries.is_empty() {
633                break;
634            }
635
636            // use `collect` will return result in order, refs: https://github.com/rayon-rs/rayon/issues/551#issuecomment-371657900
637            let batch_result: Vec<Result<(Vec<u8>, IndexEntry), GitError>> =
638                time_it!("parallel encode: encode batch", {
639                    batch_entries
640                        .par_iter()
641                        .map(|entry| {
642                            encode_one_object(entry, None)
643                                .map(|encoded| (encoded, IndexEntry::new(entry, 0)))
644                        })
645                        .collect()
646                });
647
648            time_it!("parallel encode: write batch", {
649                for obj_data in batch_result {
650                    let mut obj_data = obj_data?;
651                    obj_data.1.offset = self.inner_offset as u64;
652                    self.write_all_and_update(&obj_data.0).await;
653                    idx_entries.push(obj_data.1);
654                }
655            });
656        }
657
658        tracing::debug!("parallel encode idx entries: {:?}", idx_entries.len());
659        if self.process_index != self.object_number {
660            panic!(
661                "not all objects are encoded, process:{}, total:{}",
662                self.process_index, self.object_number
663            );
664        }
665
666        // hash signature
667        let hash_result = self.inner_hash.clone().finalize();
668        self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
669        self.send_data(hash_result.to_vec()).await;
670        self.drop_sender();
671
672        self.idx_entries = Some(idx_entries);
673        Ok(())
674    }
675
676    /// Write data to writer and update hash & offset
677    async fn write_all_and_update(&mut self, data: &[u8]) {
678        self.inner_hash.update(data);
679        self.inner_offset += data.len();
680        self.send_data(data.to_vec()).await;
681    }
682
683    async fn generate_idx_file(&mut self) -> Result<(), GitError> {
684        let final_hash = self.final_hash
685            .ok_or(GitError::PackEncodeError("final_hash is missing,The pack file must be generated before the index file is produced.".into()))?;
686        let idx_entries = self.idx_entries.clone().ok_or(GitError::PackEncodeError(
687            "The pack file must be generated before the index file is produced.".into(),
688        ))?;
689        let mut idx_builder = IdxBuilder::new(
690            self.object_number,
691            self.idx_sender.clone().unwrap(),
692            final_hash,
693        );
694        idx_builder.write_idx(idx_entries).await?;
695        Ok(())
696    }
697
698    /// async version of encode, result data will be returned by JoinHandle.
699    /// It will consume PackEncoder, so you can't use it after calling this function.
700    /// when window_size = 0, it executes parallel_encode which retains stream transmission
701    /// when window_size = 0,it executes encode which uses magic sort and delta.
702    /// It seems that all other modules rely on this api
703    pub async fn encode_async(
704        mut self,
705        rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
706    ) -> Result<JoinHandle<()>, GitError> {
707        Ok(tokio::spawn(async move {
708            if self.window_size == 0 {
709                self.parallel_encode(rx).await.unwrap()
710            } else {
711                self.encode(rx).await.unwrap()
712            }
713        }))
714    }
715
716    /// async version of encode_with_zstdelta, result data will be returned by JoinHandle.
717    pub async fn encode_async_with_zstdelta(
718        mut self,
719        rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
720    ) -> Result<JoinHandle<()>, GitError> {
721        Ok(tokio::spawn(async move {
722            // Do not use parallel encode with zstdelta because it make no sense.
723            self.encode_with_zstdelta(rx).await.unwrap()
724        }))
725    }
726
727    /// Generate idx file after pack file has been generated
728    pub async fn encode_idx_file(&mut self) -> Result<(), GitError> {
729        if self.idx_sender.is_none() {
730            return Err(GitError::PackEncodeError(String::from(
731                "idx sender is none",
732            )));
733        }
734        self.generate_idx_file().await?;
735        // drop sender so downstream consumer can finish
736        self.idx_sender.take();
737        Ok(())
738    }
739}
740
741#[cfg(test)]
742mod tests {
743    use std::{env, io::Cursor, path::PathBuf, sync::Arc, time::Instant};
744
745    use tempfile::tempdir;
746    use tokio::sync::Mutex;
747
748    use super::*;
749    use crate::{
750        hash::{HashKind, ObjectHash, set_hash_kind_for_test},
751        internal::{
752            object::{blob::Blob, types::ObjectType},
753            pack::{Pack, tests::init_logger, utils::read_offset_encoding},
754        },
755        time_it,
756    };
757
758    /// Check if the given data is a valid pack file format by attempting to decode it.
759    fn check_format(data: &Vec<u8>) {
760        // Use a smaller cap on 32-bit targets to avoid usize overflow.
761        let max_pack_size_u64 = if cfg!(target_pointer_width = "64") {
762            6u64 * 1024 * 1024 * 1024
763        } else {
764            2u64 * 1024 * 1024 * 1024
765        };
766        let max_pack_size = usize::try_from(max_pack_size_u64).unwrap_or_else(|_| {
767            panic!(
768                "internal assertion failed: pack size cap {} does not fit in usize on this \
769                 target; this should be unreachable given the target_pointer_width configuration",
770                max_pack_size_u64
771            )
772        });
773        let mut p = Pack::new(
774            None,
775            Some(max_pack_size), // 6GB on 64-bit, 2GB on 32-bit
776            Some(PathBuf::from("/tmp/.cache_temp")),
777            true,
778        );
779        let mut reader = Cursor::new(data);
780        tracing::debug!("start check format");
781        p.decode(&mut reader, |_| {}, None::<fn(ObjectHash)>)
782            .expect("pack file format error");
783    }
784
785    #[tokio::test]
786    async fn test_pack_encoder() {
787        let _guard = set_hash_kind_for_test(HashKind::Sha1);
788        async fn encode_once(window_size: usize) -> Vec<u8> {
789            let (tx, mut rx) = mpsc::channel(100);
790            let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);
791
792            // make some different objects, or decode will fail
793            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
794            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
795            encoder.encode_async(entry_rx).await.unwrap();
796
797            for str in str_vec {
798                let blob = Blob::from_content(str);
799                let entry: Entry = blob.into();
800                entry_tx
801                    .send(MetaAttached {
802                        inner: entry,
803                        meta: EntryMeta::new(),
804                    })
805                    .await
806                    .unwrap();
807            }
808            drop(entry_tx);
809            // assert!(encoder.get_hash().is_some());
810            let mut result = Vec::new();
811            while let Some(chunk) = rx.recv().await {
812                result.extend(chunk);
813            }
814            result
815        }
816
817        // without delta
818        let pack_without_delta = encode_once(0).await;
819        let pack_without_delta_size = pack_without_delta.len();
820        check_format(&pack_without_delta);
821
822        // with delta
823        let pack_with_delta = encode_once(4).await;
824        assert!(pack_with_delta.len() <= pack_without_delta_size);
825        check_format(&pack_with_delta);
826    }
827    #[tokio::test]
828    async fn test_pack_encoder_sha256() {
829        let _guard = set_hash_kind_for_test(HashKind::Sha256);
830
831        async fn encode_once(window_size: usize) -> Vec<u8> {
832            let (tx, mut rx) = mpsc::channel(100);
833            let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);
834
835            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
836            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
837            encoder.encode_async(entry_rx).await.unwrap();
838
839            for s in str_vec {
840                let blob = Blob::from_content(s);
841                let entry: Entry = blob.into();
842                entry_tx
843                    .send(MetaAttached {
844                        inner: entry,
845                        meta: EntryMeta::new(),
846                    })
847                    .await
848                    .unwrap();
849            }
850            drop(entry_tx);
851
852            let mut result = Vec::new();
853            while let Some(chunk) = rx.recv().await {
854                result.extend(chunk);
855            }
856            result
857        }
858
859        // without delta
860        let pack_without_delta = encode_once(0).await;
861        let pack_without_delta_size = pack_without_delta.len();
862        check_format(&pack_without_delta);
863
864        // with delta
865        let pack_with_delta = encode_once(4).await;
866        assert!(pack_with_delta.len() <= pack_without_delta_size);
867        check_format(&pack_with_delta);
868    }
869
870    #[tokio::test]
871    async fn test_pack_encoder_rejects_unencodable_ai_type_parallel() {
872        let (tx, _rx) = mpsc::channel(8);
873        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);
874        let mut encoder = PackEncoder::new(1, 0, tx);
875
876        let mut entry: Entry = Blob::from_content("ai").into();
877        entry.obj_type = ObjectType::Task;
878        entry_tx
879            .send(MetaAttached {
880                inner: entry,
881                meta: EntryMeta::new(),
882            })
883            .await
884            .expect("send entry");
885        drop(entry_tx);
886
887        let err = encoder
888            .encode(entry_rx)
889            .await
890            .expect_err("must reject AI pack type");
891        assert!(matches!(err, GitError::PackEncodeError(_)));
892    }
893
894    #[tokio::test]
895    async fn test_pack_encoder_rejects_unencodable_ai_type_delta_window() {
896        let (tx, _rx) = mpsc::channel(8);
897        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);
898        let mut encoder = PackEncoder::new(1, 10, tx);
899
900        let mut entry: Entry = Blob::from_content("ai").into();
901        entry.obj_type = ObjectType::Task;
902        entry_tx
903            .send(MetaAttached {
904                inner: entry,
905                meta: EntryMeta::new(),
906            })
907            .await
908            .expect("send entry");
909        drop(entry_tx);
910
911        let err = encoder
912            .encode(entry_rx)
913            .await
914            .expect_err("must reject AI pack type");
915        assert!(matches!(err, GitError::PackEncodeError(_)));
916    }
917
918    async fn get_entries_for_test() -> Arc<Mutex<Vec<Entry>>> {
919        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
920            .join("tests/data/packs/encode-test-sha1.pack");
921
922        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);
923
924        let f = std::fs::File::open(&source).unwrap();
925        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
926        let mut reader = std::io::BufReader::new(f);
927        let entries = Arc::new(Mutex::new(Vec::new()));
928        let entries_clone = entries.clone();
929        p.decode(
930            &mut reader,
931            move |entry| {
932                let mut entries = entries_clone.blocking_lock();
933                entries.push(entry.inner);
934            },
935            None::<fn(ObjectHash)>,
936        )
937        .unwrap();
938        assert_eq!(p.number, entries.lock().await.len());
939        tracing::info!("total entries: {}", p.number);
940        drop(p);
941
942        entries
943    }
944    async fn get_entries_for_test_sha256() -> Arc<Mutex<Vec<Entry>>> {
945        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
946            .join("tests/data/packs/encode-test-sha256.pack");
947
948        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);
949
950        let f = std::fs::File::open(&source).unwrap();
951        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
952        let mut reader = std::io::BufReader::new(f);
953        let entries = Arc::new(Mutex::new(Vec::new()));
954        let entries_clone = entries.clone();
955        p.decode(
956            &mut reader,
957            move |entry| {
958                let mut entries = entries_clone.blocking_lock();
959                entries.push(entry.inner);
960            },
961            None::<fn(ObjectHash)>,
962        )
963        .unwrap();
964        assert_eq!(p.number, entries.lock().await.len());
965        tracing::info!("total entries: {}", p.number);
966        drop(p);
967
968        entries
969    }
970
971    #[tokio::test]
972    async fn test_pack_encoder_parallel_large_file() {
973        let _guard = set_hash_kind_for_test(HashKind::Sha1);
974        init_logger();
975
976        let start = Instant::now();
977        let entries = get_entries_for_test().await;
978        let entries_number = entries.lock().await.len();
979
980        let total_original_size: usize = entries
981            .lock()
982            .await
983            .iter()
984            .map(|entry| entry.data.len())
985            .sum();
986
987        // encode entries with parallel
988        let (tx, mut rx) = mpsc::channel(1_000_000);
989        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);
990
991        let mut encoder = PackEncoder::new(entries_number, 0, tx);
992        tokio::spawn(async move {
993            time_it!("test parallel encode", {
994                encoder.parallel_encode(entry_rx).await.unwrap();
995            });
996        });
997
998        // spawn a task to send entries
999        tokio::spawn(async move {
1000            let entries = entries.lock().await;
1001            for entry in entries.iter() {
1002                entry_tx
1003                    .send(MetaAttached {
1004                        inner: entry.clone(),
1005                        meta: EntryMeta::new(),
1006                    })
1007                    .await
1008                    .unwrap();
1009            }
1010            drop(entry_tx);
1011            tracing::info!("all entries sent");
1012        });
1013
1014        let mut result = Vec::new();
1015        while let Some(chunk) = rx.recv().await {
1016            result.extend(chunk);
1017        }
1018
1019        let pack_size = result.len();
1020        let compression_rate = if total_original_size > 0 {
1021            1.0 - (pack_size as f64 / total_original_size as f64)
1022        } else {
1023            0.0
1024        };
1025
1026        let duration = start.elapsed();
1027        tracing::info!("test executed in: {:.2?}", duration);
1028        tracing::info!("new pack file size: {}", result.len());
1029        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1030        // check format
1031        check_format(&result);
1032    }
1033    #[tokio::test]
1034    async fn test_pack_encoder_parallel_large_file_sha256() {
1035        let _guard = set_hash_kind_for_test(HashKind::Sha256);
1036        init_logger();
1037
1038        let start = Instant::now();
1039        // use sha256 pack file for testing
1040        let entries = get_entries_for_test_sha256().await;
1041        let entries_number = entries.lock().await.len();
1042
1043        let total_original_size: usize = entries
1044            .lock()
1045            .await
1046            .iter()
1047            .map(|entry| entry.data.len())
1048            .sum();
1049
1050        let (tx, mut rx) = mpsc::channel(1_000_000);
1051        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);
1052
1053        let mut encoder = PackEncoder::new(entries_number, 0, tx);
1054        tokio::spawn(async move {
1055            time_it!("test parallel encode sha256", {
1056                encoder.parallel_encode(entry_rx).await.unwrap();
1057            });
1058        });
1059
1060        tokio::spawn(async move {
1061            let entries = entries.lock().await;
1062            for entry in entries.iter() {
1063                entry_tx
1064                    .send(MetaAttached {
1065                        inner: entry.clone(),
1066                        meta: EntryMeta::new(),
1067                    })
1068                    .await
1069                    .unwrap();
1070            }
1071            drop(entry_tx);
1072            tracing::info!("all entries sent");
1073        });
1074
1075        let mut result = Vec::new();
1076        while let Some(chunk) = rx.recv().await {
1077            result.extend(chunk);
1078        }
1079
1080        let pack_size = result.len();
1081        let compression_rate = if total_original_size > 0 {
1082            1.0 - (pack_size as f64 / total_original_size as f64)
1083        } else {
1084            0.0
1085        };
1086
1087        let duration = start.elapsed();
1088        tracing::info!("sha256 test executed in: {:.2?}", duration);
1089        tracing::info!("new pack file size: {}", result.len());
1090        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1091        check_format(&result);
1092    }
1093
1094    #[tokio::test]
1095    async fn test_pack_encoder_large_file() {
1096        let _guard = set_hash_kind_for_test(HashKind::Sha1);
1097        init_logger();
1098        let entries = get_entries_for_test().await;
1099        let entries_number = entries.lock().await.len();
1100
1101        let total_original_size: usize = entries
1102            .lock()
1103            .await
1104            .iter()
1105            .map(|entry| entry.data.len())
1106            .sum();
1107
1108        let start = Instant::now();
1109        // encode entries
1110        let (tx, mut rx) = mpsc::channel(100_000);
1111        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1112
1113        let mut encoder = PackEncoder::new(entries_number, 0, tx);
1114        tokio::spawn(async move {
1115            time_it!("test encode no parallel", {
1116                encoder.encode(entry_rx).await.unwrap();
1117            });
1118        });
1119
1120        // spawn a task to send entries
1121        tokio::spawn(async move {
1122            let entries = entries.lock().await;
1123            for entry in entries.iter() {
1124                entry_tx
1125                    .send(MetaAttached {
1126                        inner: entry.clone(),
1127                        meta: EntryMeta::new(),
1128                    })
1129                    .await
1130                    .unwrap();
1131            }
1132            drop(entry_tx);
1133            tracing::info!("all entries sent");
1134        });
1135
1136        // // only receive data
1137        // while (rx.recv().await).is_some() {
1138        //     // do nothing
1139        // }
1140
1141        let mut result = Vec::new();
1142        while let Some(chunk) = rx.recv().await {
1143            result.extend(chunk);
1144        }
1145
1146        let pack_size = result.len();
1147        let compression_rate = if total_original_size > 0 {
1148            1.0 - (pack_size as f64 / total_original_size as f64)
1149        } else {
1150            0.0
1151        };
1152
1153        let duration = start.elapsed();
1154        tracing::info!("test executed in: {:.2?}", duration);
1155        tracing::info!("new pack file size: {}", pack_size);
1156        tracing::info!("original total size: {}", total_original_size);
1157        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1158        tracing::info!(
1159            "space saved: {} bytes",
1160            total_original_size.saturating_sub(pack_size)
1161        );
1162    }
1163    #[tokio::test]
1164    async fn test_pack_encoder_large_file_sha256() {
1165        let _guard = set_hash_kind_for_test(HashKind::Sha256);
1166        init_logger();
1167        let entries = get_entries_for_test_sha256().await;
1168        let entries_number = entries.lock().await.len();
1169
1170        let total_original_size: usize = entries
1171            .lock()
1172            .await
1173            .iter()
1174            .map(|entry| entry.data.len())
1175            .sum();
1176
1177        let start = Instant::now();
1178        // encode entries
1179        let (tx, mut rx) = mpsc::channel(100_000);
1180        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1181
1182        let mut encoder = PackEncoder::new(entries_number, 0, tx);
1183        tokio::spawn(async move {
1184            time_it!("test encode no parallel sha256", {
1185                encoder.encode(entry_rx).await.unwrap();
1186            });
1187        });
1188
1189        // spawn a task to send entries
1190        tokio::spawn(async move {
1191            let entries = entries.lock().await;
1192            for entry in entries.iter() {
1193                entry_tx
1194                    .send(MetaAttached {
1195                        inner: entry.clone(),
1196                        meta: EntryMeta::new(),
1197                    })
1198                    .await
1199                    .unwrap();
1200            }
1201            drop(entry_tx);
1202            tracing::info!("all entries sent");
1203        });
1204
1205        // // only receive data
1206        // while (rx.recv().await).is_some() {
1207        //     // do nothing
1208        // }
1209
1210        let mut result = Vec::new();
1211        while let Some(chunk) = rx.recv().await {
1212            result.extend(chunk);
1213        }
1214
1215        let pack_size = result.len();
1216        let compression_rate = if total_original_size > 0 {
1217            1.0 - (pack_size as f64 / total_original_size as f64)
1218        } else {
1219            0.0
1220        };
1221
1222        let duration = start.elapsed();
1223        tracing::info!("test executed in: {:.2?}", duration);
1224        tracing::info!("new pack file size: {}", pack_size);
1225        tracing::info!("original total size: {}", total_original_size);
1226        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1227        tracing::info!(
1228            "space saved: {} bytes",
1229            total_original_size.saturating_sub(pack_size)
1230        );
1231    }
1232
1233    #[tokio::test]
1234    async fn test_pack_encoder_with_zstdelta() {
1235        let _guard = set_hash_kind_for_test(HashKind::Sha1);
1236        init_logger();
1237        let entries = get_entries_for_test().await;
1238        let entries_number = entries.lock().await.len();
1239
1240        let total_original_size: usize = entries
1241            .lock()
1242            .await
1243            .iter()
1244            .map(|entry| entry.data.len())
1245            .sum();
1246
1247        let start = Instant::now();
1248        let (tx, mut rx) = mpsc::channel(100_000);
1249        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1250
1251        let encoder = PackEncoder::new(entries_number, 10, tx);
1252        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();
1253
1254        // spawn a task to send entries
1255        tokio::spawn(async move {
1256            let entries = entries.lock().await;
1257            for entry in entries.iter() {
1258                entry_tx
1259                    .send(MetaAttached {
1260                        inner: entry.clone(),
1261                        meta: EntryMeta::new(),
1262                    })
1263                    .await
1264                    .unwrap();
1265            }
1266            drop(entry_tx);
1267            tracing::info!("all entries sent");
1268        });
1269
1270        let mut result = Vec::new();
1271        while let Some(chunk) = rx.recv().await {
1272            result.extend(chunk);
1273        }
1274
1275        let pack_size = result.len();
1276        let compression_rate = if total_original_size > 0 {
1277            1.0 - (pack_size as f64 / total_original_size as f64)
1278        } else {
1279            0.0
1280        };
1281
1282        let duration = start.elapsed();
1283        tracing::info!("test executed in: {:.2?}", duration);
1284        tracing::info!("new pack file size: {}", pack_size);
1285        tracing::info!("original total size: {}", total_original_size);
1286        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1287        tracing::info!(
1288            "space saved: {} bytes",
1289            total_original_size.saturating_sub(pack_size)
1290        );
1291
1292        // check format
1293        check_format(&result);
1294    }
1295    #[tokio::test]
1296    async fn test_pack_encoder_with_zstdelta_sha256() {
1297        let _guard = set_hash_kind_for_test(HashKind::Sha256);
1298        init_logger();
1299        let entries = get_entries_for_test_sha256().await;
1300        let entries_number = entries.lock().await.len();
1301
1302        let total_original_size: usize = entries
1303            .lock()
1304            .await
1305            .iter()
1306            .map(|entry| entry.data.len())
1307            .sum();
1308
1309        let start = Instant::now();
1310        let (tx, mut rx) = mpsc::channel(100_000);
1311        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1312
1313        let encoder = PackEncoder::new(entries_number, 10, tx);
1314        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();
1315
1316        // spawn a task to send entries
1317        tokio::spawn(async move {
1318            let entries = entries.lock().await;
1319            for entry in entries.iter() {
1320                entry_tx
1321                    .send(MetaAttached {
1322                        inner: entry.clone(),
1323                        meta: EntryMeta::new(),
1324                    })
1325                    .await
1326                    .unwrap();
1327            }
1328            drop(entry_tx);
1329            tracing::info!("all entries sent");
1330        });
1331
1332        let mut result = Vec::new();
1333        while let Some(chunk) = rx.recv().await {
1334            result.extend(chunk);
1335        }
1336
1337        let pack_size = result.len();
1338        let compression_rate = if total_original_size > 0 {
1339            1.0 - (pack_size as f64 / total_original_size as f64)
1340        } else {
1341            0.0
1342        };
1343
1344        let duration = start.elapsed();
1345        tracing::info!("test executed in: {:.2?}", duration);
1346        tracing::info!("new pack file size: {}", pack_size);
1347        tracing::info!("original total size: {}", total_original_size);
1348        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1349        tracing::info!(
1350            "space saved: {} bytes",
1351            total_original_size.saturating_sub(pack_size)
1352        );
1353
1354        // check format
1355        check_format(&result);
1356    }
1357
1358    #[test]
1359    fn test_encode_offset() {
1360        // let value = 11013;
1361        let value = 16389;
1362
1363        let data = encode_offset(value);
1364        println!("{data:?}");
1365        let mut reader = Cursor::new(data);
1366        let (result, _) = read_offset_encoding(&mut reader).unwrap();
1367        println!("result: {result}");
1368        assert_eq!(result, value as u64);
1369    }
1370
1371    #[tokio::test]
1372    async fn test_pack_encoder_large_file_with_delta() {
1373        let _guard = set_hash_kind_for_test(HashKind::Sha1);
1374        init_logger();
1375        let entries = get_entries_for_test().await;
1376        let entries_number = entries.lock().await.len();
1377
1378        let total_original_size: usize = entries
1379            .lock()
1380            .await
1381            .iter()
1382            .map(|entry| entry.data.len())
1383            .sum();
1384
1385        let (tx, mut rx) = mpsc::channel(100_000);
1386        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1387
1388        let encoder = PackEncoder::new(entries_number, 10, tx);
1389
1390        let start = Instant::now(); // 开始时间
1391        encoder.encode_async(entry_rx).await.unwrap();
1392
1393        // spawn a task to send entries
1394        tokio::spawn(async move {
1395            let entries = entries.lock().await;
1396            for entry in entries.iter() {
1397                entry_tx
1398                    .send(MetaAttached {
1399                        inner: entry.clone(),
1400                        meta: EntryMeta::new(),
1401                    })
1402                    .await
1403                    .unwrap();
1404            }
1405            drop(entry_tx);
1406            tracing::info!("all entries sent");
1407        });
1408
1409        let mut result = Vec::new();
1410        while let Some(chunk) = rx.recv().await {
1411            result.extend(chunk);
1412        }
1413
1414        let pack_size = result.len();
1415        let compression_rate = if total_original_size > 0 {
1416            1.0 - (pack_size as f64 / total_original_size as f64)
1417        } else {
1418            0.0
1419        };
1420
1421        let duration = start.elapsed();
1422        tracing::info!("test executed in: {:.2?}", duration);
1423        tracing::info!("new pack file size: {}", pack_size);
1424        tracing::info!("original total size: {}", total_original_size);
1425        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1426        tracing::info!(
1427            "space saved: {} bytes",
1428            total_original_size.saturating_sub(pack_size)
1429        );
1430
1431        // check format
1432        check_format(&result);
1433    }
1434    #[tokio::test]
1435    async fn test_pack_encoder_large_file_with_delta_sha256() {
1436        let _guard = set_hash_kind_for_test(HashKind::Sha256);
1437        init_logger();
1438        let entries = get_entries_for_test_sha256().await;
1439        let entries_number = entries.lock().await.len();
1440
1441        let total_original_size: usize = entries
1442            .lock()
1443            .await
1444            .iter()
1445            .map(|entry| entry.data.len())
1446            .sum();
1447
1448        let (tx, mut rx) = mpsc::channel(100_000);
1449        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1450
1451        let encoder = PackEncoder::new(entries_number, 10, tx);
1452
1453        let start = Instant::now(); // 开始时间
1454        encoder.encode_async(entry_rx).await.unwrap();
1455
1456        // spawn a task to send entries
1457        tokio::spawn(async move {
1458            let entries = entries.lock().await;
1459            for entry in entries.iter() {
1460                entry_tx
1461                    .send(MetaAttached {
1462                        inner: entry.clone(),
1463                        meta: EntryMeta::new(),
1464                    })
1465                    .await
1466                    .unwrap();
1467            }
1468            drop(entry_tx);
1469            tracing::info!("all entries sent");
1470        });
1471
1472        let mut result = Vec::new();
1473        while let Some(chunk) = rx.recv().await {
1474            result.extend(chunk);
1475        }
1476
1477        let pack_size = result.len();
1478        let compression_rate = if total_original_size > 0 {
1479            1.0 - (pack_size as f64 / total_original_size as f64)
1480        } else {
1481            0.0
1482        };
1483
1484        let duration = start.elapsed();
1485        tracing::info!("test executed in: {:.2?}", duration);
1486        tracing::info!("new pack file size: {}", pack_size);
1487        tracing::info!("original total size: {}", total_original_size);
1488        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1489        tracing::info!(
1490            "space saved: {} bytes",
1491            total_original_size.saturating_sub(pack_size)
1492        );
1493
1494        // check format
1495        check_format(&result);
1496    }
1497
1498    #[tokio::test]
1499    async fn test_pack_encoder_output_to_files() {
1500        let _guard = set_hash_kind_for_test(HashKind::Sha1);
1501        init_logger();
1502        let entries = get_entries_for_test().await;
1503        let entries_number = entries.lock().await.len();
1504
1505        let total_original_size: usize = entries
1506            .lock()
1507            .await
1508            .iter()
1509            .map(|entry| entry.data.len())
1510            .sum();
1511
1512        let start = Instant::now();
1513
1514        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1515        // 自动创建临时目录,生命周期结束自动删除
1516        let dir = tempdir().unwrap();
1517        let path = dir.path();
1518
1519        // spawn a task to send entries
1520        tokio::spawn(async move {
1521            let entries = entries.lock().await;
1522            for entry in entries.iter() {
1523                entry_tx
1524                    .send(MetaAttached {
1525                        inner: entry.clone(),
1526                        meta: EntryMeta::new(),
1527                    })
1528                    .await
1529                    .unwrap();
1530            }
1531            drop(entry_tx);
1532            tracing::info!("all entries sent");
1533        });
1534
1535        encode_and_output_to_files(entry_rx, entries_number, path.to_path_buf(), 0)
1536            .await
1537            .unwrap();
1538
1539        // 验证临时目录下生成的 pack/idx 文件
1540        let mut pack_file = None;
1541        let mut idx_file = None;
1542        for entry in std::fs::read_dir(path).unwrap() {
1543            let entry = entry.unwrap();
1544            let file_name = entry.file_name();
1545            tracing::info!("file name: {:?}", file_name);
1546            let file_name = file_name.to_string_lossy();
1547            if file_name.ends_with(".pack") {
1548                pack_file = Some(entry.path());
1549            } else if file_name.ends_with(".idx") {
1550                idx_file = Some(entry.path());
1551            }
1552        }
1553        let pack_file = pack_file.expect("pack file not generated");
1554        let idx_file = idx_file.expect("idx file not generated");
1555        assert!(
1556            pack_file.metadata().unwrap().len() > 0,
1557            "pack file is empty"
1558        );
1559        assert!(idx_file.metadata().unwrap().len() > 0, "idx file is empty");
1560
1561        let duration = start.elapsed();
1562        tracing::info!("test executed in: {:.2?}", duration);
1563        tracing::info!("original total size: {}", total_original_size);
1564    }
1565
1566    #[tokio::test]
1567    async fn test_pack_encoder_output_to_files_with_delta() {
1568        let _guard = set_hash_kind_for_test(HashKind::Sha1);
1569        init_logger();
1570        let entries = get_entries_for_test().await;
1571        let entries_number = entries.lock().await.len();
1572
1573        let total_original_size: usize = entries
1574            .lock()
1575            .await
1576            .iter()
1577            .map(|entry| entry.data.len())
1578            .sum();
1579
1580        let start = Instant::now();
1581
1582        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1583        // 自动创建临时目录,生命周期结束自动删除
1584        let dir = tempdir().unwrap();
1585        let path = dir.path();
1586
1587        // spawn a task to send entries
1588        tokio::spawn(async move {
1589            let entries = entries.lock().await;
1590            for entry in entries.iter() {
1591                entry_tx
1592                    .send(MetaAttached {
1593                        inner: entry.clone(),
1594                        meta: EntryMeta::new(),
1595                    })
1596                    .await
1597                    .unwrap();
1598            }
1599            drop(entry_tx);
1600            tracing::info!("all entries sent");
1601        });
1602
1603        encode_and_output_to_files(entry_rx, entries_number, path.to_path_buf(), 10)
1604            .await
1605            .unwrap();
1606
1607        // 验证临时目录下生成的 pack/idx 文件
1608        let mut pack_file = None;
1609        let mut idx_file = None;
1610        for entry in std::fs::read_dir(path).unwrap() {
1611            let entry = entry.unwrap();
1612            let file_name = entry.file_name();
1613            tracing::info!("file name: {:?}", file_name);
1614            let file_name = file_name.to_string_lossy();
1615            if file_name.ends_with(".pack") {
1616                pack_file = Some(entry.path());
1617            } else if file_name.ends_with(".idx") {
1618                idx_file = Some(entry.path());
1619            }
1620        }
1621        let pack_file = pack_file.expect("pack file not generated");
1622        let idx_file = idx_file.expect("idx file not generated");
1623        assert!(
1624            pack_file.metadata().unwrap().len() > 0,
1625            "pack file is empty"
1626        );
1627        assert!(idx_file.metadata().unwrap().len() > 0, "idx file is empty");
1628
1629        let duration = start.elapsed();
1630        tracing::info!("test executed in: {:.2?}", duration);
1631        tracing::info!("original total size: {}", total_original_size);
1632    }
1633}