Skip to main content

git_internal/internal/pack/
decode.rs

1//! Streaming pack decoder that validates headers, inflates entries, rebuilds deltas (including zstd),
2//! and populates caches/metadata for downstream consumers.
3
4use std::{
5    io::{self, BufRead, Cursor, ErrorKind, Read},
6    path::PathBuf,
7    sync::{
8        Arc,
9        atomic::{AtomicUsize, Ordering},
10    },
11    thread::{self, JoinHandle},
12    time::Instant,
13};
14
15use axum::Error;
16use bytes::Bytes;
17use flate2::bufread::ZlibDecoder;
18use futures_util::{Stream, StreamExt};
19use threadpool::ThreadPool;
20use tokio::sync::mpsc::UnboundedSender;
21use uuid::Uuid;
22
23use crate::{
24    errors::GitError,
25    hash::{ObjectHash, get_hash_kind, set_hash_kind},
26    internal::{
27        metadata::{EntryMeta, MetaAttached},
28        object::types::ObjectType,
29        pack::{
30            DEFAULT_TMP_DIR, Pack,
31            cache::{_Cache, Caches},
32            cache_object::{CacheObject, CacheObjectInfo, MemSizeRecorder},
33            channel_reader::StreamBufReader,
34            entry::Entry,
35            utils,
36            waitlist::Waitlist,
37            wrapper::Wrapper,
38        },
39    },
40    utils::CountingReader,
41    zstdelta,
42};
43
44/// A reader that counts bytes read and computes CRC32 checksum.
45/// which is used to verify the integrity of decompressed data.
46struct CrcCountingReader<'a, R> {
47    inner: R,
48    bytes_read: u64,
49    crc: &'a mut crc32fast::Hasher,
50}
51impl<R: Read> Read for CrcCountingReader<'_, R> {
52    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
53        let n = self.inner.read(buf)?;
54        self.bytes_read += n as u64;
55        self.crc.update(&buf[..n]);
56        Ok(n)
57    }
58}
59impl<R: BufRead> BufRead for CrcCountingReader<'_, R> {
60    fn fill_buf(&mut self) -> io::Result<&[u8]> {
61        self.inner.fill_buf()
62    }
63    fn consume(&mut self, amt: usize) {
64        let buf = self.inner.fill_buf().unwrap_or(&[]);
65        self.crc.update(&buf[..amt.min(buf.len())]);
66        self.bytes_read += amt as u64;
67        self.inner.consume(amt);
68    }
69}
70
71/// For the convenience of passing parameters
72struct SharedParams {
73    pub pool: Arc<ThreadPool>,
74    pub waitlist: Arc<Waitlist>,
75    pub caches: Arc<Caches>,
76    pub cache_objs_mem_size: Arc<AtomicUsize>,
77    pub callback: Arc<dyn Fn(MetaAttached<Entry, EntryMeta>) + Sync + Send>,
78}
79
80impl Drop for Pack {
81    fn drop(&mut self) {
82        if self.clean_tmp {
83            self.caches.remove_tmp_dir();
84        }
85    }
86}
87
88impl Pack {
89    /// # Parameters
90    /// - `thread_num`: The number of threads to use for decoding and cache, `None` mean use the number of logical CPUs.
91    ///   It can't be zero, or panic <br>
92    /// - `mem_limit`: The maximum size of the memory cache in bytes, or None for unlimited.
93    ///   The 80% of it will be used for [Caches]  <br>
94    ///   ​**Not very accurate, because of memory alignment and other reasons, overuse about 15%** <br>
95    /// - `temp_path`: The path to a directory for temporary files, default is "./.cache_temp" <br>
96    ///   For example, thread_num = 4 will use up to 8 threads (4 for decoding and 4 for cache) <br>
97    /// - `clean_tmp`: whether to remove temp directory when Pack is dropped
98    pub fn new(
99        thread_num: Option<usize>,
100        mem_limit: Option<usize>,
101        temp_path: Option<PathBuf>,
102        clean_tmp: bool,
103    ) -> Self {
104        let mut temp_path = temp_path.unwrap_or(PathBuf::from(DEFAULT_TMP_DIR));
105        // add 8 random characters as subdirectory, check if the directory exists
106        loop {
107            let sub_dir = Uuid::new_v4().to_string()[..8].to_string();
108            temp_path.push(sub_dir);
109            if !temp_path.exists() {
110                break;
111            }
112            temp_path.pop();
113        }
114        let thread_num = thread_num.unwrap_or_else(num_cpus::get);
115        let cache_mem_size = mem_limit.map(|mem_limit| {
116            // Use wider math to avoid 32-bit overflow when computing 80%.
117            ((mem_limit as u128) * 4 / 5) as usize
118        });
119        Pack {
120            number: 0,
121            signature: ObjectHash::default(),
122            objects: Vec::new(),
123            pool: Arc::new(ThreadPool::new(thread_num)),
124            waitlist: Arc::new(Waitlist::new()),
125            caches: Arc::new(Caches::new(cache_mem_size, temp_path, thread_num)),
126            mem_limit,
127            cache_objs_mem: Arc::new(AtomicUsize::default()),
128            clean_tmp,
129        }
130    }
131
132    /// Checks and reads the header of a Git pack file.
133    ///
134    /// This function reads the first 12 bytes of a pack file, which include the b"PACK" magic identifier,
135    /// the version number, and the number of objects in the pack. It verifies that the magic identifier
136    /// is correct and that the version number is 2 (which is the version currently supported by Git).
137    /// It also collects these header bytes for later use, such as for hashing the entire pack file.
138    ///
139    /// # Parameters
140    /// * `pack` - A mutable reference to an object implementing the `Read` trait,
141    ///   representing the source of the pack file data (e.g., file, memory stream).
142    ///
143    /// # Returns
144    /// A `Result` which is:
145    /// * `Ok((u32, Vec<u8>))`: On successful reading and validation of the header, returns a tuple where:
146    ///     - The first element is the number of objects in the pack file (`u32`).
147    ///     - The second element is a vector containing the bytes of the pack file header (`Vec<u8>`).
148    /// * `Err(GitError)`: On failure, returns a [`GitError`] with a description of the issue.
149    ///
150    /// # Errors
151    /// This function can return an error in the following situations:
152    /// * If the pack file does not start with the "PACK" magic identifier.
153    /// * If the pack file's version number is not 2.
154    /// * If there are any issues reading from the provided `pack` source.
155    pub fn check_header(pack: &mut impl BufRead) -> Result<(u32, Vec<u8>), GitError> {
156        // A vector to store the header data for hashing later
157        let mut header_data = Vec::new();
158
159        // Read the first 4 bytes which should be "PACK"
160        let mut magic = [0; 4];
161        // Read the magic "PACK" identifier
162        let result = pack.read_exact(&mut magic);
163        match result {
164            Ok(_) => {
165                // Store these bytes for later
166                header_data.extend_from_slice(&magic);
167
168                // Check if the magic bytes match "PACK"
169                if magic != *b"PACK" {
170                    // If not, return an error indicating invalid pack header
171                    return Err(GitError::InvalidPackHeader(format!(
172                        "{},{},{},{}",
173                        magic[0], magic[1], magic[2], magic[3]
174                    )));
175                }
176            }
177            Err(e) => {
178                // If there is an error in reading, return a GitError
179                return Err(GitError::InvalidPackFile(format!(
180                    "Error reading magic identifier: {e}"
181                )));
182            }
183        }
184
185        // Read the next 4 bytes for the version number
186        let mut version_bytes = [0; 4];
187        let result = pack.read_exact(&mut version_bytes); // Read the version number
188        match result {
189            Ok(_) => {
190                // Store these bytes
191                header_data.extend_from_slice(&version_bytes);
192
193                // Convert the version bytes to an u32 integer
194                let version = u32::from_be_bytes(version_bytes);
195                if version != 2 {
196                    // Git currently supports version 2, so error if not version 2
197                    return Err(GitError::InvalidPackFile(format!(
198                        "Version Number is {version}, not 2"
199                    )));
200                }
201            }
202            Err(e) => {
203                // If there is an error in reading, return a GitError
204                return Err(GitError::InvalidPackFile(format!(
205                    "Error reading version number: {e}"
206                )));
207            }
208        }
209
210        // Read the next 4 bytes for the number of objects in the pack
211        let mut object_num_bytes = [0; 4];
212        // Read the number of objects
213        let result = pack.read_exact(&mut object_num_bytes);
214        match result {
215            Ok(_) => {
216                // Store these bytes
217                header_data.extend_from_slice(&object_num_bytes);
218                // Convert the object number bytes to an u32 integer
219                let object_num = u32::from_be_bytes(object_num_bytes);
220                // Return the number of objects and the header data for further processing
221                Ok((object_num, header_data))
222            }
223            Err(e) => {
224                // If there is an error in reading, return a GitError
225                Err(GitError::InvalidPackFile(format!(
226                    "Error reading object number: {e}"
227                )))
228            }
229        }
230    }
231
232    /// Decompresses data from a given Read and BufRead source using Zlib decompression.
233    ///
234    /// # Parameters
235    /// * `pack`: A source that implements both Read and BufRead traits (e.g., file, network stream).
236    /// * `expected_size`: The expected decompressed size of the data.
237    ///
238    /// # Returns
239    /// Returns a `Result` containing either:
240    /// * A tuple with a `Vec<u8>` of the decompressed data and the total number of input bytes processed,
241    /// * Or a `GitError` in case of a mismatch in expected size or any other reading error.
242    ///
243    pub fn decompress_data(
244        pack: &mut (impl BufRead + Send),
245        expected_size: usize,
246    ) -> Result<(Vec<u8>, usize), GitError> {
247        // Create a buffer with the expected size for the decompressed data
248        let mut buf = Vec::with_capacity(expected_size);
249
250        let mut counting_reader = CountingReader::new(pack);
251        // Create a new Zlib decoder with the original data
252        //let mut deflate = ZlibDecoder::new(pack);
253        let mut deflate = ZlibDecoder::new(&mut counting_reader);
254        // Attempt to read data to the end of the buffer
255        match deflate.read_to_end(&mut buf) {
256            Ok(_) => {
257                // Check if the length of the buffer matches the expected size
258                if buf.len() != expected_size {
259                    Err(GitError::InvalidPackFile(format!(
260                        "The object size {} does not match the expected size {}",
261                        buf.len(),
262                        expected_size
263                    )))
264                } else {
265                    // If everything is as expected, return the buffer, the original data, and the total number of input bytes processed
266                    let actual_input_bytes = counting_reader.bytes_read as usize;
267                    Ok((buf, actual_input_bytes))
268                }
269            }
270            Err(e) => {
271                // If there is an error in reading, return a GitError
272                Err(GitError::InvalidPackFile(format!(
273                    "Decompression error: {e}"
274                )))
275            }
276        }
277    }
278
279    /// Decodes a pack object from a given Read and BufRead source and returns the object as a [`CacheObject`].
280    ///
281    /// # Parameters
282    /// * `pack`: A source that implements both Read and BufRead traits.
283    /// * `offset`: A mutable reference to the current offset within the pack.
284    ///
285    /// # Returns
286    /// Returns a `Result` containing either:
287    /// * A tuple of the next offset in the pack and the original compressed data as `Vec<u8>`,
288    /// * Or a `GitError` in case of any reading or decompression error.
289    ///
290    pub fn decode_pack_object(
291        pack: &mut (impl BufRead + Send),
292        offset: &mut usize,
293    ) -> Result<Option<CacheObject>, GitError> {
294        let init_offset = *offset;
295        let mut hasher = crc32fast::Hasher::new();
296        let mut reader = CrcCountingReader {
297            inner: pack,
298            bytes_read: 0,
299            crc: &mut hasher,
300        };
301
302        // Attempt to read the type and size, handle potential errors
303        // Note: read_type_and_varint_size updates the offset manually, but we can rely on reader.bytes_read
304        let (type_bits, size) = match utils::read_type_and_varint_size(&mut reader, offset) {
305            Ok(result) => result,
306            Err(e) => {
307                // Handle the error e.g., by logging it or converting it to GitError
308                // and then return from the function
309                return Err(GitError::InvalidPackFile(format!("Read error: {e}")));
310            }
311        };
312
313        // Check if the object type is valid
314        let t = ObjectType::from_pack_type_u8(type_bits)?;
315
316        match t {
317            ObjectType::Commit | ObjectType::Tree | ObjectType::Blob | ObjectType::Tag => {
318                let (data, raw_size) = Pack::decompress_data(&mut reader, size)?;
319                *offset += raw_size;
320                let crc32 = hasher.finalize();
321                Ok(Some(CacheObject::new_for_undeltified(
322                    t,
323                    data,
324                    init_offset,
325                    crc32,
326                )))
327            }
328            ObjectType::OffsetDelta | ObjectType::OffsetZstdelta => {
329                let (delta_offset, bytes) = utils::read_offset_encoding(&mut reader).unwrap();
330                *offset += bytes;
331
332                let (data, raw_size) = Pack::decompress_data(&mut reader, size)?;
333                *offset += raw_size;
334
335                // Count the base object offset: the current offset - delta offset
336                let base_offset = init_offset
337                    .checked_sub(delta_offset as usize)
338                    .ok_or_else(|| {
339                        GitError::InvalidObjectInfo("Invalid OffsetDelta offset".to_string())
340                    })
341                    .unwrap();
342
343                let mut reader = Cursor::new(&data);
344                let (_, final_size) = utils::read_delta_object_size(&mut reader)?;
345
346                let obj_info = match t {
347                    ObjectType::OffsetDelta => {
348                        CacheObjectInfo::OffsetDelta(base_offset, final_size)
349                    }
350                    ObjectType::OffsetZstdelta => {
351                        CacheObjectInfo::OffsetZstdelta(base_offset, final_size)
352                    }
353                    _ => unreachable!(),
354                };
355                let crc32 = hasher.finalize();
356                Ok(Some(CacheObject {
357                    info: obj_info,
358                    offset: init_offset,
359                    crc32,
360                    data_decompressed: data,
361                    mem_recorder: None,
362                    is_delta_in_pack: true,
363                }))
364            }
365            ObjectType::HashDelta => {
366                // Read hash bytes to get the reference object hash(size depends on hash kind,e.g.,20 for SHA1,32 for SHA256)
367                let ref_sha = ObjectHash::from_stream(&mut reader).unwrap();
368                // Offset is incremented by 20/32 bytes
369                *offset += get_hash_kind().size();
370
371                let (data, raw_size) = Pack::decompress_data(&mut reader, size)?;
372                *offset += raw_size;
373
374                let mut reader = Cursor::new(&data);
375                let (_, final_size) = utils::read_delta_object_size(&mut reader)?;
376
377                let crc32 = hasher.finalize();
378
379                Ok(Some(CacheObject {
380                    info: CacheObjectInfo::HashDelta(ref_sha, final_size),
381                    offset: init_offset,
382                    crc32,
383                    data_decompressed: data,
384                    mem_recorder: None,
385                    is_delta_in_pack: true,
386                }))
387            }
388            // AI object types (ContextSnapshot, Decision, etc.) use u8 IDs >= 8
389            // and cannot appear in a pack file (3-bit type field only holds 1-7).
390            // `from_pack_type_u8` already rejects them, but guard explicitly here.
391            other => Err(GitError::InvalidPackFile(format!(
392                "AI object type `{other}` cannot appear in a pack file"
393            ))),
394        }
395    }
396
397    /// Decodes a pack file from a given Read and BufRead source, for each object in the pack,
398    /// it decodes the object and processes it using the provided callback function.
399    ///
400    /// # Parameters
401    /// * pack_id_callback: A callback that seed pack_file sha1 for updating database
402    ///
403    pub fn decode<F, C>(
404        &mut self,
405        pack: &mut (impl BufRead + Send),
406        callback: F,
407        pack_id_callback: Option<C>,
408    ) -> Result<(), GitError>
409    where
410        F: Fn(MetaAttached<Entry, EntryMeta>) + Sync + Send + 'static,
411        C: FnOnce(ObjectHash) + Send + 'static,
412    {
413        let time = Instant::now();
414        let mut last_update_time = time.elapsed().as_millis();
415        let log_info = |_i: usize, pack: &Pack| {
416            tracing::info!(
417                "time {:.2} s \t decode: {:?} \t dec-num: {} \t cah-num: {} \t Objs: {} MB \t CacheUsed: {} MB",
418                time.elapsed().as_millis() as f64 / 1000.0,
419                _i,
420                pack.pool.queued_count(),
421                pack.caches.queued_tasks(),
422                pack.cache_objs_mem_used() / 1024 / 1024,
423                pack.caches.memory_used() / 1024 / 1024
424            );
425        };
426        let callback = Arc::new(callback);
427
428        let caches = self.caches.clone();
429        let mut reader = Wrapper::new(io::BufReader::new(pack));
430
431        let result = Pack::check_header(&mut reader);
432        match result {
433            Ok((object_num, _)) => {
434                self.number = object_num as usize;
435            }
436            Err(e) => {
437                return Err(e);
438            }
439        }
440        tracing::info!("The pack file has {} objects", self.number);
441        let mut offset: usize = 12;
442        let mut i = 0;
443        while i < self.number {
444            // log per 1000 objects and 1 second
445            if i % 1000 == 0 {
446                let time_now = time.elapsed().as_millis();
447                if time_now - last_update_time > 1000 {
448                    log_info(i, self);
449                    last_update_time = time_now;
450                }
451            }
452            // 3 parts: Waitlist + TheadPool + Caches
453            // hardcode the limit of the tasks of threads_pool queue, to limit memory
454            while self.pool.queued_count() > 2000
455                || self
456                    .mem_limit
457                    .map(|limit| self.memory_used() > limit)
458                    .unwrap_or(false)
459            {
460                thread::yield_now();
461            }
462            let r: Result<Option<CacheObject>, GitError> =
463                Pack::decode_pack_object(&mut reader, &mut offset);
464            match r {
465                Ok(Some(mut obj)) => {
466                    obj.set_mem_recorder(self.cache_objs_mem.clone());
467                    obj.record_mem_size();
468
469                    // Wrapper of Arc Params, for convenience to pass
470                    let params = Arc::new(SharedParams {
471                        pool: self.pool.clone(),
472                        waitlist: self.waitlist.clone(),
473                        caches: self.caches.clone(),
474                        cache_objs_mem_size: self.cache_objs_mem.clone(),
475                        callback: callback.clone(),
476                    });
477
478                    let caches = caches.clone();
479                    let waitlist = self.waitlist.clone();
480                    let kind = get_hash_kind();
481                    self.pool.execute(move || {
482                        set_hash_kind(kind);
483                        match obj.info {
484                            CacheObjectInfo::BaseObject(_, _) => {
485                                Self::cache_obj_and_process_waitlist(params, obj);
486                            }
487                            CacheObjectInfo::OffsetDelta(base_offset, _)
488                            | CacheObjectInfo::OffsetZstdelta(base_offset, _) => {
489                                if let Some(base_obj) = caches.get_by_offset(base_offset) {
490                                    Self::process_delta(params, obj, base_obj);
491                                } else {
492                                    // You can delete this 'if' block ↑, because there are Second check in 'else'
493                                    // It will be more readable, but the performance will be slightly reduced
494                                    waitlist.insert_offset(base_offset, obj);
495                                    // Second check: prevent that the base_obj thread has finished before the waitlist insert
496                                    if let Some(base_obj) = caches.get_by_offset(base_offset) {
497                                        Self::process_waitlist(params, base_obj);
498                                    }
499                                }
500                            }
501                            CacheObjectInfo::HashDelta(base_ref, _) => {
502                                if let Some(base_obj) = caches.get_by_hash(base_ref) {
503                                    Self::process_delta(params, obj, base_obj);
504                                } else {
505                                    waitlist.insert_ref(base_ref, obj);
506                                    if let Some(base_obj) = caches.get_by_hash(base_ref) {
507                                        Self::process_waitlist(params, base_obj);
508                                    }
509                                }
510                            }
511                        }
512                    });
513                }
514                Ok(None) => {}
515                Err(e) => {
516                    return Err(e);
517                }
518            }
519            i += 1;
520        }
521        log_info(i, self);
522        let render_hash = reader.final_hash();
523        self.signature = ObjectHash::from_stream(&mut reader).unwrap();
524
525        if render_hash != self.signature {
526            return Err(GitError::InvalidPackFile(format!(
527                "The pack file hash {} does not match the trailer hash {}",
528                render_hash, self.signature
529            )));
530        }
531
532        let end = utils::is_eof(&mut reader);
533        if !end {
534            return Err(GitError::InvalidPackFile(
535                "The pack file is not at the end".to_string(),
536            ));
537        }
538
539        self.pool.join(); // wait for all threads to finish
540
541        // send pack id for metadata
542        if let Some(pack_callback) = pack_id_callback {
543            pack_callback(self.signature);
544        }
545        // !Attention: Caches threadpool may not stop, but it's not a problem (garbage file data)
546        // So that files != self.number
547        assert_eq!(self.waitlist.map_offset.len(), 0);
548        assert_eq!(self.waitlist.map_ref.len(), 0);
549        // Because we may skip some objects (e.g. AI objects), we use >= instead of ==
550        assert!(self.number >= caches.total_inserted());
551        tracing::info!(
552            "The pack file has been decoded successfully, takes: [ {:?} ]",
553            time.elapsed()
554        );
555        self.caches.clear(); // clear cached objects & stop threads
556        assert_eq!(self.cache_objs_mem_used(), 0); // all the objs should be dropped until here
557
558        // impl in Drop Trait
559        // if self.clean_tmp {
560        //     self.caches.remove_tmp_dir();
561        // }
562
563        Ok(())
564    }
565
566    /// Decode a Pack in a new thread and send the CacheObjects while decoding.
567    /// <br> Attention: It will consume the `pack` and return in a JoinHandle.
568    pub fn decode_async(
569        mut self,
570        mut pack: impl BufRead + Send + 'static,
571        sender: UnboundedSender<Entry>,
572    ) -> JoinHandle<Pack> {
573        let kind = get_hash_kind();
574        thread::spawn(move || {
575            set_hash_kind(kind);
576            self.decode(
577                &mut pack,
578                move |entry| {
579                    if let Err(e) = sender.send(entry.inner) {
580                        eprintln!("Channel full, failed to send entry: {e:?}");
581                    }
582                },
583                None::<fn(ObjectHash)>,
584            )
585            .unwrap();
586            self
587        })
588    }
589
590    /// Decodes a `Pack` from a `Stream` of `Bytes`, and sends the `Entry` while decoding.
591    pub async fn decode_stream(
592        mut self,
593        mut stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + Send + 'static,
594        sender: UnboundedSender<MetaAttached<Entry, EntryMeta>>,
595        pack_hash_send: Option<UnboundedSender<ObjectHash>>,
596    ) -> Self {
597        let kind = get_hash_kind();
598        let (tx, rx) = std::sync::mpsc::channel();
599        let mut reader = StreamBufReader::new(rx);
600        tokio::spawn(async move {
601            while let Some(chunk) = stream.next().await {
602                let data = chunk.unwrap().to_vec();
603                if let Err(e) = tx.send(data) {
604                    eprintln!("Sending Error: {e:?}");
605                    break;
606                }
607            }
608        });
609        // CPU-bound task, so use spawn_blocking
610        // DO NOT use thread::spawn, because it will block tokio runtime (if single-threaded runtime, like in tests)
611        tokio::task::spawn_blocking(move || {
612            set_hash_kind(kind);
613            self.decode(
614                &mut reader,
615                move |entry: MetaAttached<Entry, EntryMeta>| {
616                    // as we used unbound channel here, it will never full so can be send with synchronous
617                    if let Err(e) = sender.send(entry) {
618                        eprintln!("unbound channel Sending Error: {e:?}");
619                    }
620                },
621                Some(move |pack_id: ObjectHash| {
622                    if let Some(pack_id_send) = pack_hash_send
623                        && let Err(e) = pack_id_send.send(pack_id)
624                    {
625                        eprintln!("unbound channel Sending Error: {e:?}");
626                    }
627                }),
628            )
629            .unwrap();
630            self
631        })
632        .await
633        .unwrap()
634    }
635
636    /// CacheObjects + Index size of Caches
637    fn memory_used(&self) -> usize {
638        self.cache_objs_mem_used() + self.caches.memory_used_index()
639    }
640
641    /// The total memory used by the CacheObjects of this Pack
642    fn cache_objs_mem_used(&self) -> usize {
643        self.cache_objs_mem.load(Ordering::Acquire)
644    }
645
646    /// Rebuild the Delta Object in a new thread & process the objects waiting for it recursively.
647    /// <br> This function must be *static*, because [&self] can't be moved into a new thread.
648    fn process_delta(
649        shared_params: Arc<SharedParams>,
650        delta_obj: CacheObject,
651        base_obj: Arc<CacheObject>,
652    ) {
653        shared_params.pool.clone().execute(move || {
654            let mut new_obj = match delta_obj.info {
655                CacheObjectInfo::OffsetDelta(_, _) | CacheObjectInfo::HashDelta(_, _) => {
656                    Pack::rebuild_delta(delta_obj, base_obj)
657                }
658                CacheObjectInfo::OffsetZstdelta(_, _) => {
659                    Pack::rebuild_zstdelta(delta_obj, base_obj)
660                }
661                _ => unreachable!(),
662            };
663
664            new_obj.set_mem_recorder(shared_params.cache_objs_mem_size.clone());
665            new_obj.record_mem_size();
666            Self::cache_obj_and_process_waitlist(shared_params, new_obj); //Indirect Recursion
667        });
668    }
669
670    /// Cache the new object & process the objects waiting for it (in multi-threading).
671    fn cache_obj_and_process_waitlist(shared_params: Arc<SharedParams>, new_obj: CacheObject) {
672        (shared_params.callback)(new_obj.to_entry_metadata());
673        let new_obj = shared_params.caches.insert(
674            new_obj.offset,
675            new_obj.base_object_hash().unwrap(),
676            new_obj,
677        );
678        Self::process_waitlist(shared_params, new_obj);
679    }
680
681    fn process_waitlist(shared_params: Arc<SharedParams>, base_obj: Arc<CacheObject>) {
682        let wait_objs = shared_params
683            .waitlist
684            .take(base_obj.offset, base_obj.base_object_hash().unwrap());
685        for obj in wait_objs {
686            // Process the objects waiting for the new object(base_obj = new_obj)
687            Self::process_delta(shared_params.clone(), obj, base_obj.clone());
688        }
689    }
690
691    /// Reconstruct the Delta Object based on the "base object"
692    /// and return the new object.
693    pub fn rebuild_delta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
694        const COPY_INSTRUCTION_FLAG: u8 = 1 << 7;
695        const COPY_OFFSET_BYTES: u8 = 4;
696        const COPY_SIZE_BYTES: u8 = 3;
697        const COPY_ZERO_SIZE: usize = 0x10000;
698
699        let mut stream = Cursor::new(&delta_obj.data_decompressed);
700
701        // Read the base object size
702        // (Size Encoding)
703        let (base_size, result_size) = utils::read_delta_object_size(&mut stream).unwrap();
704
705        // Get the base object data
706        let base_info = &base_obj.data_decompressed;
707        assert_eq!(base_info.len(), base_size, "Base object size mismatch");
708
709        let mut result = Vec::with_capacity(result_size);
710
711        loop {
712            // Check if the stream has ended, meaning the new object is done
713            let instruction = match utils::read_bytes(&mut stream) {
714                Ok([instruction]) => instruction,
715                Err(err) if err.kind() == ErrorKind::UnexpectedEof => break,
716                Err(err) => {
717                    panic!(
718                        "{}",
719                        GitError::DeltaObjectError(format!("Wrong instruction in delta :{err}"))
720                    );
721                }
722            };
723
724            if instruction & COPY_INSTRUCTION_FLAG == 0 {
725                // Data instruction; the instruction byte specifies the number of data bytes
726                if instruction == 0 {
727                    // Appending 0 bytes doesn't make sense, so git disallows it
728                    panic!(
729                        "{}",
730                        GitError::DeltaObjectError(String::from("Invalid data instruction"))
731                    );
732                }
733
734                // Append the provided bytes
735                let mut data = vec![0; instruction as usize];
736                stream.read_exact(&mut data).unwrap();
737                result.extend_from_slice(&data);
738            } else {
739                // Copy instruction
740                // +----------+---------+---------+---------+---------+-------+-------+-------+
741                // | 1xxxxxxx | offset1 | offset2 | offset3 | offset4 | size1 | size2 | size3 |
742                // +----------+---------+---------+---------+---------+-------+-------+-------+
743                let mut nonzero_bytes = instruction;
744                let offset =
745                    utils::read_partial_int(&mut stream, COPY_OFFSET_BYTES, &mut nonzero_bytes)
746                        .unwrap();
747                let mut size =
748                    utils::read_partial_int(&mut stream, COPY_SIZE_BYTES, &mut nonzero_bytes)
749                        .unwrap();
750                if size == 0 {
751                    // Copying 0 bytes doesn't make sense, so git assumes a different size
752                    size = COPY_ZERO_SIZE;
753                }
754                // Copy bytes from the base object
755                let base_data = base_info.get(offset..(offset + size)).ok_or_else(|| {
756                    GitError::DeltaObjectError("Invalid copy instruction".to_string())
757                });
758
759                match base_data {
760                    Ok(data) => result.extend_from_slice(data),
761                    Err(e) => panic!("{}", e),
762                }
763            }
764        }
765        assert_eq!(result_size, result.len(), "Result size mismatch");
766
767        let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
768        // create new obj from `delta_obj` & `result` instead of modifying `delta_obj` for heap-size recording
769        CacheObject {
770            info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
771            offset: delta_obj.offset,
772            crc32: delta_obj.crc32,
773            data_decompressed: result,
774            mem_recorder: None,
775            is_delta_in_pack: delta_obj.is_delta_in_pack,
776        } // Canonical form (Complete Object)
777        // Memory recording will happen after this function returns. See `process_delta`
778    }
779    pub fn rebuild_zstdelta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
780        let result = zstdelta::apply(&base_obj.data_decompressed, &delta_obj.data_decompressed)
781            .expect("Failed to apply zstdelta");
782        let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
783        CacheObject {
784            info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
785            offset: delta_obj.offset,
786            crc32: delta_obj.crc32,
787            data_decompressed: result,
788            mem_recorder: None,
789            is_delta_in_pack: delta_obj.is_delta_in_pack,
790        } // Canonical form (Complete Object)
791        // Memory recording will happen after this function returns. See `process_delta`
792    }
793}
794
795#[cfg(test)]
796mod tests {
797    use std::{
798        env, fs,
799        io::{BufReader, Cursor, prelude::*},
800        path::PathBuf,
801        sync::{
802            Arc,
803            atomic::{AtomicUsize, Ordering},
804        },
805    };
806
807    use flate2::{Compression, write::ZlibEncoder};
808    use futures_util::TryStreamExt;
809    use tokio_util::io::ReaderStream;
810
811    use crate::{
812        hash::{HashKind, ObjectHash, set_hash_kind_for_test},
813        internal::pack::{Pack, tests::init_logger},
814    };
815
816    #[tokio::test]
817    async fn test_pack_check_header() {
818        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
819        source.push("tests/data/packs/medium-sha1.pack");
820
821        let f = fs::File::open(source).unwrap();
822        let mut buf_reader = BufReader::new(f);
823        let (object_num, _) = Pack::check_header(&mut buf_reader).unwrap();
824
825        assert_eq!(object_num, 35031);
826    }
827
828    #[test]
829    fn test_decompress_data() {
830        let data = b"Hello, world!"; // Sample data to compress and then decompress
831        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
832        encoder.write_all(data).unwrap();
833        let compressed_data = encoder.finish().unwrap();
834        let compressed_size = compressed_data.len();
835
836        // Create a cursor for the compressed data to simulate a BufRead source
837        let mut cursor: Cursor<Vec<u8>> = Cursor::new(compressed_data);
838        let expected_size = data.len();
839
840        // Decompress the data and assert correctness
841        let result = Pack::decompress_data(&mut cursor, expected_size);
842        match result {
843            Ok((decompressed_data, bytes_read)) => {
844                assert_eq!(bytes_read, compressed_size);
845                assert_eq!(decompressed_data, data);
846            }
847            Err(e) => panic!("Decompression failed: {e:?}"),
848        }
849    }
850
851    #[test]
852    #[cfg(target_pointer_width = "32")]
853    fn test_pack_new_mem_limit_no_overflow_32bit() {
854        // In the old code, 1.2B * 4 produced an intermediate 4.8B value, which exceeds
855        // 32-bit usize::MAX (~4.29B) and overflowed before a later division; this test
856        // covers that former panic path.
857        let mem_limit = 1_200_000_000usize;
858        let tmp = PathBuf::from("/tmp/.cache_temp");
859        let result = std::panic::catch_unwind(|| {
860            let _p = Pack::new(Some(1), Some(mem_limit), Some(tmp), true);
861        });
862        assert!(result.is_ok(), "Pack::new should not panic on 32-bit");
863    }
864
865    /// Helper function to run decode tests without delta objects
866    fn run_decode_no_delta(rel_path: &str, kind: HashKind) {
867        let _guard = set_hash_kind_for_test(kind);
868        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
869        source.push(rel_path);
870
871        let tmp = PathBuf::from("/tmp/.cache_temp");
872
873        let f = fs::File::open(source).unwrap();
874        let mut buffered = BufReader::new(f);
875        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
876        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
877            .unwrap();
878    }
879    #[test]
880    fn test_pack_decode_without_delta() {
881        run_decode_no_delta("tests/data/packs/small-sha1.pack", HashKind::Sha1);
882        run_decode_no_delta("tests/data/packs/small-sha256.pack", HashKind::Sha256);
883    }
884
885    /// Helper function to run decode tests with delta objects
886    fn run_decode_with_ref_delta(rel_path: &str, kind: HashKind) {
887        let _guard = set_hash_kind_for_test(kind);
888        init_logger();
889
890        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
891        source.push(rel_path);
892
893        let tmp = PathBuf::from("/tmp/.cache_temp");
894
895        let f = fs::File::open(source).unwrap();
896        let mut buffered = BufReader::new(f);
897        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
898        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
899            .unwrap();
900    }
901    #[test]
902    fn test_pack_decode_with_ref_delta() {
903        run_decode_with_ref_delta("tests/data/packs/ref-delta-sha1.pack", HashKind::Sha1);
904        run_decode_with_ref_delta("tests/data/packs/ref-delta-sha256.pack", HashKind::Sha256);
905    }
906
907    /// Helper function to run decode tests without memory limit
908    fn run_decode_no_mem_limit(rel_path: &str, kind: HashKind) {
909        let _guard = set_hash_kind_for_test(kind);
910        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
911        source.push(rel_path);
912
913        let tmp = PathBuf::from("/tmp/.cache_temp");
914
915        let f = fs::File::open(source).unwrap();
916        let mut buffered = BufReader::new(f);
917        let mut p = Pack::new(None, None, Some(tmp), true);
918        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
919            .unwrap();
920    }
921    #[test]
922    fn test_pack_decode_no_mem_limit() {
923        run_decode_no_mem_limit("tests/data/packs/small-sha1.pack", HashKind::Sha1);
924        run_decode_no_mem_limit("tests/data/packs/small-sha256.pack", HashKind::Sha256);
925    }
926
927    /// Helper function to run decode tests with delta objects
928    async fn run_decode_large_with_delta(rel_path: &str, kind: HashKind) {
929        let _guard = set_hash_kind_for_test(kind);
930        init_logger();
931        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
932        source.push(rel_path);
933
934        let tmp = PathBuf::from("/tmp/.cache_temp");
935
936        let f = fs::File::open(source).unwrap();
937        let mut buffered = BufReader::new(f);
938        let mut p = Pack::new(
939            Some(4),
940            Some(1024 * 1024 * 100), //try to avoid dead lock on CI servers with low memory
941            Some(tmp.clone()),
942            true,
943        );
944        let rt = p.decode(
945            &mut buffered,
946            |_obj| {
947                // println!("{:?} {}", obj.hash.to_string(), offset);
948            },
949            None::<fn(ObjectHash)>,
950        );
951        if let Err(e) = rt {
952            fs::remove_dir_all(tmp).unwrap();
953            panic!("Error: {e:?}");
954        }
955    }
956    #[tokio::test]
957    async fn test_pack_decode_with_large_file_with_delta_without_ref() {
958        run_decode_large_with_delta("tests/data/packs/medium-sha1.pack", HashKind::Sha1).await;
959        run_decode_large_with_delta("tests/data/packs/medium-sha256.pack", HashKind::Sha256).await;
960    } // it will be stuck on dropping `Pack` on Windows if `mem_size` is None, so we need `mimalloc`
961
962    /// Helper function to run decode tests with large file stream
963    async fn run_decode_large_stream(rel_path: &str, kind: HashKind) {
964        let _guard = set_hash_kind_for_test(kind);
965        init_logger();
966        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
967        source.push(rel_path);
968
969        let tmp = PathBuf::from("/tmp/.cache_temp");
970        let f = tokio::fs::File::open(source).await.unwrap();
971        let stream = ReaderStream::new(f).map_err(axum::Error::new);
972        let p = Pack::new(Some(4), Some(1024 * 1024 * 100), Some(tmp.clone()), true);
973
974        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
975        let handle = tokio::spawn(async move { p.decode_stream(stream, tx, None).await });
976        let count = Arc::new(AtomicUsize::new(0));
977        let count_c = count.clone();
978        // in tests, RUNTIME is single-threaded, so `sync code` will block the tokio runtime
979        let consume = tokio::spawn(async move {
980            let mut cnt = 0;
981            while let Some(_entry) = rx.recv().await {
982                cnt += 1;
983            }
984            tracing::info!("Received: {}", cnt);
985            count_c.store(cnt, Ordering::Release);
986        });
987        let p = handle.await.unwrap();
988        consume.await.unwrap();
989        assert_eq!(count.load(Ordering::Acquire), p.number);
990        assert_eq!(p.number, 35031);
991    }
992    #[tokio::test]
993    async fn test_decode_large_file_stream() {
994        run_decode_large_stream("tests/data/packs/medium-sha1.pack", HashKind::Sha1).await;
995        run_decode_large_stream("tests/data/packs/medium-sha256.pack", HashKind::Sha256).await;
996    }
997
998    /// Helper function to run decode tests with large file async
999    async fn run_decode_large_file_async(rel_path: &str, kind: HashKind) {
1000        let _guard = set_hash_kind_for_test(kind);
1001        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1002        source.push(rel_path);
1003
1004        let tmp = PathBuf::from("/tmp/.cache_temp");
1005        let f = fs::File::open(source).unwrap();
1006        let buffered = BufReader::new(f);
1007        let p = Pack::new(Some(4), Some(1024 * 1024 * 100), Some(tmp.clone()), true);
1008
1009        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1010        let handle = p.decode_async(buffered, tx); // new thread
1011        let mut cnt = 0;
1012        while let Some(_entry) = rx.recv().await {
1013            cnt += 1; //use entry here
1014        }
1015        let p = handle.join().unwrap();
1016        assert_eq!(cnt, p.number);
1017    }
1018    #[tokio::test]
1019    async fn test_decode_large_file_async() {
1020        run_decode_large_file_async("tests/data/packs/medium-sha1.pack", HashKind::Sha1).await;
1021        run_decode_large_file_async("tests/data/packs/medium-sha256.pack", HashKind::Sha256).await;
1022    }
1023
1024    /// Helper function to run decode tests with delta objects without reference
1025    fn run_decode_with_delta_no_ref(rel_path: &str, kind: HashKind) {
1026        let _guard = set_hash_kind_for_test(kind);
1027        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1028        source.push(rel_path);
1029
1030        let tmp = PathBuf::from("/tmp/.cache_temp");
1031
1032        let f = fs::File::open(source).unwrap();
1033        let mut buffered = BufReader::new(f);
1034        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
1035        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
1036            .unwrap();
1037    }
1038    #[test]
1039    fn test_pack_decode_with_delta_without_ref() {
1040        run_decode_with_delta_no_ref("tests/data/packs/medium-sha1.pack", HashKind::Sha1);
1041        run_decode_with_delta_no_ref("tests/data/packs/medium-sha256.pack", HashKind::Sha256);
1042    }
1043
1044    #[test] // Take too long time
1045    fn test_pack_decode_multi_task_with_large_file_with_delta_without_ref() {
1046        let rt = tokio::runtime::Builder::new_current_thread()
1047            .enable_all()
1048            .build()
1049            .unwrap();
1050        rt.block_on(async move {
1051            // For each hash kind, run two decode tasks concurrently to simulate multi-task pressure.
1052            for (kind, path) in [
1053                (HashKind::Sha1, "tests/data/packs/medium-sha1.pack"),
1054                (HashKind::Sha256, "tests/data/packs/medium-sha256.pack"),
1055            ] {
1056                let f1 = run_decode_large_with_delta(path, kind);
1057                let f2 = run_decode_large_with_delta(path, kind);
1058                let _ = futures::future::join(f1, f2).await;
1059            }
1060        });
1061    }
1062}