Skip to main content

git_internal/internal/pack/
encode.rs

1//! Pack encoder capable of building streamed `.pack`/`.idx` pairs with optional delta compression,
2//! windowing, and asynchronous writers.
3
4use std::{
5    cmp::Ordering,
6    collections::VecDeque,
7    hash::{Hash, Hasher},
8    io::Write,
9    path::{Path, PathBuf},
10};
11
12use ahash::AHasher;
13// use libc::ungetc;
14use chrono::Utc;
15use flate2::write::ZlibEncoder;
16use natord::compare;
17use rayon::prelude::*;
18//use tokio::io::AsyncWriteExt;
19use tokio::io::AsyncWriteExt as TokioAsyncWriteExt;
20use tokio::{fs::File, sync::mpsc, task::JoinHandle};
21
22//use std::io as stdio;
23use crate::delta;
24use crate::{
25    errors::GitError,
26    hash::ObjectHash,
27    internal::{
28        metadata::{EntryMeta, MetaAttached},
29        object::types::ObjectType,
30        pack::{entry::Entry, index_entry::IndexEntry, pack_index::IdxBuilder},
31    },
32    time_it,
33    utils::HashAlgorithm,
34    zstdelta,
35};
36
/// Maximum allowed delta-chain depth; deeper chains make object reconstruction expensive.
const MAX_CHAIN_LEN: usize = 50;
/// Minimum estimated compression rate required before an object is stored as a delta.
const MIN_DELTA_RATE: f64 = 0.5;
40
/// An encoder for generating pack files with delta objects.
pub struct PackEncoder {
    /// Expected total number of objects; written into the 12-byte pack header.
    object_number: usize,
    /// Count of entries received so far (used by the parallel encode path).
    process_index: usize,
    /// Delta window size; `0` disables delta compression entirely.
    window_size: usize,
    /// Channel carrying encoded pack bytes; taken (dropped) when encoding finishes
    /// so the downstream writer observes channel closure.
    pack_sender: Option<mpsc::Sender<Vec<u8>>>,
    /// Optional channel carrying `.idx` bytes; `None` when no idx output is wanted.
    idx_sender: Option<mpsc::Sender<Vec<u8>>>,
    /// Index entries collected while encoding; populated once the pack is done.
    idx_entries: Option<Vec<IndexEntry>>,
    inner_offset: usize,       // byte offset of the next entry within the pack stream
    inner_hash: HashAlgorithm, // running hash over all bytes sent (pluggable algorithm)
    /// Hash of the finished pack; `None` until encoding completes.
    final_hash: Option<ObjectHash>,
    /// Guard flag intended to prevent a second concurrent encode on this encoder.
    start_encoding: bool,
}
57
58/// Encode entries into a pack, write `.pack`/`.idx` files to `output_dir`.
59/// - Spawns background writers to consume pack/idx channels to avoid back-pressure.
60/// - Uses `window_size` to control delta: `0` means no delta (parallel encode), otherwise enable delta window.
61/// # Arguments
62/// * `raw_entries_rx` - receiver providing entries with metadata
63/// * `object_number` - expected total object count for the pack header
64/// * `output_dir` - target directory to place the generated files
65/// * `window_size` - delta window size; `0` disables delta
66/// # Returns
67/// * `Ok(())` on success, `GitError` on failure
68pub async fn encode_and_output_to_files(
69    raw_entries_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
70    object_number: usize,
71    output_dir: PathBuf,
72    window_size: usize,
73) -> Result<(), GitError> {
74    let (pack_tx, mut pack_rx) = mpsc::channel(1024);
75    let (idx_tx, mut idx_rx) = mpsc::channel(1024);
76    let mut pack_encoder = PackEncoder::new_with_idx(object_number, window_size, pack_tx, idx_tx);
77
78    // timestamp for temp filename
79    let now = Utc::now();
80    let timestamp = now.format("%Y%m%d%H%M%S%.3f").to_string(); // 例如 20251209235959.123
81    let tmp_path = output_dir.join(format!("{}objects.pack.tmp", timestamp));
82    let mut pack_file = File::create(&tmp_path).await?;
83
84    let pack_writer = tokio::spawn(async move {
85        while let Some(chunk) = pack_rx.recv().await {
86            TokioAsyncWriteExt::write_all(&mut pack_file, &chunk).await?;
87        }
88        //pack_file.flush().await?;
89        TokioAsyncWriteExt::flush(&mut pack_file).await?;
90        Ok::<(), GitError>(())
91    });
92
93    pack_encoder.encode(raw_entries_rx).await?;
94
95    // 等待 pack 写入完成
96    let pack_write_result = pack_writer
97        .await
98        .map_err(|e| GitError::PackEncodeError(format!("pack writer task join error: {e}")))?;
99    pack_write_result?;
100
101    let final_pack_name =
102        output_dir.join(format!("pack-{}.pack", pack_encoder.final_hash.unwrap()));
103    let final_idx_name = output_dir.join(format!("pack-{}.idx", pack_encoder.final_hash.unwrap()));
104    tokio::fs::rename(tmp_path, &final_pack_name).await?;
105
106    let mut idx_file = File::create(&final_idx_name).await?;
107    let idx_writer = tokio::spawn(async move {
108        while let Some(chunk) = idx_rx.recv().await {
109            //idx_file.write_all(&chunk).await?;
110            TokioAsyncWriteExt::write_all(&mut idx_file, &chunk).await?;
111        }
112        //idx_file.flush().await?;
113        TokioAsyncWriteExt::flush(&mut idx_file).await?;
114        Ok::<(), GitError>(())
115    });
116
117    //build idx
118    pack_encoder.encode_idx_file().await?;
119
120    let idx_write_result = idx_writer
121        .await
122        .map_err(|e| GitError::PackEncodeError(format!("idx writer task join error: {e}")))?;
123    idx_write_result?;
124
125    Ok(())
126}
127
/// Encode the 12-byte pack file header.
///
/// Layout: 4-byte magic `"PACK"`, 4-byte big-endian version (always 2), and
/// the 4-byte big-endian object count.
///
/// # Panics
/// Panics if `object_number` is zero or does not fit in the 32-bit count field.
fn encode_header(object_number: usize) -> Vec<u8> {
    let mut result: Vec<u8> = vec![
        b'P', b'A', b'C', b'K', // pack file signature
        0, 0, 0, 2, // this encoder generates version 2 only
    ];
    assert_ne!(object_number, 0, "a pack must contain at least one object");
    assert!(
        object_number <= u32::MAX as usize,
        "object count must fit in the 32-bit header field"
    );
    // Network byte order (big-endian); extend_from_slice avoids the
    // intermediate Vec the old append(to_vec().as_mut()) form allocated.
    result.extend_from_slice(&(object_number as u32).to_be_bytes());
    result
}
141
/// Encode a delta base offset using git's variable-length, big-endian scheme.
///
/// Each byte carries 7 payload bits; every byte except the last has its high
/// bit set. Leading groups are biased by one (git's "minus one" trick) so that
/// multi-byte encodings have no redundant representations.
fn encode_offset(mut value: usize) -> Vec<u8> {
    assert_ne!(value, 0, "offset can't be zero");

    // Least-significant 7 bits go last on the wire; high bit clear marks the end.
    let mut out = vec![(value & 0x7F) as u8];
    value >>= 7;
    while value > 0 {
        // Bias by one before emitting the next more-significant 7-bit group.
        value -= 1;
        out.insert(0, ((value & 0x7F) as u8) | 0x80);
        value >>= 7;
    }
    out
}
158
/// Serialize one entry into its on-disk pack form: the type/size varint
/// header, an optional base-offset field (delta objects only), then the
/// zlib-compressed payload.
///
/// `offset` is the backward distance to the delta base; it must be `Some` for
/// offset-delta entries and is ignored for all other object types.
///
/// # Errors
/// Returns `GitError` if the entry's object type has no pack-type encoding.
fn encode_one_object(entry: &Entry, offset: Option<usize>) -> Result<Vec<u8>, GitError> {
    let obj_data = &entry.data;
    let obj_data_len = obj_data.len();
    let obj_type_number = entry.obj_type.to_pack_type_u8()?;

    let mut encoded_data = Vec::new();

    // **header**: first byte = continuation bit | 3-bit type | low 4 bits of size.
    // NOTE(review): the continuation bit (0x80) is set unconditionally and a
    // trailing 0x00 is appended when the size fits in 4 bits — decodable, but
    // not the minimal canonical encoding; confirm downstream decoders accept it.
    let mut header_data = vec![(0x80 | (obj_type_number << 4)) + (obj_data_len & 0x0f) as u8];
    let mut size = obj_data_len >> 4; // low 4 size bits already consumed by the first byte
    if size > 0 {
        while size > 0 {
            if size >> 7 > 0 {
                header_data.push((0x80 | size) as u8); // more size bytes follow
                size >>= 7;
            } else {
                header_data.push(size as u8); // final size byte: high bit clear
                break;
            }
        }
    } else {
        header_data.push(0); // terminates the sequence opened by the forced 0x80
    }
    encoded_data.extend(header_data);

    // **offset**: only the offset-delta variants carry a base-offset field.
    if entry.obj_type == ObjectType::OffsetDelta || entry.obj_type == ObjectType::OffsetZstdelta {
        let offset_data = encode_offset(offset.unwrap());
        encoded_data.extend(offset_data);
    } else if entry.obj_type == ObjectType::HashDelta {
        unreachable!("unsupported type")
    }

    // **data**: payload is always zlib-compressed, even for delta payloads.
    let mut inflate = ZlibEncoder::new(Vec::new(), flate2::Compression::default());
    inflate
        .write_all(obj_data)
        .expect("zlib compress should never failed");
    inflate.flush().expect("zlib flush should never failed");
    let compressed_data = inflate.finish().expect("zlib compress should never failed");
    encoded_data.extend(compressed_data);
    Ok(encoded_data)
}
206
/// Ordering used to line up entries before delta matching, mimicking git's
/// pack heuristics: group by directory, natural-sort filenames within a
/// directory, then place larger objects first so deltas are computed against
/// the bigger base.
fn magic_sort(a: &MetaAttached<Entry, EntryMeta>, b: &MetaAttached<Entry, EntryMeta>) -> Ordering {
    let path_a = a.meta.file_path.as_ref();
    let path_b = b.meta.file_path.as_ref();

    // 1. Handle path existence: entries with paths sort first.
    match (path_a, path_b) {
        (Some(pa), Some(pb)) => {
            let pa = Path::new(pa);
            let pb = Path::new(pb);

            // 1a. Compare parent directory paths so siblings cluster together.
            let dir_ord = pa.parent().cmp(&pb.parent());
            if dir_ord != Ordering::Equal {
                return dir_ord;
            }

            // 1b. Compare filenames with natural ordering ("a2" < "a10").
            let name_a = pa.file_name().unwrap_or_default().to_string_lossy();
            let name_b = pb.file_name().unwrap_or_default().to_string_lossy();
            let name_ord = compare(&name_a, &name_b);
            if name_ord != Ordering::Equal {
                return name_ord;
            }
        }
        (Some(_), None) => return Ordering::Less, // entries with paths sort first
        (None, Some(_)) => return Ordering::Greater, // entries without paths sort last
        (None, None) => {}
    }

    // 2. Larger payloads first (operands deliberately reversed).
    let ord = b.inner.data.len().cmp(&a.inner.data.len());
    if ord != Ordering::Equal {
        return ord;
    }

    // 3. Fallback: compare element addresses.
    // NOTE(review): address order reflects where the elements happen to live
    // in the Vec being sorted, not any stable property of the entries; the
    // "newest first" intent should be confirmed against the caller's
    // allocation pattern — this tie-breaker looks fragile.
    (a as *const MetaAttached<Entry, EntryMeta>).cmp(&(b as *const MetaAttached<Entry, EntryMeta>))
}
245
246/// Calculate hash of data
247fn calc_hash(data: &[u8]) -> u64 {
248    let mut hasher = AHasher::default();
249    data.hash(&mut hasher);
250    hasher.finish()
251}
252
253/// Cheap check if two byte slices are similar by comparing their hashes of the first 128 bytes.
254fn cheap_similar(a: &[u8], b: &[u8]) -> bool {
255    let k = a.len().min(b.len()).min(128);
256    if k == 0 {
257        return false;
258    }
259    calc_hash(&a[..k]) == calc_hash(&b[..k])
260}
261
262impl PackEncoder {
263    pub fn new(object_number: usize, window_size: usize, sender: mpsc::Sender<Vec<u8>>) -> Self {
264        PackEncoder {
265            object_number,
266            window_size,
267            process_index: 0,
268            // window: VecDeque::with_capacity(window_size),
269            pack_sender: Some(sender),
270            idx_sender: None,
271            idx_entries: None,
272            inner_offset: 12, // start  after 12 bytes pack header(signature + version + object count).
273            inner_hash: HashAlgorithm::new(), // introduce different hash algorithm
274            final_hash: None,
275            start_encoding: false,
276        }
277    }
278
279    pub fn new_with_idx(
280        object_number: usize,
281        window_size: usize,
282        pack_sender: mpsc::Sender<Vec<u8>>,
283        idx_sender: mpsc::Sender<Vec<u8>>,
284    ) -> Self {
285        PackEncoder {
286            //path: Some(path),
287            object_number,
288            window_size,
289            process_index: 0,
290            // window: VecDeque::with_capacity(window_size),
291            pack_sender: Some(pack_sender),
292            idx_sender: Some(idx_sender),
293            idx_entries: None,
294            inner_offset: 12, // start  after 12 bytes pack header(signature + version + object count).
295            inner_hash: HashAlgorithm::new(), // introduce different hash algorithm
296            final_hash: None,
297            start_encoding: false,
298        }
299    }
300
301    pub fn drop_sender(&mut self) {
302        self.pack_sender.take(); // Take the sender out, dropping it
303    }
304
305    pub async fn send_data(&mut self, data: Vec<u8>) {
306        if let Some(sender) = &self.pack_sender {
307            sender.send(data).await.unwrap();
308        }
309    }
310
    /// Hash of the finished pack file; returns `None` while encoding is still
    /// in progress (it is set only after the trailing signature is computed).
    pub fn get_hash(&self) -> Option<ObjectHash> {
        self.final_hash
    }
315
316    /// Encodes entries into a pack file with delta objects and outputs them through the specified writer.
317    /// # Arguments
318    /// - `rx` - A receiver channel (`mpsc::Receiver<Entry>`) from which entries to be encoded are received.
319    /// # Returns
320    /// Returns `Ok(())` if encoding is successful, or a `GitError` in case of failure.
321    /// - Returns a `GitError` if there is a failure during the encoding process.
322    /// - Returns `PackEncodeError` if an encoding operation is already in progress.
323    pub async fn encode(
324        &mut self,
325        entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
326    ) -> Result<(), GitError> {
327        //self.inner_encode(entry_rx, false).await
328        if self.window_size == 0 {
329            self.parallel_encode(entry_rx).await
330        } else {
331            self.inner_encode(entry_rx, false).await
332        }
333    }
334
    /// Encode using zstd-based deltas (`OffsetZstdelta`) instead of classic
    /// git deltas; otherwise identical to the delta-window path of `encode`.
    pub async fn encode_with_zstdelta(
        &mut self,
        entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
    ) -> Result<(), GitError> {
        self.inner_encode(entry_rx, true).await
    }
342
343    /// Delta selection heuristics are based on:
344    ///   https://github.com/git/git/blob/master/Documentation/technical/pack-heuristics.adoc
345    async fn inner_encode(
346        &mut self,
347        mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
348        enable_zstdelta: bool,
349    ) -> Result<(), GitError> {
350        let head = encode_header(self.object_number);
351        self.send_data(head.clone()).await;
352        self.inner_hash.update(&head);
353
354        // ensure only one decode can only invoke once
355        if self.start_encoding {
356            return Err(GitError::PackEncodeError(
357                "encoding operation is already in progress".to_string(),
358            ));
359        }
360
361        let mut commits: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
362        let mut trees: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
363        let mut blobs: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
364        let mut tags: Vec<MetaAttached<Entry, EntryMeta>> = Vec::new();
365        while let Some(entry) = entry_rx.recv().await {
366            match entry.inner.obj_type {
367                ObjectType::Commit => {
368                    commits.push(entry);
369                }
370                ObjectType::Tree => {
371                    trees.push(entry);
372                }
373                ObjectType::Blob => {
374                    blobs.push(entry);
375                }
376                ObjectType::Tag => {
377                    tags.push(entry);
378                }
379                _ => {
380                    return Err(GitError::PackEncodeError(format!(
381                        "object type `{}` is not supported by delta-window pack encoding",
382                        entry.inner.obj_type
383                    )));
384                }
385            }
386        }
387
388        commits.sort_by(magic_sort);
389        trees.sort_by(magic_sort);
390        blobs.sort_by(magic_sort);
391        tags.sort_by(magic_sort);
392        tracing::info!(
393            "numbers :  commits: {:?} trees: {:?} blobs:{:?} tag :{:?}",
394            commits.len(),
395            trees.len(),
396            blobs.len(),
397            tags.len()
398        );
399
400        // parallel encoding vec with different object_type
401        let (commit_results, tree_results, blob_results, tag_results) = tokio::try_join!(
402            tokio::task::spawn_blocking(move || {
403                Self::try_as_offset_delta(
404                    commits
405                        .into_iter()
406                        .map(|entry_with_meta| entry_with_meta.inner)
407                        .collect(),
408                    10,
409                    enable_zstdelta,
410                )
411            }),
412            tokio::task::spawn_blocking(move || {
413                Self::try_as_offset_delta(
414                    trees
415                        .into_iter()
416                        .map(|entry_with_meta| entry_with_meta.inner)
417                        .collect(),
418                    10,
419                    enable_zstdelta,
420                )
421            }),
422            tokio::task::spawn_blocking(move || {
423                Self::try_as_offset_delta(
424                    blobs
425                        .into_iter()
426                        .map(|entry_with_meta| entry_with_meta.inner)
427                        .collect(),
428                    10,
429                    enable_zstdelta,
430                )
431            }),
432            tokio::task::spawn_blocking(move || {
433                Self::try_as_offset_delta(
434                    tags.into_iter()
435                        .map(|entry_with_meta| entry_with_meta.inner)
436                        .collect(),
437                    10,
438                    enable_zstdelta,
439                )
440            }),
441        )
442        .map_err(|e| GitError::PackEncodeError(format!("Task join error: {e}")))?;
443
444        let commit_res = commit_results?;
445        let tree_res = tree_results?;
446        let blob_res = blob_results?;
447        let tag_res = tag_results?;
448
449        let mut all_res = vec![commit_res, tree_res, blob_res, tag_res];
450
451        let mut idx_entries = Vec::new();
452        for res in &mut all_res {
453            for data in res {
454                data.1.offset = self.inner_offset as u64;
455                self.write_all_and_update(&data.0).await;
456                idx_entries.push(data.1.clone());
457            }
458        }
459
460        self.idx_entries = Some(idx_entries);
461
462        // Hash signature
463        let hash_result = self.inner_hash.clone().finalize();
464        self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
465        self.send_data(hash_result.to_vec()).await;
466
467        self.drop_sender();
468        Ok(())
469    }
470
471    /// Try to encode as delta using objects in window
472    /// delta & zstdelta have been gathered here
473    /// Refs: https://sapling-scm.com/docs/dev/internals/zstdelta/
474    /// the sliding window was moved here
475    /// # Returns
476    /// - Return (Vec<Vec<u8>) if success make delta
477    /// - Return (None) if didn't delta,
478    fn try_as_offset_delta(
479        mut bucket: Vec<Entry>,
480        window_size: usize,
481        enable_zstdelta: bool,
482    ) -> Result<Vec<(Vec<u8>, IndexEntry)>, GitError> {
483        let mut current_offset = 0usize;
484        let mut window: VecDeque<(Entry, usize)> = VecDeque::with_capacity(window_size);
485        let mut res: Vec<(Vec<u8>, IndexEntry)> = Vec::new();
486        //let mut idx_entries: Vec<IndexEntry> = Vec::new();
487
488        for entry in bucket.iter_mut() {
489            //let entry_for_window = entry.clone();
490            // 每次循环重置最佳基对象选择
491            let mut best_base: Option<&(Entry, usize)> = None;
492            let mut best_rate: f64 = 0.0;
493            let tie_epsilon: f64 = 0.15;
494
495            let candidates: Vec<_> = window
496                .par_iter()
497                .with_min_len(3)
498                .filter_map(|try_base| {
499                    if try_base.0.obj_type != entry.obj_type {
500                        return None;
501                    }
502
503                    if try_base.0.chain_len >= MAX_CHAIN_LEN {
504                        return None;
505                    }
506
507                    if try_base.0.hash == entry.hash {
508                        return None;
509                    }
510
511                    let sym_ratio = (try_base.0.data.len().min(entry.data.len()) as f64)
512                        / (try_base.0.data.len().max(entry.data.len()) as f64);
513                    if sym_ratio < 0.5 {
514                        return None;
515                    }
516
517                    if !cheap_similar(&try_base.0.data, &entry.data) {
518                        return None;
519                    }
520
521                    let rate = if (try_base.0.data.len() + entry.data.len()) / 2 > 64 {
522                        delta::heuristic_encode_rate_parallel(&try_base.0.data, &entry.data)
523                    } else {
524                        delta::encode_rate(&try_base.0.data, &entry.data)
525                        // let try_delta_obj = zstdelta::diff(&try_base.0.data, &entry.data).unwrap();
526                        // 1.0 - try_delta_obj.len() as f64 / entry.data.len() as f64
527                    };
528
529                    if rate > MIN_DELTA_RATE {
530                        Some((rate, try_base))
531                    } else {
532                        None
533                    }
534                })
535                .collect();
536
537            for (rate, try_base) in candidates {
538                match best_base {
539                    None => {
540                        best_rate = rate;
541                        //best_base_offset = current_offset - try_base.1;
542                        best_base = Some(try_base);
543                    }
544                    Some(best_base_ref) => {
545                        let is_better = if rate > best_rate + tie_epsilon {
546                            true
547                        } else if (rate - best_rate).abs() <= tie_epsilon {
548                            try_base.0.chain_len > best_base_ref.0.chain_len
549                        } else {
550                            false
551                        };
552
553                        if is_better {
554                            best_rate = rate;
555                            best_base = Some(try_base);
556                        }
557                    }
558                }
559            }
560
561            let mut entry_for_window = entry.clone();
562
563            let offset = best_base.map(|best_base| {
564                let delta = if enable_zstdelta {
565                    entry.obj_type = ObjectType::OffsetZstdelta;
566                    zstdelta::diff(&best_base.0.data, &entry.data)
567                        .map_err(|e| {
568                            GitError::DeltaObjectError(format!("zstdelta diff failed: {e}"))
569                        })
570                        .unwrap()
571                } else {
572                    entry.obj_type = ObjectType::OffsetDelta;
573                    delta::encode(&best_base.0.data, &entry.data)
574                };
575                //entry.obj_type = ObjectType::OffsetDelta;
576                entry.data = delta;
577                entry.chain_len = best_base.0.chain_len + 1;
578                current_offset - best_base.1
579            });
580
581            entry_for_window.chain_len = entry.chain_len;
582            let obj_data = encode_one_object(entry, offset)?;
583            window.push_back((entry_for_window, current_offset));
584            if window.len() > window_size {
585                window.pop_front();
586            }
587            res.push((obj_data.clone(), IndexEntry::new(entry, 0)));
588            current_offset += obj_data.len();
589        }
590        Ok(res)
591    }
592
593    /// Parallel encode with rayon, only works when window_size == 0 (no delta)
594    pub async fn parallel_encode(
595        &mut self,
596        mut entry_rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
597    ) -> Result<(), GitError> {
598        if self.window_size != 0 {
599            return Err(GitError::PackEncodeError(
600                "parallel encode only works when window_size == 0".to_string(),
601            ));
602        }
603
604        let head = encode_header(self.object_number);
605        self.send_data(head.clone()).await;
606        self.inner_hash.update(&head);
607
608        // ensure only one decode can only invoke once
609        if self.start_encoding {
610            return Err(GitError::PackEncodeError(
611                "encoding operation is already in progress".to_string(),
612            ));
613        }
614
615        let mut idx_entries = Vec::new();
616        let batch_size = usize::max(1000, entry_rx.max_capacity() / 10); // A temporary value, not optimized
617        tracing::info!("encode with batch size: {}", batch_size);
618        loop {
619            let mut batch_entries = Vec::with_capacity(batch_size);
620            time_it!("parallel encode: receive batch", {
621                for _ in 0..batch_size {
622                    match entry_rx.recv().await {
623                        Some(entry) => {
624                            if entry.inner.obj_type.is_ai_object() {
625                                return Err(GitError::PackEncodeError(format!(
626                                    "AI object type `{}` cannot be encoded in a pack file",
627                                    entry.inner.obj_type
628                                )));
629                            }
630                            batch_entries.push(entry.inner);
631                            self.process_index += 1;
632                        }
633                        None => break,
634                    }
635                }
636            });
637
638            if batch_entries.is_empty() {
639                break;
640            }
641
642            // use `collect` will return result in order, refs: https://github.com/rayon-rs/rayon/issues/551#issuecomment-371657900
643            let batch_result: Vec<Result<(Vec<u8>, IndexEntry), GitError>> =
644                time_it!("parallel encode: encode batch", {
645                    batch_entries
646                        .par_iter()
647                        .map(|entry| {
648                            encode_one_object(entry, None)
649                                .map(|encoded| (encoded, IndexEntry::new(entry, 0)))
650                        })
651                        .collect()
652                });
653
654            time_it!("parallel encode: write batch", {
655                for obj_data in batch_result {
656                    let mut obj_data = obj_data?;
657                    obj_data.1.offset = self.inner_offset as u64;
658                    self.write_all_and_update(&obj_data.0).await;
659                    idx_entries.push(obj_data.1);
660                }
661            });
662        }
663
664        tracing::debug!("parallel encode idx entries: {:?}", idx_entries.len());
665        if self.process_index != self.object_number {
666            panic!(
667                "not all objects are encoded, process:{}, total:{}",
668                self.process_index, self.object_number
669            );
670        }
671
672        // hash signature
673        let hash_result = self.inner_hash.clone().finalize();
674        self.final_hash = Some(ObjectHash::from_bytes(&hash_result).unwrap());
675        self.send_data(hash_result.to_vec()).await;
676        self.drop_sender();
677
678        self.idx_entries = Some(idx_entries);
679        Ok(())
680    }
681
682    /// Write data to writer and update hash & offset
683    async fn write_all_and_update(&mut self, data: &[u8]) {
684        self.inner_hash.update(data);
685        self.inner_offset += data.len();
686        self.send_data(data.to_vec()).await;
687    }
688
689    async fn generate_idx_file(&mut self) -> Result<(), GitError> {
690        let final_hash = self.final_hash
691            .ok_or(GitError::PackEncodeError("final_hash is missing,The pack file must be generated before the index file is produced.".into()))?;
692        let idx_entries = self.idx_entries.clone().ok_or(GitError::PackEncodeError(
693            "The pack file must be generated before the index file is produced.".into(),
694        ))?;
695        let mut idx_builder = IdxBuilder::new(
696            self.object_number,
697            self.idx_sender.clone().unwrap(),
698            final_hash,
699        );
700        idx_builder.write_idx(idx_entries).await?;
701        Ok(())
702    }
703
704    /// async version of encode, result data will be returned by JoinHandle.
705    /// It will consume PackEncoder, so you can't use it after calling this function.
706    /// when window_size = 0, it executes parallel_encode which retains stream transmission
707    /// when window_size = 0,it executes encode which uses magic sort and delta.
708    /// It seems that all other modules rely on this api
709    pub async fn encode_async(
710        mut self,
711        rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
712    ) -> Result<JoinHandle<()>, GitError> {
713        Ok(tokio::spawn(async move {
714            if self.window_size == 0 {
715                self.parallel_encode(rx).await.unwrap()
716            } else {
717                self.encode(rx).await.unwrap()
718            }
719        }))
720    }
721
722    /// async version of encode_with_zstdelta, result data will be returned by JoinHandle.
723    pub async fn encode_async_with_zstdelta(
724        mut self,
725        rx: mpsc::Receiver<MetaAttached<Entry, EntryMeta>>,
726    ) -> Result<JoinHandle<()>, GitError> {
727        Ok(tokio::spawn(async move {
728            // Do not use parallel encode with zstdelta because it make no sense.
729            self.encode_with_zstdelta(rx).await.unwrap()
730        }))
731    }
732
733    /// Generate idx file after pack file has been generated
734    pub async fn encode_idx_file(&mut self) -> Result<(), GitError> {
735        if self.idx_sender.is_none() {
736            return Err(GitError::PackEncodeError(String::from(
737                "idx sender is none",
738            )));
739        }
740        self.generate_idx_file().await?;
741        // drop sender so downstream consumer can finish
742        self.idx_sender.take();
743        Ok(())
744    }
745}
746
747#[cfg(test)]
748mod tests {
749    use std::{env, io::Cursor, path::PathBuf, sync::Arc, time::Instant};
750
751    use tempfile::tempdir;
752    use tokio::sync::Mutex;
753
754    use super::*;
755    use crate::{
756        hash::{HashKind, ObjectHash, set_hash_kind_for_test},
757        internal::{
758            object::{blob::Blob, types::ObjectType},
759            pack::{Pack, tests::init_logger, utils::read_offset_encoding},
760        },
761        time_it,
762    };
763
    /// Check if the given data is a valid pack file format by attempting to decode it.
    /// Panics (via `expect`) on decode failure, which fails the enclosing test.
    fn check_format(data: &Vec<u8>) {
        // Use a smaller cap on 32-bit targets to avoid usize overflow.
        let max_pack_size_u64 = if cfg!(target_pointer_width = "64") {
            6u64 * 1024 * 1024 * 1024
        } else {
            2u64 * 1024 * 1024 * 1024
        };
        let max_pack_size = usize::try_from(max_pack_size_u64).unwrap_or_else(|_| {
            panic!(
                "internal assertion failed: pack size cap {} does not fit in usize on this \
                 target; this should be unreachable given the target_pointer_width configuration",
                max_pack_size_u64
            )
        });
        // Decoder configured with a temp cache dir; `true` flag semantics are
        // defined by `Pack::new` — see that constructor.
        let mut p = Pack::new(
            None,
            Some(max_pack_size), // 6GB on 64-bit, 2GB on 32-bit
            Some(PathBuf::from("/tmp/.cache_temp")),
            true,
        );
        let mut reader = Cursor::new(data);
        tracing::debug!("start check format");
        p.decode(&mut reader, |_| {}, None::<fn(ObjectHash)>)
            .expect("pack file format error");
    }
790
    /// End-to-end encode test: encodes the same blobs with and without a delta
    /// window, checks both outputs decode cleanly, and that the delta pack is
    /// no larger than the plain one.
    #[tokio::test]
    async fn test_pack_encoder() {
        let _guard = set_hash_kind_for_test(HashKind::Sha1);
        // Encode a fixed set of blobs with the given window size and collect
        // the full pack byte stream from the channel.
        async fn encode_once(window_size: usize) -> Vec<u8> {
            let (tx, mut rx) = mpsc::channel(100);
            let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);

            // Make some different objects, or decode will fail.
            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
            encoder.encode_async(entry_rx).await.unwrap();

            for str in str_vec {
                let blob = Blob::from_content(str);
                let entry: Entry = blob.into();
                entry_tx
                    .send(MetaAttached {
                        inner: entry,
                        meta: EntryMeta::new(),
                    })
                    .await
                    .unwrap();
            }
            // Closing the entry channel lets the encoder finish.
            drop(entry_tx);
            let mut result = Vec::new();
            while let Some(chunk) = rx.recv().await {
                result.extend(chunk);
            }
            result
        }

        // Without delta (window_size = 0 → parallel path).
        let pack_without_delta = encode_once(0).await;
        let pack_without_delta_size = pack_without_delta.len();
        check_format(&pack_without_delta);

        // With delta: should never be larger than the plain encoding.
        let pack_with_delta = encode_once(4).await;
        assert!(pack_with_delta.len() <= pack_without_delta_size);
        check_format(&pack_with_delta);
    }
