git_internal/internal/pack/
decode.rs

1//! Streaming pack decoder that validates headers, inflates entries, rebuilds deltas (including zstd),
2//! and populates caches/metadata for downstream consumers.
3
4use std::{
5    io::{self, BufRead, Cursor, ErrorKind, Read},
6    path::PathBuf,
7    sync::{
8        Arc,
9        atomic::{AtomicUsize, Ordering},
10    },
11    thread::{self, JoinHandle},
12    time::Instant,
13};
14
15use axum::Error;
16use bytes::Bytes;
17use flate2::bufread::ZlibDecoder;
18use futures_util::{Stream, StreamExt};
19use threadpool::ThreadPool;
20use tokio::sync::mpsc::UnboundedSender;
21use uuid::Uuid;
22
23use super::cache_object::CacheObjectInfo;
24use crate::{
25    errors::GitError,
26    hash::{ObjectHash, get_hash_kind, set_hash_kind},
27    internal::{
28        metadata::{EntryMeta, MetaAttached},
29        object::types::ObjectType,
30        pack::{
31            DEFAULT_TMP_DIR, Pack,
32            cache::{_Cache, Caches},
33            cache_object::{CacheObject, MemSizeRecorder},
34            channel_reader::StreamBufReader,
35            entry::Entry,
36            utils,
37            waitlist::Waitlist,
38            wrapper::Wrapper,
39        },
40    },
41    utils::CountingReader,
42    zstdelta,
43};
44
/// A reader adapter that counts the bytes passing through it and feeds each of them
/// into a CRC32 hasher.
///
/// In this file it wraps the raw pack input while a single entry is decoded, so the
/// checksum covers the bytes read from the pack for that entry (the entry header and
/// the compressed payload), not the inflated object data.
struct CrcCountingReader<'a, R> {
    /// Underlying reader the bytes are pulled from.
    inner: R,
    /// Total number of bytes read or consumed through this adapter so far.
    bytes_read: u64,
    /// Hasher borrowed from the caller; updated with every byte that passes through.
    crc: &'a mut crc32fast::Hasher,
}
52impl<R: Read> Read for CrcCountingReader<'_, R> {
53    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
54        let n = self.inner.read(buf)?;
55        self.bytes_read += n as u64;
56        self.crc.update(&buf[..n]);
57        Ok(n)
58    }
59}
60impl<R: BufRead> BufRead for CrcCountingReader<'_, R> {
61    fn fill_buf(&mut self) -> io::Result<&[u8]> {
62        self.inner.fill_buf()
63    }
64    fn consume(&mut self, amt: usize) {
65        let buf = self.inner.fill_buf().unwrap_or(&[]);
66        self.crc.update(&buf[..amt.min(buf.len())]);
67        self.bytes_read += amt as u64;
68        self.inner.consume(amt);
69    }
70}
71
/// Bundle of the shared handles needed by the decoding tasks, grouped into one struct
/// so that they can be passed around together instead of as separate arguments.
struct SharedParams {
    /// Thread pool on which delta-rebuild jobs are scheduled.
    pub pool: Arc<ThreadPool>,
    /// Delta objects parked until their base object becomes available.
    pub waitlist: Arc<Waitlist>,
    /// Cache that receives every decoded object, inserted with its offset and hash.
    pub caches: Arc<Caches>,
    /// Shared counter handed to each rebuilt `CacheObject` as its memory recorder.
    pub cache_objs_mem_size: Arc<AtomicUsize>,
    /// User callback invoked with every fully decoded entry.
    pub callback: Arc<dyn Fn(MetaAttached<Entry, EntryMeta>) + Sync + Send>,
}
80
81impl Drop for Pack {
82    fn drop(&mut self) {
83        if self.clean_tmp {
84            self.caches.remove_tmp_dir();
85        }
86    }
87}
88
89impl Pack {
90    /// # Parameters
91    /// - `thread_num`: The number of threads to use for decoding and cache, `None` mean use the number of logical CPUs.
92    ///   It can't be zero, or panic <br>
93    /// - `mem_limit`: The maximum size of the memory cache in bytes, or None for unlimited.
94    ///   The 80% of it will be used for [Caches]  <br>
95    ///   ​**Not very accurate, because of memory alignment and other reasons, overuse about 15%** <br>
96    /// - `temp_path`: The path to a directory for temporary files, default is "./.cache_temp" <br>
97    ///   For example, thread_num = 4 will use up to 8 threads (4 for decoding and 4 for cache) <br>
98    /// - `clean_tmp`: whether to remove temp directory when Pack is dropped
99    pub fn new(
100        thread_num: Option<usize>,
101        mem_limit: Option<usize>,
102        temp_path: Option<PathBuf>,
103        clean_tmp: bool,
104    ) -> Self {
105        let mut temp_path = temp_path.unwrap_or(PathBuf::from(DEFAULT_TMP_DIR));
106        // add 8 random characters as subdirectory, check if the directory exists
107        loop {
108            let sub_dir = Uuid::new_v4().to_string()[..8].to_string();
109            temp_path.push(sub_dir);
110            if !temp_path.exists() {
111                break;
112            }
113            temp_path.pop();
114        }
115        let thread_num = thread_num.unwrap_or_else(num_cpus::get);
116        let cache_mem_size = mem_limit.map(|mem_limit| mem_limit * 4 / 5);
117        Pack {
118            number: 0,
119            signature: ObjectHash::default(),
120            objects: Vec::new(),
121            pool: Arc::new(ThreadPool::new(thread_num)),
122            waitlist: Arc::new(Waitlist::new()),
123            caches: Arc::new(Caches::new(cache_mem_size, temp_path, thread_num)),
124            mem_limit,
125            cache_objs_mem: Arc::new(AtomicUsize::default()),
126            clean_tmp,
127        }
128    }
129
130    /// Checks and reads the header of a Git pack file.
131    ///
132    /// This function reads the first 12 bytes of a pack file, which include the b"PACK" magic identifier,
133    /// the version number, and the number of objects in the pack. It verifies that the magic identifier
134    /// is correct and that the version number is 2 (which is the version currently supported by Git).
135    /// It also collects these header bytes for later use, such as for hashing the entire pack file.
136    ///
137    /// # Parameters
138    /// * `pack` - A mutable reference to an object implementing the `Read` trait,
139    ///   representing the source of the pack file data (e.g., file, memory stream).
140    ///
141    /// # Returns
142    /// A `Result` which is:
143    /// * `Ok((u32, Vec<u8>))`: On successful reading and validation of the header, returns a tuple where:
144    ///     - The first element is the number of objects in the pack file (`u32`).
145    ///     - The second element is a vector containing the bytes of the pack file header (`Vec<u8>`).
146    /// * `Err(GitError)`: On failure, returns a [`GitError`] with a description of the issue.
147    ///
148    /// # Errors
149    /// This function can return an error in the following situations:
150    /// * If the pack file does not start with the "PACK" magic identifier.
151    /// * If the pack file's version number is not 2.
152    /// * If there are any issues reading from the provided `pack` source.
153    pub fn check_header(pack: &mut impl BufRead) -> Result<(u32, Vec<u8>), GitError> {
154        // A vector to store the header data for hashing later
155        let mut header_data = Vec::new();
156
157        // Read the first 4 bytes which should be "PACK"
158        let mut magic = [0; 4];
159        // Read the magic "PACK" identifier
160        let result = pack.read_exact(&mut magic);
161        match result {
162            Ok(_) => {
163                // Store these bytes for later
164                header_data.extend_from_slice(&magic);
165
166                // Check if the magic bytes match "PACK"
167                if magic != *b"PACK" {
168                    // If not, return an error indicating invalid pack header
169                    return Err(GitError::InvalidPackHeader(format!(
170                        "{},{},{},{}",
171                        magic[0], magic[1], magic[2], magic[3]
172                    )));
173                }
174            }
175            Err(e) => {
176                // If there is an error in reading, return a GitError
177                return Err(GitError::InvalidPackFile(format!(
178                    "Error reading magic identifier: {e}"
179                )));
180            }
181        }
182
183        // Read the next 4 bytes for the version number
184        let mut version_bytes = [0; 4];
185        let result = pack.read_exact(&mut version_bytes); // Read the version number
186        match result {
187            Ok(_) => {
188                // Store these bytes
189                header_data.extend_from_slice(&version_bytes);
190
191                // Convert the version bytes to an u32 integer
192                let version = u32::from_be_bytes(version_bytes);
193                if version != 2 {
194                    // Git currently supports version 2, so error if not version 2
195                    return Err(GitError::InvalidPackFile(format!(
196                        "Version Number is {version}, not 2"
197                    )));
198                }
199            }
200            Err(e) => {
201                // If there is an error in reading, return a GitError
202                return Err(GitError::InvalidPackFile(format!(
203                    "Error reading version number: {e}"
204                )));
205            }
206        }
207
208        // Read the next 4 bytes for the number of objects in the pack
209        let mut object_num_bytes = [0; 4];
210        // Read the number of objects
211        let result = pack.read_exact(&mut object_num_bytes);
212        match result {
213            Ok(_) => {
214                // Store these bytes
215                header_data.extend_from_slice(&object_num_bytes);
216                // Convert the object number bytes to an u32 integer
217                let object_num = u32::from_be_bytes(object_num_bytes);
218                // Return the number of objects and the header data for further processing
219                Ok((object_num, header_data))
220            }
221            Err(e) => {
222                // If there is an error in reading, return a GitError
223                Err(GitError::InvalidPackFile(format!(
224                    "Error reading object number: {e}"
225                )))
226            }
227        }
228    }
229
230    /// Decompresses data from a given Read and BufRead source using Zlib decompression.
231    ///
232    /// # Parameters
233    /// * `pack`: A source that implements both Read and BufRead traits (e.g., file, network stream).
234    /// * `expected_size`: The expected decompressed size of the data.
235    ///
236    /// # Returns
237    /// Returns a `Result` containing either:
238    /// * A tuple with a `Vec<u8>` of the decompressed data and the total number of input bytes processed,
239    /// * Or a `GitError` in case of a mismatch in expected size or any other reading error.
240    ///
241    pub fn decompress_data(
242        pack: &mut (impl BufRead + Send),
243        expected_size: usize,
244    ) -> Result<(Vec<u8>, usize), GitError> {
245        // Create a buffer with the expected size for the decompressed data
246        let mut buf = Vec::with_capacity(expected_size);
247
248        let mut counting_reader = CountingReader::new(pack);
249        // Create a new Zlib decoder with the original data
250        //let mut deflate = ZlibDecoder::new(pack);
251        let mut deflate = ZlibDecoder::new(&mut counting_reader);
252        // Attempt to read data to the end of the buffer
253        match deflate.read_to_end(&mut buf) {
254            Ok(_) => {
255                // Check if the length of the buffer matches the expected size
256                if buf.len() != expected_size {
257                    Err(GitError::InvalidPackFile(format!(
258                        "The object size {} does not match the expected size {}",
259                        buf.len(),
260                        expected_size
261                    )))
262                } else {
263                    // If everything is as expected, return the buffer, the original data, and the total number of input bytes processed
264                    let actual_input_bytes = counting_reader.bytes_read as usize;
265                    Ok((buf, actual_input_bytes))
266                }
267            }
268            Err(e) => {
269                // If there is an error in reading, return a GitError
270                Err(GitError::InvalidPackFile(format!(
271                    "Decompression error: {e}"
272                )))
273            }
274        }
275    }
276
277    /// Decodes a pack object from a given Read and BufRead source and returns the object as a [`CacheObject`].
278    ///
279    /// # Parameters
280    /// * `pack`: A source that implements both Read and BufRead traits.
281    /// * `offset`: A mutable reference to the current offset within the pack.
282    ///
283    /// # Returns
284    /// Returns a `Result` containing either:
285    /// * A tuple of the next offset in the pack and the original compressed data as `Vec<u8>`,
286    /// * Or a `GitError` in case of any reading or decompression error.
287    ///
288    pub fn decode_pack_object(
289        pack: &mut (impl BufRead + Send),
290        offset: &mut usize,
291    ) -> Result<CacheObject, GitError> {
292        let init_offset = *offset;
293        let mut hasher = crc32fast::Hasher::new();
294        let mut reader = CrcCountingReader {
295            inner: pack,
296            bytes_read: 0,
297            crc: &mut hasher,
298        };
299
300        // Attempt to read the type and size, handle potential errors
301        // Note: read_type_and_varint_size updates the offset manually, but we can rely on reader.bytes_read
302        let (type_bits, size) = match utils::read_type_and_varint_size(&mut reader, offset) {
303            Ok(result) => result,
304            Err(e) => {
305                // Handle the error e.g., by logging it or converting it to GitError
306                // and then return from the function
307                return Err(GitError::InvalidPackFile(format!("Read error: {e}")));
308            }
309        };
310
311        // Check if the object type is valid
312        let t = ObjectType::from_u8(type_bits)?;
313
314        match t {
315            ObjectType::Commit | ObjectType::Tree | ObjectType::Blob | ObjectType::Tag => {
316                let (data, raw_size) = Pack::decompress_data(&mut reader, size)?;
317                *offset += raw_size;
318                let crc32 = hasher.finalize();
319                Ok(CacheObject::new_for_undeltified(
320                    t,
321                    data,
322                    init_offset,
323                    crc32,
324                ))
325            }
326            ObjectType::OffsetDelta | ObjectType::OffsetZstdelta => {
327                let (delta_offset, bytes) = utils::read_offset_encoding(&mut reader).unwrap();
328                *offset += bytes;
329
330                let (data, raw_size) = Pack::decompress_data(&mut reader, size)?;
331                *offset += raw_size;
332
333                // Count the base object offset: the current offset - delta offset
334                let base_offset = init_offset
335                    .checked_sub(delta_offset as usize)
336                    .ok_or_else(|| {
337                        GitError::InvalidObjectInfo("Invalid OffsetDelta offset".to_string())
338                    })
339                    .unwrap();
340
341                let mut reader = Cursor::new(&data);
342                let (_, final_size) = utils::read_delta_object_size(&mut reader)?;
343
344                let obj_info = match t {
345                    ObjectType::OffsetDelta => {
346                        CacheObjectInfo::OffsetDelta(base_offset, final_size)
347                    }
348                    ObjectType::OffsetZstdelta => {
349                        CacheObjectInfo::OffsetZstdelta(base_offset, final_size)
350                    }
351                    _ => unreachable!(),
352                };
353                let crc32 = hasher.finalize();
354                Ok(CacheObject {
355                    info: obj_info,
356                    offset: init_offset,
357                    crc32,
358                    data_decompressed: data,
359                    mem_recorder: None,
360                    is_delta_in_pack: true,
361                })
362            }
363            ObjectType::HashDelta => {
364                // Read hash bytes to get the reference object hash(size depends on hash kind,e.g.,20 for SHA1,32 for SHA256)
365                let ref_sha = ObjectHash::from_stream(&mut reader).unwrap();
366                // Offset is incremented by 20/32 bytes
367                *offset += get_hash_kind().size();
368
369                let (data, raw_size) = Pack::decompress_data(&mut reader, size)?;
370                *offset += raw_size;
371
372                let mut reader = Cursor::new(&data);
373                let (_, final_size) = utils::read_delta_object_size(&mut reader)?;
374
375                let crc32 = hasher.finalize();
376
377                Ok(CacheObject {
378                    info: CacheObjectInfo::HashDelta(ref_sha, final_size),
379                    offset: init_offset,
380                    crc32,
381                    data_decompressed: data,
382                    mem_recorder: None,
383                    is_delta_in_pack: true,
384                })
385            }
386        }
387    }
388
389    /// Decodes a pack file from a given Read and BufRead source, for each object in the pack,
390    /// it decodes the object and processes it using the provided callback function.
391    ///
392    /// # Parameters
393    /// * pack_id_callback: A callback that seed pack_file sha1 for updating database
394    ///
395    pub fn decode<F, C>(
396        &mut self,
397        pack: &mut (impl BufRead + Send),
398        callback: F,
399        pack_id_callback: Option<C>,
400    ) -> Result<(), GitError>
401    where
402        F: Fn(MetaAttached<Entry, EntryMeta>) + Sync + Send + 'static,
403        C: FnOnce(ObjectHash) + Send + 'static,
404    {
405        let time = Instant::now();
406        let mut last_update_time = time.elapsed().as_millis();
407        let log_info = |_i: usize, pack: &Pack| {
408            tracing::info!(
409                "time {:.2} s \t decode: {:?} \t dec-num: {} \t cah-num: {} \t Objs: {} MB \t CacheUsed: {} MB",
410                time.elapsed().as_millis() as f64 / 1000.0,
411                _i,
412                pack.pool.queued_count(),
413                pack.caches.queued_tasks(),
414                pack.cache_objs_mem_used() / 1024 / 1024,
415                pack.caches.memory_used() / 1024 / 1024
416            );
417        };
418        let callback = Arc::new(callback);
419
420        let caches = self.caches.clone();
421        let mut reader = Wrapper::new(io::BufReader::new(pack));
422
423        let result = Pack::check_header(&mut reader);
424        match result {
425            Ok((object_num, _)) => {
426                self.number = object_num as usize;
427            }
428            Err(e) => {
429                return Err(e);
430            }
431        }
432        tracing::info!("The pack file has {} objects", self.number);
433        let mut offset: usize = 12;
434        let mut i = 0;
435        while i < self.number {
436            // log per 1000 objects and 1 second
437            if i % 1000 == 0 {
438                let time_now = time.elapsed().as_millis();
439                if time_now - last_update_time > 1000 {
440                    log_info(i, self);
441                    last_update_time = time_now;
442                }
443            }
444            // 3 parts: Waitlist + TheadPool + Caches
445            // hardcode the limit of the tasks of threads_pool queue, to limit memory
446            while self.pool.queued_count() > 2000
447                || self
448                    .mem_limit
449                    .map(|limit| self.memory_used() > limit)
450                    .unwrap_or(false)
451            {
452                thread::yield_now();
453            }
454            let r: Result<CacheObject, GitError> =
455                Pack::decode_pack_object(&mut reader, &mut offset);
456            match r {
457                Ok(mut obj) => {
458                    obj.set_mem_recorder(self.cache_objs_mem.clone());
459                    obj.record_mem_size();
460
461                    // Wrapper of Arc Params, for convenience to pass
462                    let params = Arc::new(SharedParams {
463                        pool: self.pool.clone(),
464                        waitlist: self.waitlist.clone(),
465                        caches: self.caches.clone(),
466                        cache_objs_mem_size: self.cache_objs_mem.clone(),
467                        callback: callback.clone(),
468                    });
469
470                    let caches = caches.clone();
471                    let waitlist = self.waitlist.clone();
472                    let kind = get_hash_kind();
473                    self.pool.execute(move || {
474                        set_hash_kind(kind);
475                        match obj.info {
476                            CacheObjectInfo::BaseObject(_, _) => {
477                                Self::cache_obj_and_process_waitlist(params, obj);
478                            }
479                            CacheObjectInfo::OffsetDelta(base_offset, _)
480                            | CacheObjectInfo::OffsetZstdelta(base_offset, _) => {
481                                if let Some(base_obj) = caches.get_by_offset(base_offset) {
482                                    Self::process_delta(params, obj, base_obj);
483                                } else {
484                                    // You can delete this 'if' block ↑, because there are Second check in 'else'
485                                    // It will be more readable, but the performance will be slightly reduced
486                                    waitlist.insert_offset(base_offset, obj);
487                                    // Second check: prevent that the base_obj thread has finished before the waitlist insert
488                                    if let Some(base_obj) = caches.get_by_offset(base_offset) {
489                                        Self::process_waitlist(params, base_obj);
490                                    }
491                                }
492                            }
493                            CacheObjectInfo::HashDelta(base_ref, _) => {
494                                if let Some(base_obj) = caches.get_by_hash(base_ref) {
495                                    Self::process_delta(params, obj, base_obj);
496                                } else {
497                                    waitlist.insert_ref(base_ref, obj);
498                                    if let Some(base_obj) = caches.get_by_hash(base_ref) {
499                                        Self::process_waitlist(params, base_obj);
500                                    }
501                                }
502                            }
503                        }
504                    });
505                }
506                Err(e) => {
507                    return Err(e);
508                }
509            }
510            i += 1;
511        }
512        log_info(i, self);
513        let render_hash = reader.final_hash();
514        self.signature = ObjectHash::from_stream(&mut reader).unwrap();
515
516        if render_hash != self.signature {
517            return Err(GitError::InvalidPackFile(format!(
518                "The pack file hash {} does not match the trailer hash {}",
519                render_hash, self.signature
520            )));
521        }
522
523        let end = utils::is_eof(&mut reader);
524        if !end {
525            return Err(GitError::InvalidPackFile(
526                "The pack file is not at the end".to_string(),
527            ));
528        }
529
530        self.pool.join(); // wait for all threads to finish
531
532        // send pack id for metadata
533        if let Some(pack_callback) = pack_id_callback {
534            pack_callback(self.signature);
535        }
536        // !Attention: Caches threadpool may not stop, but it's not a problem (garbage file data)
537        // So that files != self.number
538        assert_eq!(self.waitlist.map_offset.len(), 0);
539        assert_eq!(self.waitlist.map_ref.len(), 0);
540        assert_eq!(self.number, caches.total_inserted());
541        tracing::info!(
542            "The pack file has been decoded successfully, takes: [ {:?} ]",
543            time.elapsed()
544        );
545        self.caches.clear(); // clear cached objects & stop threads
546        assert_eq!(self.cache_objs_mem_used(), 0); // all the objs should be dropped until here
547
548        // impl in Drop Trait
549        // if self.clean_tmp {
550        //     self.caches.remove_tmp_dir();
551        // }
552
553        Ok(())
554    }
555
556    /// Decode a Pack in a new thread and send the CacheObjects while decoding.
557    /// <br> Attention: It will consume the `pack` and return in a JoinHandle.
558    pub fn decode_async(
559        mut self,
560        mut pack: impl BufRead + Send + 'static,
561        sender: UnboundedSender<Entry>,
562    ) -> JoinHandle<Pack> {
563        let kind = get_hash_kind();
564        thread::spawn(move || {
565            set_hash_kind(kind);
566            self.decode(
567                &mut pack,
568                move |entry| {
569                    if let Err(e) = sender.send(entry.inner) {
570                        eprintln!("Channel full, failed to send entry: {e:?}");
571                    }
572                },
573                None::<fn(ObjectHash)>,
574            )
575            .unwrap();
576            self
577        })
578    }
579
580    /// Decodes a `Pack` from a `Stream` of `Bytes`, and sends the `Entry` while decoding.
581    pub async fn decode_stream(
582        mut self,
583        mut stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + Send + 'static,
584        sender: UnboundedSender<MetaAttached<Entry, EntryMeta>>,
585        pack_hash_send: Option<UnboundedSender<ObjectHash>>,
586    ) -> Self {
587        let kind = get_hash_kind();
588        let (tx, rx) = std::sync::mpsc::channel();
589        let mut reader = StreamBufReader::new(rx);
590        tokio::spawn(async move {
591            while let Some(chunk) = stream.next().await {
592                let data = chunk.unwrap().to_vec();
593                if let Err(e) = tx.send(data) {
594                    eprintln!("Sending Error: {e:?}");
595                    break;
596                }
597            }
598        });
599        // CPU-bound task, so use spawn_blocking
600        // DO NOT use thread::spawn, because it will block tokio runtime (if single-threaded runtime, like in tests)
601        tokio::task::spawn_blocking(move || {
602            set_hash_kind(kind);
603            self.decode(
604                &mut reader,
605                move |entry: MetaAttached<Entry, EntryMeta>| {
606                    // as we used unbound channel here, it will never full so can be send with synchronous
607                    if let Err(e) = sender.send(entry) {
608                        eprintln!("unbound channel Sending Error: {e:?}");
609                    }
610                },
611                Some(move |pack_id: ObjectHash| {
612                    if let Some(pack_id_send) = pack_hash_send
613                        && let Err(e) = pack_id_send.send(pack_id)
614                    {
615                        eprintln!("unbound channel Sending Error: {e:?}");
616                    }
617                }),
618            )
619            .unwrap();
620            self
621        })
622        .await
623        .unwrap()
624    }
625
626    /// CacheObjects + Index size of Caches
627    fn memory_used(&self) -> usize {
628        self.cache_objs_mem_used() + self.caches.memory_used_index()
629    }
630
631    /// The total memory used by the CacheObjects of this Pack
632    fn cache_objs_mem_used(&self) -> usize {
633        self.cache_objs_mem.load(Ordering::Acquire)
634    }
635
636    /// Rebuild the Delta Object in a new thread & process the objects waiting for it recursively.
637    /// <br> This function must be *static*, because [&self] can't be moved into a new thread.
638    fn process_delta(
639        shared_params: Arc<SharedParams>,
640        delta_obj: CacheObject,
641        base_obj: Arc<CacheObject>,
642    ) {
643        shared_params.pool.clone().execute(move || {
644            let mut new_obj = match delta_obj.info {
645                CacheObjectInfo::OffsetDelta(_, _) | CacheObjectInfo::HashDelta(_, _) => {
646                    Pack::rebuild_delta(delta_obj, base_obj)
647                }
648                CacheObjectInfo::OffsetZstdelta(_, _) => {
649                    Pack::rebuild_zstdelta(delta_obj, base_obj)
650                }
651                _ => unreachable!(),
652            };
653
654            new_obj.set_mem_recorder(shared_params.cache_objs_mem_size.clone());
655            new_obj.record_mem_size();
656            Self::cache_obj_and_process_waitlist(shared_params, new_obj); //Indirect Recursion
657        });
658    }
659
660    /// Cache the new object & process the objects waiting for it (in multi-threading).
661    fn cache_obj_and_process_waitlist(shared_params: Arc<SharedParams>, new_obj: CacheObject) {
662        (shared_params.callback)(new_obj.to_entry_metadata());
663        let new_obj = shared_params.caches.insert(
664            new_obj.offset,
665            new_obj.base_object_hash().unwrap(),
666            new_obj,
667        );
668        Self::process_waitlist(shared_params, new_obj);
669    }
670
671    fn process_waitlist(shared_params: Arc<SharedParams>, base_obj: Arc<CacheObject>) {
672        let wait_objs = shared_params
673            .waitlist
674            .take(base_obj.offset, base_obj.base_object_hash().unwrap());
675        for obj in wait_objs {
676            // Process the objects waiting for the new object(base_obj = new_obj)
677            Self::process_delta(shared_params.clone(), obj, base_obj.clone());
678        }
679    }
680
681    /// Reconstruct the Delta Object based on the "base object"
682    /// and return the new object.
683    pub fn rebuild_delta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
684        const COPY_INSTRUCTION_FLAG: u8 = 1 << 7;
685        const COPY_OFFSET_BYTES: u8 = 4;
686        const COPY_SIZE_BYTES: u8 = 3;
687        const COPY_ZERO_SIZE: usize = 0x10000;
688
689        let mut stream = Cursor::new(&delta_obj.data_decompressed);
690
691        // Read the base object size
692        // (Size Encoding)
693        let (base_size, result_size) = utils::read_delta_object_size(&mut stream).unwrap();
694
695        // Get the base object data
696        let base_info = &base_obj.data_decompressed;
697        assert_eq!(base_info.len(), base_size, "Base object size mismatch");
698
699        let mut result = Vec::with_capacity(result_size);
700
701        loop {
702            // Check if the stream has ended, meaning the new object is done
703            let instruction = match utils::read_bytes(&mut stream) {
704                Ok([instruction]) => instruction,
705                Err(err) if err.kind() == ErrorKind::UnexpectedEof => break,
706                Err(err) => {
707                    panic!(
708                        "{}",
709                        GitError::DeltaObjectError(format!("Wrong instruction in delta :{err}"))
710                    );
711                }
712            };
713
714            if instruction & COPY_INSTRUCTION_FLAG == 0 {
715                // Data instruction; the instruction byte specifies the number of data bytes
716                if instruction == 0 {
717                    // Appending 0 bytes doesn't make sense, so git disallows it
718                    panic!(
719                        "{}",
720                        GitError::DeltaObjectError(String::from("Invalid data instruction"))
721                    );
722                }
723
724                // Append the provided bytes
725                let mut data = vec![0; instruction as usize];
726                stream.read_exact(&mut data).unwrap();
727                result.extend_from_slice(&data);
728            } else {
729                // Copy instruction
730                // +----------+---------+---------+---------+---------+-------+-------+-------+
731                // | 1xxxxxxx | offset1 | offset2 | offset3 | offset4 | size1 | size2 | size3 |
732                // +----------+---------+---------+---------+---------+-------+-------+-------+
733                let mut nonzero_bytes = instruction;
734                let offset =
735                    utils::read_partial_int(&mut stream, COPY_OFFSET_BYTES, &mut nonzero_bytes)
736                        .unwrap();
737                let mut size =
738                    utils::read_partial_int(&mut stream, COPY_SIZE_BYTES, &mut nonzero_bytes)
739                        .unwrap();
740                if size == 0 {
741                    // Copying 0 bytes doesn't make sense, so git assumes a different size
742                    size = COPY_ZERO_SIZE;
743                }
744                // Copy bytes from the base object
745                let base_data = base_info.get(offset..(offset + size)).ok_or_else(|| {
746                    GitError::DeltaObjectError("Invalid copy instruction".to_string())
747                });
748
749                match base_data {
750                    Ok(data) => result.extend_from_slice(data),
751                    Err(e) => panic!("{}", e),
752                }
753            }
754        }
755        assert_eq!(result_size, result.len(), "Result size mismatch");
756
757        let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
758        // create new obj from `delta_obj` & `result` instead of modifying `delta_obj` for heap-size recording
759        CacheObject {
760            info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
761            offset: delta_obj.offset,
762            crc32: delta_obj.crc32,
763            data_decompressed: result,
764            mem_recorder: None,
765            is_delta_in_pack: delta_obj.is_delta_in_pack,
766        } // Canonical form (Complete Object)
767        // Memory recording will happen after this function returns. See `process_delta`
768    }
769    pub fn rebuild_zstdelta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
770        let result = zstdelta::apply(&base_obj.data_decompressed, &delta_obj.data_decompressed)
771            .expect("Failed to apply zstdelta");
772        let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
773        CacheObject {
774            info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
775            offset: delta_obj.offset,
776            crc32: delta_obj.crc32,
777            data_decompressed: result,
778            mem_recorder: None,
779            is_delta_in_pack: delta_obj.is_delta_in_pack,
780        } // Canonical form (Complete Object)
781        // Memory recording will happen after this function returns. See `process_delta`
782    }
783}
784
785#[cfg(test)]
786mod tests {
787    use std::{
788        env, fs,
789        io::{BufReader, Cursor, prelude::*},
790        path::PathBuf,
791        sync::{
792            Arc,
793            atomic::{AtomicUsize, Ordering},
794        },
795    };
796
797    use flate2::{Compression, write::ZlibEncoder};
798    use futures_util::TryStreamExt;
799    use tokio_util::io::ReaderStream;
800
801    use crate::{
802        hash::{HashKind, ObjectHash, set_hash_kind},
803        internal::pack::{Pack, tests::init_logger},
804    };
805
806    #[tokio::test]
807    async fn test_pack_check_header() {
808        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
809        source.push("tests/data/packs/medium-sha1.pack");
810
811        let f = fs::File::open(source).unwrap();
812        let mut buf_reader = BufReader::new(f);
813        let (object_num, _) = Pack::check_header(&mut buf_reader).unwrap();
814
815        assert_eq!(object_num, 35031);
816    }
817
818    #[test]
819    fn test_decompress_data() {
820        let data = b"Hello, world!"; // Sample data to compress and then decompress
821        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
822        encoder.write_all(data).unwrap();
823        let compressed_data = encoder.finish().unwrap();
824        let compressed_size = compressed_data.len();
825
826        // Create a cursor for the compressed data to simulate a BufRead source
827        let mut cursor: Cursor<Vec<u8>> = Cursor::new(compressed_data);
828        let expected_size = data.len();
829
830        // Decompress the data and assert correctness
831        let result = Pack::decompress_data(&mut cursor, expected_size);
832        match result {
833            Ok((decompressed_data, bytes_read)) => {
834                assert_eq!(bytes_read, compressed_size);
835                assert_eq!(decompressed_data, data);
836            }
837            Err(e) => panic!("Decompression failed: {e:?}"),
838        }
839    }
840
841    /// Helper function to run decode tests without delta objects
842    fn run_decode_no_delta(rel_path: &str, kind: HashKind) {
843        let _guard = set_hash_kind(kind);
844        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
845        source.push(rel_path);
846
847        let tmp = PathBuf::from("/tmp/.cache_temp");
848
849        let f = fs::File::open(source).unwrap();
850        let mut buffered = BufReader::new(f);
851        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
852        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
853            .unwrap();
854    }
855    #[test]
856    fn test_pack_decode_without_delta() {
857        run_decode_no_delta("tests/data/packs/small-sha1.pack", HashKind::Sha1);
858        run_decode_no_delta("tests/data/packs/small-sha256.pack", HashKind::Sha256);
859    }
860
861    /// Helper function to run decode tests with delta objects
862    fn run_decode_with_ref_delta(rel_path: &str, kind: HashKind) {
863        let _guard = set_hash_kind(kind);
864        init_logger();
865
866        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
867        source.push(rel_path);
868
869        let tmp = PathBuf::from("/tmp/.cache_temp");
870
871        let f = fs::File::open(source).unwrap();
872        let mut buffered = BufReader::new(f);
873        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
874        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
875            .unwrap();
876    }
877    #[test]
878    fn test_pack_decode_with_ref_delta() {
879        run_decode_with_ref_delta("tests/data/packs/ref-delta-sha1.pack", HashKind::Sha1);
880        run_decode_with_ref_delta("tests/data/packs/ref-delta-sha256.pack", HashKind::Sha256);
881    }
882
883    /// Helper function to run decode tests without memory limit
884    fn run_decode_no_mem_limit(rel_path: &str, kind: HashKind) {
885        let _guard = set_hash_kind(kind);
886        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
887        source.push(rel_path);
888
889        let tmp = PathBuf::from("/tmp/.cache_temp");
890
891        let f = fs::File::open(source).unwrap();
892        let mut buffered = BufReader::new(f);
893        let mut p = Pack::new(None, None, Some(tmp), true);
894        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
895            .unwrap();
896    }
897    #[test]
898    fn test_pack_decode_no_mem_limit() {
899        run_decode_no_mem_limit("tests/data/packs/small-sha1.pack", HashKind::Sha1);
900        run_decode_no_mem_limit("tests/data/packs/small-sha256.pack", HashKind::Sha256);
901    }
902
903    /// Helper function to run decode tests with delta objects
904    async fn run_decode_large_with_delta(rel_path: &str, kind: HashKind) {
905        let _guard = set_hash_kind(kind);
906        init_logger();
907        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
908        source.push(rel_path);
909
910        let tmp = PathBuf::from("/tmp/.cache_temp");
911
912        let f = fs::File::open(source).unwrap();
913        let mut buffered = BufReader::new(f);
914        let mut p = Pack::new(
915            Some(4),
916            Some(1024 * 1024 * 100), //try to avoid dead lock on CI servers with low memory
917            Some(tmp.clone()),
918            true,
919        );
920        let rt = p.decode(
921            &mut buffered,
922            |_obj| {
923                // println!("{:?} {}", obj.hash.to_string(), offset);
924            },
925            None::<fn(ObjectHash)>,
926        );
927        if let Err(e) = rt {
928            fs::remove_dir_all(tmp).unwrap();
929            panic!("Error: {e:?}");
930        }
931    }
932    #[tokio::test]
933    async fn test_pack_decode_with_large_file_with_delta_without_ref() {
934        run_decode_large_with_delta("tests/data/packs/medium-sha1.pack", HashKind::Sha1).await;
935        run_decode_large_with_delta("tests/data/packs/medium-sha256.pack", HashKind::Sha256).await;
936    } // it will be stuck on dropping `Pack` on Windows if `mem_size` is None, so we need `mimalloc`
937
938    /// Helper function to run decode tests with large file stream
939    async fn run_decode_large_stream(rel_path: &str, kind: HashKind) {
940        let _guard = set_hash_kind(kind);
941        init_logger();
942        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
943        source.push(rel_path);
944
945        let tmp = PathBuf::from("/tmp/.cache_temp");
946        let f = tokio::fs::File::open(source).await.unwrap();
947        let stream = ReaderStream::new(f).map_err(axum::Error::new);
948        let p = Pack::new(Some(4), Some(1024 * 1024 * 100), Some(tmp.clone()), true);
949
950        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
951        let handle = tokio::spawn(async move { p.decode_stream(stream, tx, None).await });
952        let count = Arc::new(AtomicUsize::new(0));
953        let count_c = count.clone();
954        // in tests, RUNTIME is single-threaded, so `sync code` will block the tokio runtime
955        let consume = tokio::spawn(async move {
956            let mut cnt = 0;
957            while let Some(_entry) = rx.recv().await {
958                cnt += 1;
959            }
960            tracing::info!("Received: {}", cnt);
961            count_c.store(cnt, Ordering::Release);
962        });
963        let p = handle.await.unwrap();
964        consume.await.unwrap();
965        assert_eq!(count.load(Ordering::Acquire), p.number);
966        assert_eq!(p.number, 35031);
967    }
968    #[tokio::test]
969    async fn test_decode_large_file_stream() {
970        run_decode_large_stream("tests/data/packs/medium-sha1.pack", HashKind::Sha1).await;
971        run_decode_large_stream("tests/data/packs/medium-sha256.pack", HashKind::Sha256).await;
972    }
973
974    /// Helper function to run decode tests with large file async
975    async fn run_decode_large_file_async(rel_path: &str, kind: HashKind) {
976        let _guard = set_hash_kind(kind);
977        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
978        source.push(rel_path);
979
980        let tmp = PathBuf::from("/tmp/.cache_temp");
981        let f = fs::File::open(source).unwrap();
982        let buffered = BufReader::new(f);
983        let p = Pack::new(Some(4), Some(1024 * 1024 * 100), Some(tmp.clone()), true);
984
985        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
986        let handle = p.decode_async(buffered, tx); // new thread
987        let mut cnt = 0;
988        while let Some(_entry) = rx.recv().await {
989            cnt += 1; //use entry here
990        }
991        let p = handle.join().unwrap();
992        assert_eq!(cnt, p.number);
993    }
994    #[tokio::test]
995    async fn test_decode_large_file_async() {
996        run_decode_large_file_async("tests/data/packs/medium-sha1.pack", HashKind::Sha1).await;
997        run_decode_large_file_async("tests/data/packs/medium-sha256.pack", HashKind::Sha256).await;
998    }
999
1000    /// Helper function to run decode tests with delta objects without reference
1001    fn run_decode_with_delta_no_ref(rel_path: &str, kind: HashKind) {
1002        let _guard = set_hash_kind(kind);
1003        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1004        source.push(rel_path);
1005
1006        let tmp = PathBuf::from("/tmp/.cache_temp");
1007
1008        let f = fs::File::open(source).unwrap();
1009        let mut buffered = BufReader::new(f);
1010        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
1011        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
1012            .unwrap();
1013    }
1014    #[test]
1015    fn test_pack_decode_with_delta_without_ref() {
1016        run_decode_with_delta_no_ref("tests/data/packs/medium-sha1.pack", HashKind::Sha1);
1017        run_decode_with_delta_no_ref("tests/data/packs/medium-sha256.pack", HashKind::Sha256);
1018    }
1019
1020    #[test] // Take too long time
1021    fn test_pack_decode_multi_task_with_large_file_with_delta_without_ref() {
1022        let rt = tokio::runtime::Builder::new_current_thread()
1023            .enable_all()
1024            .build()
1025            .unwrap();
1026        rt.block_on(async move {
1027            // For each hash kind, run two decode tasks concurrently to simulate multi-task pressure.
1028            for (kind, path) in [
1029                (HashKind::Sha1, "tests/data/packs/medium-sha1.pack"),
1030                (HashKind::Sha256, "tests/data/packs/medium-sha256.pack"),
1031            ] {
1032                let f1 = run_decode_large_with_delta(path, kind);
1033                let f2 = run_decode_large_with_delta(path, kind);
1034                let _ = futures::future::join(f1, f2).await;
1035            }
1036        });
1037    }
1038}