833    #[tokio::test]
834    async fn test_pack_encoder_sha256() {
835        let _guard = set_hash_kind_for_test(HashKind::Sha256);
836
837        async fn encode_once(window_size: usize) -> Vec<u8> {
838            let (tx, mut rx) = mpsc::channel(100);
839            let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);
840
841            let str_vec = vec!["hello, word", "hello, world.", "!", "123141251251"];
842            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
843            encoder.encode_async(entry_rx).await.unwrap();
844
845            for s in str_vec {
846                let blob = Blob::from_content(s);
847                let entry: Entry = blob.into();
848                entry_tx
849                    .send(MetaAttached {
850                        inner: entry,
851                        meta: EntryMeta::new(),
852                    })
853                    .await
854                    .unwrap();
855            }
856            drop(entry_tx);
857
858            let mut result = Vec::new();
859            while let Some(chunk) = rx.recv().await {
860                result.extend(chunk);
861            }
862            result
863        }
864
865        // without delta
866        let pack_without_delta = encode_once(0).await;
867        let pack_without_delta_size = pack_without_delta.len();
868        check_format(&pack_without_delta);
869
870        // with delta
871        let pack_with_delta = encode_once(4).await;
872        assert!(pack_with_delta.len() <= pack_without_delta_size);
873        check_format(&pack_with_delta);
874    }
875
876    #[tokio::test]
877    async fn test_pack_encoder_rejects_unencodable_ai_type_parallel() {
878        let (tx, _rx) = mpsc::channel(8);
879        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);
880        let mut encoder = PackEncoder::new(1, 0, tx);
881
882        let mut entry: Entry = Blob::from_content("ai").into();
883        entry.obj_type = ObjectType::Task;
884        entry_tx
885            .send(MetaAttached {
886                inner: entry,
887                meta: EntryMeta::new(),
888            })
889            .await
890            .expect("send entry");
891        drop(entry_tx);
892
893        let err = encoder
894            .encode(entry_rx)
895            .await
896            .expect_err("must reject AI pack type");
897        assert!(matches!(err, GitError::PackEncodeError(_)));
898    }
899
900    #[tokio::test]
901    async fn test_pack_encoder_rejects_unencodable_ai_type_delta_window() {
902        let (tx, _rx) = mpsc::channel(8);
903        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1);
904        let mut encoder = PackEncoder::new(1, 10, tx);
905
906        let mut entry: Entry = Blob::from_content("ai").into();
907        entry.obj_type = ObjectType::Task;
908        entry_tx
909            .send(MetaAttached {
910                inner: entry,
911                meta: EntryMeta::new(),
912            })
913            .await
914            .expect("send entry");
915        drop(entry_tx);
916
917        let err = encoder
918            .encode(entry_rx)
919            .await
920            .expect_err("must reject AI pack type");
921        assert!(matches!(err, GitError::PackEncodeError(_)));
922    }
923
924    async fn get_entries_for_test() -> Arc<Mutex<Vec<Entry>>> {
925        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
926            .join("tests/data/packs/encode-test-sha1.pack");
927
928        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);
929
930        let f = std::fs::File::open(&source).unwrap();
931        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
932        let mut reader = std::io::BufReader::new(f);
933        let entries = Arc::new(Mutex::new(Vec::new()));
934        let entries_clone = entries.clone();
935        p.decode(
936            &mut reader,
937            move |entry| {
938                let mut entries = entries_clone.blocking_lock();
939                entries.push(entry.inner);
940            },
941            None::<fn(ObjectHash)>,
942        )
943        .unwrap();
944        assert_eq!(p.number, entries.lock().await.len());
945        tracing::info!("total entries: {}", p.number);
946        drop(p);
947
948        entries
949    }
950    async fn get_entries_for_test_sha256() -> Arc<Mutex<Vec<Entry>>> {
951        let source = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
952            .join("tests/data/packs/encode-test-sha256.pack");
953
954        let mut p = Pack::new(None, None, Some(PathBuf::from("/tmp/.cache_temp")), true);
955
956        let f = std::fs::File::open(&source).unwrap();
957        tracing::info!("pack file size: {}", f.metadata().unwrap().len());
958        let mut reader = std::io::BufReader::new(f);
959        let entries = Arc::new(Mutex::new(Vec::new()));
960        let entries_clone = entries.clone();
961        p.decode(
962            &mut reader,
963            move |entry| {
964                let mut entries = entries_clone.blocking_lock();
965                entries.push(entry.inner);
966            },
967            None::<fn(ObjectHash)>,
968        )
969        .unwrap();
970        assert_eq!(p.number, entries.lock().await.len());
971        tracing::info!("total entries: {}", p.number);
972        drop(p);
973
974        entries
975    }
976
977    #[tokio::test]
978    async fn test_pack_encoder_parallel_large_file() {
979        let _guard = set_hash_kind_for_test(HashKind::Sha1);
980        init_logger();
981
982        let start = Instant::now();
983        let entries = get_entries_for_test().await;
984        let entries_number = entries.lock().await.len();
985
986        let total_original_size: usize = entries
987            .lock()
988            .await
989            .iter()
990            .map(|entry| entry.data.len())
991            .sum();
992
993        // encode entries with parallel
994        let (tx, mut rx) = mpsc::channel(1_000_000);
995        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);
996
997        let mut encoder = PackEncoder::new(entries_number, 0, tx);
998        tokio::spawn(async move {
999            time_it!("test parallel encode", {
1000                encoder.parallel_encode(entry_rx).await.unwrap();
1001            });
1002        });
1003
1004        // spawn a task to send entries
1005        tokio::spawn(async move {
1006            let entries = entries.lock().await;
1007            for entry in entries.iter() {
1008                entry_tx
1009                    .send(MetaAttached {
1010                        inner: entry.clone(),
1011                        meta: EntryMeta::new(),
1012                    })
1013                    .await
1014                    .unwrap();
1015            }
1016            drop(entry_tx);
1017            tracing::info!("all entries sent");
1018        });
1019
1020        let mut result = Vec::new();
1021        while let Some(chunk) = rx.recv().await {
1022            result.extend(chunk);
1023        }
1024
1025        let pack_size = result.len();
1026        let compression_rate = if total_original_size > 0 {
1027            1.0 - (pack_size as f64 / total_original_size as f64)
1028        } else {
1029            0.0
1030        };
1031
1032        let duration = start.elapsed();
1033        tracing::info!("test executed in: {:.2?}", duration);
1034        tracing::info!("new pack file size: {}", result.len());
1035        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1036        // check format
1037        check_format(&result);
1038    }
1039    #[tokio::test]
1040    async fn test_pack_encoder_parallel_large_file_sha256() {
1041        let _guard = set_hash_kind_for_test(HashKind::Sha256);
1042        init_logger();
1043
1044        let start = Instant::now();
1045        // use sha256 pack file for testing
1046        let entries = get_entries_for_test_sha256().await;
1047        let entries_number = entries.lock().await.len();
1048
1049        let total_original_size: usize = entries
1050            .lock()
1051            .await
1052            .iter()
1053            .map(|entry| entry.data.len())
1054            .sum();
1055
1056        let (tx, mut rx) = mpsc::channel(1_000_000);
1057        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(1_000_000);
1058
1059        let mut encoder = PackEncoder::new(entries_number, 0, tx);
1060        tokio::spawn(async move {
1061            time_it!("test parallel encode sha256", {
1062                encoder.parallel_encode(entry_rx).await.unwrap();
1063            });
1064        });
1065
1066        tokio::spawn(async move {
1067            let entries = entries.lock().await;
1068            for entry in entries.iter() {
1069                entry_tx
1070                    .send(MetaAttached {
1071                        inner: entry.clone(),
1072                        meta: EntryMeta::new(),
1073                    })
1074                    .await
1075                    .unwrap();
1076            }
1077            drop(entry_tx);
1078            tracing::info!("all entries sent");
1079        });
1080
1081        let mut result = Vec::new();
1082        while let Some(chunk) = rx.recv().await {
1083            result.extend(chunk);
1084        }
1085
1086        let pack_size = result.len();
1087        let compression_rate = if total_original_size > 0 {
1088            1.0 - (pack_size as f64 / total_original_size as f64)
1089        } else {
1090            0.0
1091        };
1092
1093        let duration = start.elapsed();
1094        tracing::info!("sha256 test executed in: {:.2?}", duration);
1095        tracing::info!("new pack file size: {}", result.len());
1096        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1097        check_format(&result);
1098    }
1099
1100    #[tokio::test]
1101    async fn test_pack_encoder_large_file() {
1102        let _guard = set_hash_kind_for_test(HashKind::Sha1);
1103        init_logger();
1104        let entries = get_entries_for_test().await;
1105        let entries_number = entries.lock().await.len();
1106
1107        let total_original_size: usize = entries
1108            .lock()
1109            .await
1110            .iter()
1111            .map(|entry| entry.data.len())
1112            .sum();
1113
1114        let start = Instant::now();
1115        // encode entries
1116        let (tx, mut rx) = mpsc::channel(100_000);
1117        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1118
1119        let mut encoder = PackEncoder::new(entries_number, 0, tx);
1120        tokio::spawn(async move {
1121            time_it!("test encode no parallel", {
1122                encoder.encode(entry_rx).await.unwrap();
1123            });
1124        });
1125
1126        // spawn a task to send entries
1127        tokio::spawn(async move {
1128            let entries = entries.lock().await;
1129            for entry in entries.iter() {
1130                entry_tx
1131                    .send(MetaAttached {
1132                        inner: entry.clone(),
1133                        meta: EntryMeta::new(),
1134                    })
1135                    .await
1136                    .unwrap();
1137            }
1138            drop(entry_tx);
1139            tracing::info!("all entries sent");
1140        });
1141
1142        // // only receive data
1143        // while (rx.recv().await).is_some() {
1144        //     // do nothing
1145        // }
1146
1147        let mut result = Vec::new();
1148        while let Some(chunk) = rx.recv().await {
1149            result.extend(chunk);
1150        }
1151
1152        let pack_size = result.len();
1153        let compression_rate = if total_original_size > 0 {
1154            1.0 - (pack_size as f64 / total_original_size as f64)
1155        } else {
1156            0.0
1157        };
1158
1159        let duration = start.elapsed();
1160        tracing::info!("test executed in: {:.2?}", duration);
1161        tracing::info!("new pack file size: {}", pack_size);
1162        tracing::info!("original total size: {}", total_original_size);
1163        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1164        tracing::info!(
1165            "space saved: {} bytes",
1166            total_original_size.saturating_sub(pack_size)
1167        );
1168    }
1169    #[tokio::test]
1170    async fn test_pack_encoder_large_file_sha256() {
1171        let _guard = set_hash_kind_for_test(HashKind::Sha256);
1172        init_logger();
1173        let entries = get_entries_for_test_sha256().await;
1174        let entries_number = entries.lock().await.len();
1175
1176        let total_original_size: usize = entries
1177            .lock()
1178            .await
1179            .iter()
1180            .map(|entry| entry.data.len())
1181            .sum();
1182
1183        let start = Instant::now();
1184        // encode entries
1185        let (tx, mut rx) = mpsc::channel(100_000);
1186        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1187
1188        let mut encoder = PackEncoder::new(entries_number, 0, tx);
1189        tokio::spawn(async move {
1190            time_it!("test encode no parallel sha256", {
1191                encoder.encode(entry_rx).await.unwrap();
1192            });
1193        });
1194
1195        // spawn a task to send entries
1196        tokio::spawn(async move {
1197            let entries = entries.lock().await;
1198            for entry in entries.iter() {
1199                entry_tx
1200                    .send(MetaAttached {
1201                        inner: entry.clone(),
1202                        meta: EntryMeta::new(),
1203                    })
1204                    .await
1205                    .unwrap();
1206            }
1207            drop(entry_tx);
1208            tracing::info!("all entries sent");
1209        });
1210
1211        // // only receive data
1212        // while (rx.recv().await).is_some() {
1213        //     // do nothing
1214        // }
1215
1216        let mut result = Vec::new();
1217        while let Some(chunk) = rx.recv().await {
1218            result.extend(chunk);
1219        }
1220
1221        let pack_size = result.len();
1222        let compression_rate = if total_original_size > 0 {
1223            1.0 - (pack_size as f64 / total_original_size as f64)
1224        } else {
1225            0.0
1226        };
1227
1228        let duration = start.elapsed();
1229        tracing::info!("test executed in: {:.2?}", duration);
1230        tracing::info!("new pack file size: {}", pack_size);
1231        tracing::info!("original total size: {}", total_original_size);
1232        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1233        tracing::info!(
1234            "space saved: {} bytes",
1235            total_original_size.saturating_sub(pack_size)
1236        );
1237    }
1238
1239    #[tokio::test]
1240    async fn test_pack_encoder_with_zstdelta() {
1241        let _guard = set_hash_kind_for_test(HashKind::Sha1);
1242        init_logger();
1243        let entries = get_entries_for_test().await;
1244        let entries_number = entries.lock().await.len();
1245
1246        let total_original_size: usize = entries
1247            .lock()
1248            .await
1249            .iter()
1250            .map(|entry| entry.data.len())
1251            .sum();
1252
1253        let start = Instant::now();
1254        let (tx, mut rx) = mpsc::channel(100_000);
1255        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1256
1257        let encoder = PackEncoder::new(entries_number, 10, tx);
1258        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();
1259
1260        // spawn a task to send entries
1261        tokio::spawn(async move {
1262            let entries = entries.lock().await;
1263            for entry in entries.iter() {
1264                entry_tx
1265                    .send(MetaAttached {
1266                        inner: entry.clone(),
1267                        meta: EntryMeta::new(),
1268                    })
1269                    .await
1270                    .unwrap();
1271            }
1272            drop(entry_tx);
1273            tracing::info!("all entries sent");
1274        });
1275
1276        let mut result = Vec::new();
1277        while let Some(chunk) = rx.recv().await {
1278            result.extend(chunk);
1279        }
1280
1281        let pack_size = result.len();
1282        let compression_rate = if total_original_size > 0 {
1283            1.0 - (pack_size as f64 / total_original_size as f64)
1284        } else {
1285            0.0
1286        };
1287
1288        let duration = start.elapsed();
1289        tracing::info!("test executed in: {:.2?}", duration);
1290        tracing::info!("new pack file size: {}", pack_size);
1291        tracing::info!("original total size: {}", total_original_size);
1292        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1293        tracing::info!(
1294            "space saved: {} bytes",
1295            total_original_size.saturating_sub(pack_size)
1296        );
1297
1298        // check format
1299        check_format(&result);
1300    }
1301    #[tokio::test]
1302    async fn test_pack_encoder_with_zstdelta_sha256() {
1303        let _guard = set_hash_kind_for_test(HashKind::Sha256);
1304        init_logger();
1305        let entries = get_entries_for_test_sha256().await;
1306        let entries_number = entries.lock().await.len();
1307
1308        let total_original_size: usize = entries
1309            .lock()
1310            .await
1311            .iter()
1312            .map(|entry| entry.data.len())
1313            .sum();
1314
1315        let start = Instant::now();
1316        let (tx, mut rx) = mpsc::channel(100_000);
1317        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1318
1319        let encoder = PackEncoder::new(entries_number, 10, tx);
1320        encoder.encode_async_with_zstdelta(entry_rx).await.unwrap();
1321
1322        // spawn a task to send entries
1323        tokio::spawn(async move {
1324            let entries = entries.lock().await;
1325            for entry in entries.iter() {
1326                entry_tx
1327                    .send(MetaAttached {
1328                        inner: entry.clone(),
1329                        meta: EntryMeta::new(),
1330                    })
1331                    .await
1332                    .unwrap();
1333            }
1334            drop(entry_tx);
1335            tracing::info!("all entries sent");
1336        });
1337
1338        let mut result = Vec::new();
1339        while let Some(chunk) = rx.recv().await {
1340            result.extend(chunk);
1341        }
1342
1343        let pack_size = result.len();
1344        let compression_rate = if total_original_size > 0 {
1345            1.0 - (pack_size as f64 / total_original_size as f64)
1346        } else {
1347            0.0
1348        };
1349
1350        let duration = start.elapsed();
1351        tracing::info!("test executed in: {:.2?}", duration);
1352        tracing::info!("new pack file size: {}", pack_size);
1353        tracing::info!("original total size: {}", total_original_size);
1354        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1355        tracing::info!(
1356            "space saved: {} bytes",
1357            total_original_size.saturating_sub(pack_size)
1358        );
1359
1360        // check format
1361        check_format(&result);
1362    }
1363
1364    #[test]
1365    fn test_encode_offset() {
1366        // let value = 11013;
1367        let value = 16389;
1368
1369        let data = encode_offset(value);
1370        println!("{data:?}");
1371        let mut reader = Cursor::new(data);
1372        let (result, _) = read_offset_encoding(&mut reader).unwrap();
1373        println!("result: {result}");
1374        assert_eq!(result, value as u64);
1375    }
1376
1377    #[tokio::test]
1378    async fn test_pack_encoder_large_file_with_delta() {
1379        let _guard = set_hash_kind_for_test(HashKind::Sha1);
1380        init_logger();
1381        let entries = get_entries_for_test().await;
1382        let entries_number = entries.lock().await.len();
1383
1384        let total_original_size: usize = entries
1385            .lock()
1386            .await
1387            .iter()
1388            .map(|entry| entry.data.len())
1389            .sum();
1390
1391        let (tx, mut rx) = mpsc::channel(100_000);
1392        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1393
1394        let encoder = PackEncoder::new(entries_number, 10, tx);
1395
1396        let start = Instant::now(); // 开始时间
1397        encoder.encode_async(entry_rx).await.unwrap();
1398
1399        // spawn a task to send entries
1400        tokio::spawn(async move {
1401            let entries = entries.lock().await;
1402            for entry in entries.iter() {
1403                entry_tx
1404                    .send(MetaAttached {
1405                        inner: entry.clone(),
1406                        meta: EntryMeta::new(),
1407                    })
1408                    .await
1409                    .unwrap();
1410            }
1411            drop(entry_tx);
1412            tracing::info!("all entries sent");
1413        });
1414
1415        let mut result = Vec::new();
1416        while let Some(chunk) = rx.recv().await {
1417            result.extend(chunk);
1418        }
1419
1420        let pack_size = result.len();
1421        let compression_rate = if total_original_size > 0 {
1422            1.0 - (pack_size as f64 / total_original_size as f64)
1423        } else {
1424            0.0
1425        };
1426
1427        let duration = start.elapsed();
1428        tracing::info!("test executed in: {:.2?}", duration);
1429        tracing::info!("new pack file size: {}", pack_size);
1430        tracing::info!("original total size: {}", total_original_size);
1431        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1432        tracing::info!(
1433            "space saved: {} bytes",
1434            total_original_size.saturating_sub(pack_size)
1435        );
1436
1437        // check format
1438        check_format(&result);
1439    }
1440    #[tokio::test]
1441    async fn test_pack_encoder_large_file_with_delta_sha256() {
1442        let _guard = set_hash_kind_for_test(HashKind::Sha256);
1443        init_logger();
1444        let entries = get_entries_for_test_sha256().await;
1445        let entries_number = entries.lock().await.len();
1446
1447        let total_original_size: usize = entries
1448            .lock()
1449            .await
1450            .iter()
1451            .map(|entry| entry.data.len())
1452            .sum();
1453
1454        let (tx, mut rx) = mpsc::channel(100_000);
1455        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1456
1457        let encoder = PackEncoder::new(entries_number, 10, tx);
1458
1459        let start = Instant::now(); // 开始时间
1460        encoder.encode_async(entry_rx).await.unwrap();
1461
1462        // spawn a task to send entries
1463        tokio::spawn(async move {
1464            let entries = entries.lock().await;
1465            for entry in entries.iter() {
1466                entry_tx
1467                    .send(MetaAttached {
1468                        inner: entry.clone(),
1469                        meta: EntryMeta::new(),
1470                    })
1471                    .await
1472                    .unwrap();
1473            }
1474            drop(entry_tx);
1475            tracing::info!("all entries sent");
1476        });
1477
1478        let mut result = Vec::new();
1479        while let Some(chunk) = rx.recv().await {
1480            result.extend(chunk);
1481        }
1482
1483        let pack_size = result.len();
1484        let compression_rate = if total_original_size > 0 {
1485            1.0 - (pack_size as f64 / total_original_size as f64)
1486        } else {
1487            0.0
1488        };
1489
1490        let duration = start.elapsed();
1491        tracing::info!("test executed in: {:.2?}", duration);
1492        tracing::info!("new pack file size: {}", pack_size);
1493        tracing::info!("original total size: {}", total_original_size);
1494        tracing::info!("compression rate: {:.2}%", compression_rate * 100.0);
1495        tracing::info!(
1496            "space saved: {} bytes",
1497            total_original_size.saturating_sub(pack_size)
1498        );
1499
1500        // check format
1501        check_format(&result);
1502    }
1503
1504    #[tokio::test]
1505    async fn test_pack_encoder_output_to_files() {
1506        let _guard = set_hash_kind_for_test(HashKind::Sha1);
1507        init_logger();
1508        let entries = get_entries_for_test().await;
1509        let entries_number = entries.lock().await.len();
1510
1511        let total_original_size: usize = entries
1512            .lock()
1513            .await
1514            .iter()
1515            .map(|entry| entry.data.len())
1516            .sum();
1517
1518        let start = Instant::now();
1519
1520        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1521        // 自动创建临时目录,生命周期结束自动删除
1522        let dir = tempdir().unwrap();
1523        let path = dir.path();
1524
1525        // spawn a task to send entries
1526        tokio::spawn(async move {
1527            let entries = entries.lock().await;
1528            for entry in entries.iter() {
1529                entry_tx
1530                    .send(MetaAttached {
1531                        inner: entry.clone(),
1532                        meta: EntryMeta::new(),
1533                    })
1534                    .await
1535                    .unwrap();
1536            }
1537            drop(entry_tx);
1538            tracing::info!("all entries sent");
1539        });
1540
1541        encode_and_output_to_files(entry_rx, entries_number, path.to_path_buf(), 0)
1542            .await
1543            .unwrap();
1544
1545        // 验证临时目录下生成的 pack/idx 文件
1546        let mut pack_file = None;
1547        let mut idx_file = None;
1548        for entry in std::fs::read_dir(path).unwrap() {
1549            let entry = entry.unwrap();
1550            let file_name = entry.file_name();
1551            tracing::info!("file name: {:?}", file_name);
1552            let file_name = file_name.to_string_lossy();
1553            if file_name.ends_with(".pack") {
1554                pack_file = Some(entry.path());
1555            } else if file_name.ends_with(".idx") {
1556                idx_file = Some(entry.path());
1557            }
1558        }
1559        let pack_file = pack_file.expect("pack file not generated");
1560        let idx_file = idx_file.expect("idx file not generated");
1561        assert!(
1562            pack_file.metadata().unwrap().len() > 0,
1563            "pack file is empty"
1564        );
1565        assert!(idx_file.metadata().unwrap().len() > 0, "idx file is empty");
1566
1567        let duration = start.elapsed();
1568        tracing::info!("test executed in: {:.2?}", duration);
1569        tracing::info!("original total size: {}", total_original_size);
1570    }
1571
1572    #[tokio::test]
1573    async fn test_pack_encoder_output_to_files_with_delta() {
1574        let _guard = set_hash_kind_for_test(HashKind::Sha1);
1575        init_logger();
1576        let entries = get_entries_for_test().await;
1577        let entries_number = entries.lock().await.len();
1578
1579        let total_original_size: usize = entries
1580            .lock()
1581            .await
1582            .iter()
1583            .map(|entry| entry.data.len())
1584            .sum();
1585
1586        let start = Instant::now();
1587
1588        let (entry_tx, entry_rx) = mpsc::channel::<MetaAttached<Entry, EntryMeta>>(100_000);
1589        // 自动创建临时目录,生命周期结束自动删除
1590        let dir = tempdir().unwrap();
1591        let path = dir.path();
1592
1593        // spawn a task to send entries
1594        tokio::spawn(async move {
1595            let entries = entries.lock().await;
1596            for entry in entries.iter() {
1597                entry_tx
1598                    .send(MetaAttached {
1599                        inner: entry.clone(),
1600                        meta: EntryMeta::new(),
1601                    })
1602                    .await
1603                    .unwrap();
1604            }
1605            drop(entry_tx);
1606            tracing::info!("all entries sent");
1607        });
1608
1609        encode_and_output_to_files(entry_rx, entries_number, path.to_path_buf(), 10)
1610            .await
1611            .unwrap();
1612
1613        // 验证临时目录下生成的 pack/idx 文件
1614        let mut pack_file = None;
1615        let mut idx_file = None;
1616        for entry in std::fs::read_dir(path).unwrap() {
1617            let entry = entry.unwrap();
1618            let file_name = entry.file_name();
1619            tracing::info!("file name: {:?}", file_name);
1620            let file_name = file_name.to_string_lossy();
1621            if file_name.ends_with(".pack") {
1622                pack_file = Some(entry.path());
1623            } else if file_name.ends_with(".idx") {
1624                idx_file = Some(entry.path());
1625            }
1626        }
1627        let pack_file = pack_file.expect("pack file not generated");
1628        let idx_file = idx_file.expect("idx file not generated");
1629        assert!(
1630            pack_file.metadata().unwrap().len() > 0,
1631            "pack file is empty"
1632        );
1633        assert!(idx_file.metadata().unwrap().len() > 0, "idx file is empty");
1634
1635        let duration = start.elapsed();
1636        tracing::info!("test executed in: {:.2?}", duration);
1637        tracing::info!("original total size: {}", total_original_size);
1638    }
1639}