git_internal/internal/pack/
decode.rs

1use std::io::{self, BufRead, Cursor, ErrorKind, Read};
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::thread::{self, JoinHandle};
6use std::time::Instant;
7
8use axum::Error;
9use bytes::Bytes;
10use flate2::bufread::ZlibDecoder;
11use futures_util::{Stream, StreamExt};
12use threadpool::ThreadPool;
13use tokio::sync::mpsc::UnboundedSender;
14use uuid::Uuid;
15
16use crate::errors::GitError;
17use crate::hash::{ObjectHash, get_hash_kind, set_hash_kind};
18
19use crate::internal::metadata::{EntryMeta, MetaAttached};
20use crate::zstdelta;
21
22use crate::internal::object::types::ObjectType;
23
24use super::cache_object::CacheObjectInfo;
25use crate::internal::pack::cache::_Cache;
26use crate::internal::pack::cache::Caches;
27use crate::internal::pack::cache_object::{CacheObject, MemSizeRecorder};
28use crate::internal::pack::channel_reader::StreamBufReader;
29use crate::internal::pack::entry::Entry;
30use crate::internal::pack::waitlist::Waitlist;
31use crate::internal::pack::wrapper::Wrapper;
32use crate::internal::pack::{DEFAULT_TMP_DIR, Pack, utils};
33use crate::utils::CountingReader;
34
/// For the convenience of passing parameters
struct SharedParams {
    /// Thread pool shared by object-decode and delta-rebuild tasks.
    pub pool: Arc<ThreadPool>,
    /// Delta objects parked here until their base object becomes available.
    pub waitlist: Arc<Waitlist>,
    /// Cache of decoded objects, looked up by pack offset or by object hash.
    pub caches: Arc<Caches>,
    /// Running total (bytes) of heap memory held by live `CacheObject`s.
    pub cache_objs_mem_size: Arc<AtomicUsize>,
    /// User callback invoked once per fully decoded entry.
    pub callback: Arc<dyn Fn(MetaAttached<Entry, EntryMeta>) + Sync + Send>,
}
43
44impl Drop for Pack {
45    fn drop(&mut self) {
46        if self.clean_tmp {
47            self.caches.remove_tmp_dir();
48        }
49    }
50}
51
52impl Pack {
53    /// # Parameters
54    /// - `thread_num`: The number of threads to use for decoding and cache, `None` mean use the number of logical CPUs.
55    ///   It can't be zero, or panic <br>
56    /// - `mem_limit`: The maximum size of the memory cache in bytes, or None for unlimited.
57    ///   The 80% of it will be used for [Caches]  <br>
58    ///   ​**Not very accurate, because of memory alignment and other reasons, overuse about 15%** <br>
59    /// - `temp_path`: The path to a directory for temporary files, default is "./.cache_temp" <br>
60    ///   For example, thread_num = 4 will use up to 8 threads (4 for decoding and 4 for cache) <br>
61    /// - `clean_tmp`: whether to remove temp directory when Pack is dropped
62    pub fn new(
63        thread_num: Option<usize>,
64        mem_limit: Option<usize>,
65        temp_path: Option<PathBuf>,
66        clean_tmp: bool,
67    ) -> Self {
68        let mut temp_path = temp_path.unwrap_or(PathBuf::from(DEFAULT_TMP_DIR));
69        // add 8 random characters as subdirectory, check if the directory exists
70        loop {
71            let sub_dir = Uuid::new_v4().to_string()[..8].to_string();
72            temp_path.push(sub_dir);
73            if !temp_path.exists() {
74                break;
75            }
76            temp_path.pop();
77        }
78        let thread_num = thread_num.unwrap_or_else(num_cpus::get);
79        let cache_mem_size = mem_limit.map(|mem_limit| mem_limit * 4 / 5);
80        Pack {
81            number: 0,
82            signature: ObjectHash::default(),
83            objects: Vec::new(),
84            pool: Arc::new(ThreadPool::new(thread_num)),
85            waitlist: Arc::new(Waitlist::new()),
86            caches: Arc::new(Caches::new(cache_mem_size, temp_path, thread_num)),
87            mem_limit,
88            cache_objs_mem: Arc::new(AtomicUsize::default()),
89            clean_tmp,
90        }
91    }
92
93    /// Checks and reads the header of a Git pack file.
94    ///
95    /// This function reads the first 12 bytes of a pack file, which include the b"PACK" magic identifier,
96    /// the version number, and the number of objects in the pack. It verifies that the magic identifier
97    /// is correct and that the version number is 2 (which is the version currently supported by Git).
98    /// It also collects these header bytes for later use, such as for hashing the entire pack file.
99    ///
100    /// # Parameters
101    /// * `pack` - A mutable reference to an object implementing the `Read` trait,
102    ///   representing the source of the pack file data (e.g., file, memory stream).
103    ///
104    /// # Returns
105    /// A `Result` which is:
106    /// * `Ok((u32, Vec<u8>))`: On successful reading and validation of the header, returns a tuple where:
107    ///     - The first element is the number of objects in the pack file (`u32`).
108    ///     - The second element is a vector containing the bytes of the pack file header (`Vec<u8>`).
109    /// * `Err(GitError)`: On failure, returns a [`GitError`] with a description of the issue.
110    ///
111    /// # Errors
112    /// This function can return an error in the following situations:
113    /// * If the pack file does not start with the "PACK" magic identifier.
114    /// * If the pack file's version number is not 2.
115    /// * If there are any issues reading from the provided `pack` source.
116    pub fn check_header(pack: &mut impl BufRead) -> Result<(u32, Vec<u8>), GitError> {
117        // A vector to store the header data for hashing later
118        let mut header_data = Vec::new();
119
120        // Read the first 4 bytes which should be "PACK"
121        let mut magic = [0; 4];
122        // Read the magic "PACK" identifier
123        let result = pack.read_exact(&mut magic);
124        match result {
125            Ok(_) => {
126                // Store these bytes for later
127                header_data.extend_from_slice(&magic);
128
129                // Check if the magic bytes match "PACK"
130                if magic != *b"PACK" {
131                    // If not, return an error indicating invalid pack header
132                    return Err(GitError::InvalidPackHeader(format!(
133                        "{},{},{},{}",
134                        magic[0], magic[1], magic[2], magic[3]
135                    )));
136                }
137            }
138            Err(e) => {
139                // If there is an error in reading, return a GitError
140                return Err(GitError::InvalidPackFile(format!(
141                    "Error reading magic identifier: {e}"
142                )));
143            }
144        }
145
146        // Read the next 4 bytes for the version number
147        let mut version_bytes = [0; 4];
148        let result = pack.read_exact(&mut version_bytes); // Read the version number
149        match result {
150            Ok(_) => {
151                // Store these bytes
152                header_data.extend_from_slice(&version_bytes);
153
154                // Convert the version bytes to an u32 integer
155                let version = u32::from_be_bytes(version_bytes);
156                if version != 2 {
157                    // Git currently supports version 2, so error if not version 2
158                    return Err(GitError::InvalidPackFile(format!(
159                        "Version Number is {version}, not 2"
160                    )));
161                }
162            }
163            Err(e) => {
164                // If there is an error in reading, return a GitError
165                return Err(GitError::InvalidPackFile(format!(
166                    "Error reading version number: {e}"
167                )));
168            }
169        }
170
171        // Read the next 4 bytes for the number of objects in the pack
172        let mut object_num_bytes = [0; 4];
173        // Read the number of objects
174        let result = pack.read_exact(&mut object_num_bytes);
175        match result {
176            Ok(_) => {
177                // Store these bytes
178                header_data.extend_from_slice(&object_num_bytes);
179                // Convert the object number bytes to an u32 integer
180                let object_num = u32::from_be_bytes(object_num_bytes);
181                // Return the number of objects and the header data for further processing
182                Ok((object_num, header_data))
183            }
184            Err(e) => {
185                // If there is an error in reading, return a GitError
186                Err(GitError::InvalidPackFile(format!(
187                    "Error reading object number: {e}"
188                )))
189            }
190        }
191    }
192
193    /// Decompresses data from a given Read and BufRead source using Zlib decompression.
194    ///
195    /// # Parameters
196    /// * `pack`: A source that implements both Read and BufRead traits (e.g., file, network stream).
197    /// * `expected_size`: The expected decompressed size of the data.
198    ///
199    /// # Returns
200    /// Returns a `Result` containing either:
201    /// * A tuple with a `Vec<u8>` of the decompressed data and the total number of input bytes processed,
202    /// * Or a `GitError` in case of a mismatch in expected size or any other reading error.
203    ///
204    pub fn decompress_data(
205        pack: &mut (impl BufRead + Send),
206        expected_size: usize,
207    ) -> Result<(Vec<u8>, usize), GitError> {
208        // Create a buffer with the expected size for the decompressed data
209        let mut buf = Vec::with_capacity(expected_size);
210
211        let mut counting_reader = CountingReader::new(pack);
212        // Create a new Zlib decoder with the original data
213        //let mut deflate = ZlibDecoder::new(pack);
214        let mut deflate = ZlibDecoder::new(&mut counting_reader);
215        // Attempt to read data to the end of the buffer
216        match deflate.read_to_end(&mut buf) {
217            Ok(_) => {
218                // Check if the length of the buffer matches the expected size
219                if buf.len() != expected_size {
220                    Err(GitError::InvalidPackFile(format!(
221                        "The object size {} does not match the expected size {}",
222                        buf.len(),
223                        expected_size
224                    )))
225                } else {
226                    // If everything is as expected, return the buffer, the original data, and the total number of input bytes processed
227                    let actual_input_bytes = counting_reader.bytes_read as usize;
228                    Ok((buf, actual_input_bytes))
229                }
230            }
231            Err(e) => {
232                // If there is an error in reading, return a GitError
233                Err(GitError::InvalidPackFile(format!(
234                    "Decompression error: {e}"
235                )))
236            }
237        }
238    }
239
240    /// Decodes a pack object from a given Read and BufRead source and returns the object as a [`CacheObject`].
241    ///
242    /// # Parameters
243    /// * `pack`: A source that implements both Read and BufRead traits.
244    /// * `offset`: A mutable reference to the current offset within the pack.
245    ///
246    /// # Returns
247    /// Returns a `Result` containing either:
248    /// * A tuple of the next offset in the pack and the original compressed data as `Vec<u8>`,
249    /// * Or a `GitError` in case of any reading or decompression error.
250    ///
251    pub fn decode_pack_object(
252        pack: &mut (impl BufRead + Send),
253        offset: &mut usize,
254    ) -> Result<CacheObject, GitError> {
255        let init_offset = *offset;
256
257        // Attempt to read the type and size, handle potential errors
258        let (type_bits, size) = match utils::read_type_and_varint_size(pack, offset) {
259            Ok(result) => result,
260            Err(e) => {
261                // Handle the error e.g., by logging it or converting it to GitError
262                // and then return from the function
263                return Err(GitError::InvalidPackFile(format!("Read error: {e}")));
264            }
265        };
266
267        // Check if the object type is valid
268        let t = ObjectType::from_u8(type_bits)?;
269
270        match t {
271            ObjectType::Commit | ObjectType::Tree | ObjectType::Blob | ObjectType::Tag => {
272                let (data, raw_size) = Pack::decompress_data(pack, size)?;
273                *offset += raw_size;
274                Ok(CacheObject::new_for_undeltified(t, data, init_offset))
275            }
276            ObjectType::OffsetDelta | ObjectType::OffsetZstdelta => {
277                let (delta_offset, bytes) = utils::read_offset_encoding(pack).unwrap();
278                *offset += bytes;
279
280                let (data, raw_size) = Pack::decompress_data(pack, size)?;
281                *offset += raw_size;
282
283                // Count the base object offset: the current offset - delta offset
284                let base_offset = init_offset
285                    .checked_sub(delta_offset as usize)
286                    .ok_or_else(|| {
287                        GitError::InvalidObjectInfo("Invalid OffsetDelta offset".to_string())
288                    })
289                    .unwrap();
290
291                let mut reader = Cursor::new(&data);
292                let (_, final_size) = utils::read_delta_object_size(&mut reader)?;
293
294                let obj_info = match t {
295                    ObjectType::OffsetDelta => {
296                        CacheObjectInfo::OffsetDelta(base_offset, final_size)
297                    }
298                    ObjectType::OffsetZstdelta => {
299                        CacheObjectInfo::OffsetZstdelta(base_offset, final_size)
300                    }
301                    _ => unreachable!(),
302                };
303                Ok(CacheObject {
304                    info: obj_info,
305                    offset: init_offset,
306                    data_decompressed: data,
307                    mem_recorder: None,
308                    is_delta_in_pack: true,
309                })
310            }
311            ObjectType::HashDelta => {
312                // Read hash bytes to get the reference object hash(size depends on hash kind,e.g.,20 for SHA1,32 for SHA256)
313                let ref_sha = ObjectHash::from_stream(pack).unwrap();
314                // Offset is incremented by 20/32 bytes
315                *offset += get_hash_kind().size();
316
317                let (data, raw_size) = Pack::decompress_data(pack, size)?;
318                *offset += raw_size;
319
320                let mut reader = Cursor::new(&data);
321                let (_, final_size) = utils::read_delta_object_size(&mut reader)?;
322
323                Ok(CacheObject {
324                    info: CacheObjectInfo::HashDelta(ref_sha, final_size),
325                    offset: init_offset,
326                    data_decompressed: data,
327                    mem_recorder: None,
328                    is_delta_in_pack: true,
329                })
330            }
331        }
332    }
333
    /// Decodes a pack file from a given Read and BufRead source, for each object in the pack,
    /// it decodes the object and processes it using the provided callback function.
    ///
    /// # Parameters
    /// * `callback`: invoked (from worker threads) once per fully decoded entry.
    /// * `pack_id_callback`: A callback that seed pack_file sha1 for updating database
    ///
    /// # Errors
    /// Returns a [`GitError`] if the header is invalid, an object fails to decode,
    /// the trailer hash does not match, or trailing bytes remain after the trailer.
    ///
    /// # Panics
    /// Asserts at the end that the waitlist is empty, that exactly `number` objects
    /// were inserted into the caches, and that all `CacheObject` memory was released.
    pub fn decode<F, C>(
        &mut self,
        pack: &mut (impl BufRead + Send),
        callback: F,
        pack_id_callback: Option<C>,
    ) -> Result<(), GitError>
    where
        F: Fn(MetaAttached<Entry, EntryMeta>) + Sync + Send + 'static,
        C: FnOnce(ObjectHash) + Send + 'static,
    {
        let time = Instant::now();
        let mut last_update_time = time.elapsed().as_millis();
        // Progress logger: decode index, queued work, and memory usage.
        let log_info = |_i: usize, pack: &Pack| {
            tracing::info!(
                "time {:.2} s \t decode: {:?} \t dec-num: {} \t cah-num: {} \t Objs: {} MB \t CacheUsed: {} MB",
                time.elapsed().as_millis() as f64 / 1000.0,
                _i,
                pack.pool.queued_count(),
                pack.caches.queued_tasks(),
                pack.cache_objs_mem_used() / 1024 / 1024,
                pack.caches.memory_used() / 1024 / 1024
            );
        };
        // Share the user callback across worker threads.
        let callback = Arc::new(callback);

        let caches = self.caches.clone();
        // Wrapper hashes the bytes it reads; its running hash is compared
        // against the pack trailer below.
        let mut reader = Wrapper::new(io::BufReader::new(pack));

        let result = Pack::check_header(&mut reader);
        match result {
            Ok((object_num, _)) => {
                self.number = object_num as usize;
            }
            Err(e) => {
                return Err(e);
            }
        }
        tracing::info!("The pack file has {} objects", self.number);
        // The 12-byte header has been consumed already.
        let mut offset: usize = 12;
        let mut i = 0;
        while i < self.number {
            // log per 1000 objects and 1 second
            if i % 1000 == 0 {
                let time_now = time.elapsed().as_millis();
                if time_now - last_update_time > 1000 {
                    log_info(i, self);
                    last_update_time = time_now;
                }
            }
            // 3 parts: Waitlist + TheadPool + Caches
            // hardcode the limit of the tasks of threads_pool queue, to limit memory
            // Back-pressure: spin-yield while the pool queue is too deep or the
            // memory budget (if any) is exceeded.
            while self.pool.queued_count() > 2000
                || self
                    .mem_limit
                    .map(|limit| self.memory_used() > limit)
                    .unwrap_or(false)
            {
                thread::yield_now();
            }
            let r: Result<CacheObject, GitError> =
                Pack::decode_pack_object(&mut reader, &mut offset);
            match r {
                Ok(mut obj) => {
                    obj.set_mem_recorder(self.cache_objs_mem.clone());
                    obj.record_mem_size();

                    // Wrapper of Arc Params, for convenience to pass
                    let params = Arc::new(SharedParams {
                        pool: self.pool.clone(),
                        waitlist: self.waitlist.clone(),
                        caches: self.caches.clone(),
                        cache_objs_mem_size: self.cache_objs_mem.clone(),
                        callback: callback.clone(),
                    });

                    let caches = caches.clone();
                    let waitlist = self.waitlist.clone();
                    // Hash kind is captured here and re-applied in the worker,
                    // since set_hash_kind/get_hash_kind act on thread-local state.
                    let kind = get_hash_kind();
                    self.pool.execute(move || {
                        set_hash_kind(kind);
                        match obj.info {
                            CacheObjectInfo::BaseObject(_, _) => {
                                Self::cache_obj_and_process_waitlist(params, obj);
                            }
                            CacheObjectInfo::OffsetDelta(base_offset, _)
                            | CacheObjectInfo::OffsetZstdelta(base_offset, _) => {
                                if let Some(base_obj) = caches.get_by_offset(base_offset) {
                                    Self::process_delta(params, obj, base_obj);
                                } else {
                                    // You can delete this 'if' block ↑, because there are Second check in 'else'
                                    // It will be more readable, but the performance will be slightly reduced
                                    waitlist.insert_offset(base_offset, obj);
                                    // Second check: prevent that the base_obj thread has finished before the waitlist insert
                                    if let Some(base_obj) = caches.get_by_offset(base_offset) {
                                        Self::process_waitlist(params, base_obj);
                                    }
                                }
                            }
                            CacheObjectInfo::HashDelta(base_ref, _) => {
                                if let Some(base_obj) = caches.get_by_hash(base_ref) {
                                    Self::process_delta(params, obj, base_obj);
                                } else {
                                    // Same park-then-recheck dance as the offset-delta case above.
                                    waitlist.insert_ref(base_ref, obj);
                                    if let Some(base_obj) = caches.get_by_hash(base_ref) {
                                        Self::process_waitlist(params, base_obj);
                                    }
                                }
                            }
                        }
                    });
                }
                Err(e) => {
                    return Err(e);
                }
            }
            i += 1;
        }
        log_info(i, self);
        // Hash of everything consumed so far (computed by Wrapper), compared
        // against the trailer hash stored at the end of the pack.
        let render_hash = reader.final_hash();
        self.signature = ObjectHash::from_stream(&mut reader).unwrap();

        if render_hash != self.signature {
            return Err(GitError::InvalidPackFile(format!(
                "The pack file hash {} does not match the trailer hash {}",
                render_hash, self.signature
            )));
        }

        // Nothing may follow the trailer hash.
        let end = utils::is_eof(&mut reader);
        if !end {
            return Err(GitError::InvalidPackFile(
                "The pack file is not at the end".to_string(),
            ));
        }

        self.pool.join(); // wait for all threads to finish

        // send pack id for metadata
        if let Some(pack_callback) = pack_id_callback {
            pack_callback(self.signature);
        }
        // !Attention: Caches threadpool may not stop, but it's not a problem (garbage file data)
        // So that files != self.number
        assert_eq!(self.waitlist.map_offset.len(), 0);
        assert_eq!(self.waitlist.map_ref.len(), 0);
        assert_eq!(self.number, caches.total_inserted());
        tracing::info!(
            "The pack file has been decoded successfully, takes: [ {:?} ]",
            time.elapsed()
        );
        self.caches.clear(); // clear cached objects & stop threads
        assert_eq!(self.cache_objs_mem_used(), 0); // all the objs should be dropped until here

        // impl in Drop Trait
        // if self.clean_tmp {
        //     self.caches.remove_tmp_dir();
        // }

        Ok(())
    }
500
501    /// Decode a Pack in a new thread and send the CacheObjects while decoding.
502    /// <br> Attention: It will consume the `pack` and return in a JoinHandle.
503    pub fn decode_async(
504        mut self,
505        mut pack: impl BufRead + Send + 'static,
506        sender: UnboundedSender<Entry>,
507    ) -> JoinHandle<Pack> {
508        let kind = get_hash_kind();
509        thread::spawn(move || {
510            set_hash_kind(kind);
511            self.decode(
512                &mut pack,
513                move |entry| {
514                    if let Err(e) = sender.send(entry.inner) {
515                        eprintln!("Channel full, failed to send entry: {e:?}");
516                    }
517                },
518                None::<fn(ObjectHash)>,
519            )
520            .unwrap();
521            self
522        })
523    }
524
    /// Decodes a `Pack` from a `Stream` of `Bytes`, and sends the `Entry` while decoding.
    ///
    /// Bridges the async byte stream into a blocking reader through an std mpsc
    /// channel, then runs the CPU-bound decode on `spawn_blocking`. Decoded entries
    /// go out through `sender`; the pack trailer hash is reported through
    /// `pack_hash_send` when one is provided.
    pub async fn decode_stream(
        mut self,
        mut stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + Send + 'static,
        sender: UnboundedSender<MetaAttached<Entry, EntryMeta>>,
        pack_hash_send: Option<UnboundedSender<ObjectHash>>,
    ) -> Self {
        // Hash kind is thread-local; capture it and re-apply inside spawn_blocking.
        let kind = get_hash_kind();
        let (tx, rx) = std::sync::mpsc::channel();
        let mut reader = StreamBufReader::new(rx);
        // Forward async stream chunks into the sync channel that feeds the reader.
        tokio::spawn(async move {
            while let Some(chunk) = stream.next().await {
                let data = chunk.unwrap().to_vec();
                if let Err(e) = tx.send(data) {
                    // Receiver (the decode task) is gone; stop forwarding.
                    eprintln!("Sending Error: {e:?}");
                    break;
                }
            }
        });
        // CPU-bound task, so use spawn_blocking
        // DO NOT use thread::spawn, because it will block tokio runtime (if single-threaded runtime, like in tests)
        tokio::task::spawn_blocking(move || {
            set_hash_kind(kind);
            self.decode(
                &mut reader,
                move |entry: MetaAttached<Entry, EntryMeta>| {
                    // as we used unbound channel here, it will never full so can be send with synchronous
                    if let Err(e) = sender.send(entry) {
                        eprintln!("unbound channel Sending Error: {e:?}");
                    }
                },
                Some(move |pack_id: ObjectHash| {
                    if let Some(pack_id_send) = pack_hash_send
                        && let Err(e) = pack_id_send.send(pack_id)
                    {
                        eprintln!("unbound channel Sending Error: {e:?}");
                    }
                }),
            )
            .unwrap();
            self
        })
        .await
        .unwrap()
    }
570
571    /// CacheObjects + Index size of Caches
572    fn memory_used(&self) -> usize {
573        self.cache_objs_mem_used() + self.caches.memory_used_index()
574    }
575
    /// The total memory used by the CacheObjects of this Pack
    fn cache_objs_mem_used(&self) -> usize {
        // Acquire load: observes counter updates made by worker threads
        // (presumably paired with Release stores in the mem recorder — see
        // `MemSizeRecorder` in cache_object; confirm the store ordering there).
        self.cache_objs_mem.load(Ordering::Acquire)
    }
580
581    /// Rebuild the Delta Object in a new thread & process the objects waiting for it recursively.
582    /// <br> This function must be *static*, because [&self] can't be moved into a new thread.
583    fn process_delta(
584        shared_params: Arc<SharedParams>,
585        delta_obj: CacheObject,
586        base_obj: Arc<CacheObject>,
587    ) {
588        shared_params.pool.clone().execute(move || {
589            let mut new_obj = match delta_obj.info {
590                CacheObjectInfo::OffsetDelta(_, _) | CacheObjectInfo::HashDelta(_, _) => {
591                    Pack::rebuild_delta(delta_obj, base_obj)
592                }
593                CacheObjectInfo::OffsetZstdelta(_, _) => {
594                    Pack::rebuild_zstdelta(delta_obj, base_obj)
595                }
596                _ => unreachable!(),
597            };
598
599            new_obj.set_mem_recorder(shared_params.cache_objs_mem_size.clone());
600            new_obj.record_mem_size();
601            Self::cache_obj_and_process_waitlist(shared_params, new_obj); //Indirect Recursion
602        });
603    }
604
605    /// Cache the new object & process the objects waiting for it (in multi-threading).
606    fn cache_obj_and_process_waitlist(shared_params: Arc<SharedParams>, new_obj: CacheObject) {
607        (shared_params.callback)(new_obj.to_entry_metadata());
608        let new_obj = shared_params.caches.insert(
609            new_obj.offset,
610            new_obj.base_object_hash().unwrap(),
611            new_obj,
612        );
613        Self::process_waitlist(shared_params, new_obj);
614    }
615
616    fn process_waitlist(shared_params: Arc<SharedParams>, base_obj: Arc<CacheObject>) {
617        let wait_objs = shared_params
618            .waitlist
619            .take(base_obj.offset, base_obj.base_object_hash().unwrap());
620        for obj in wait_objs {
621            // Process the objects waiting for the new object(base_obj = new_obj)
622            Self::process_delta(shared_params.clone(), obj, base_obj.clone());
623        }
624    }
625
    /// Reconstruct the Delta Object based on the "base object"
    /// and return the new object.
    ///
    /// Interprets git's delta instruction stream (copy & inline-data instructions)
    /// held in `delta_obj.data_decompressed` against the base object's data.
    ///
    /// # Panics
    /// Panics on a malformed delta: invalid instruction, out-of-range copy,
    /// base or result size not matching the sizes encoded in the delta header.
    pub fn rebuild_delta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
        // High bit set => copy instruction; clear => inline data instruction.
        const COPY_INSTRUCTION_FLAG: u8 = 1 << 7;
        // A copy instruction encodes up to 4 offset bytes and 3 size bytes.
        const COPY_OFFSET_BYTES: u8 = 4;
        const COPY_SIZE_BYTES: u8 = 3;
        // A copy size of 0 actually means 0x10000 bytes (git convention).
        const COPY_ZERO_SIZE: usize = 0x10000;

        let mut stream = Cursor::new(&delta_obj.data_decompressed);

        // Read the base object size
        // (Size Encoding)
        let (base_size, result_size) = utils::read_delta_object_size(&mut stream).unwrap();

        // Get the base object data
        let base_info = &base_obj.data_decompressed;
        assert_eq!(base_info.len(), base_size, "Base object size mismatch");

        let mut result = Vec::with_capacity(result_size);

        loop {
            // Check if the stream has ended, meaning the new object is done
            let instruction = match utils::read_bytes(&mut stream) {
                Ok([instruction]) => instruction,
                Err(err) if err.kind() == ErrorKind::UnexpectedEof => break,
                Err(err) => {
                    panic!(
                        "{}",
                        GitError::DeltaObjectError(format!("Wrong instruction in delta :{err}"))
                    );
                }
            };

            if instruction & COPY_INSTRUCTION_FLAG == 0 {
                // Data instruction; the instruction byte specifies the number of data bytes
                if instruction == 0 {
                    // Appending 0 bytes doesn't make sense, so git disallows it
                    panic!(
                        "{}",
                        GitError::DeltaObjectError(String::from("Invalid data instruction"))
                    );
                }

                // Append the provided bytes
                let mut data = vec![0; instruction as usize];
                stream.read_exact(&mut data).unwrap();
                result.extend_from_slice(&data);
            } else {
                // Copy instruction
                // +----------+---------+---------+---------+---------+-------+-------+-------+
                // | 1xxxxxxx | offset1 | offset2 | offset3 | offset4 | size1 | size2 | size3 |
                // +----------+---------+---------+---------+---------+-------+-------+-------+
                // The low 7 bits of the instruction flag which offset/size bytes follow.
                let mut nonzero_bytes = instruction;
                let offset =
                    utils::read_partial_int(&mut stream, COPY_OFFSET_BYTES, &mut nonzero_bytes)
                        .unwrap();
                let mut size =
                    utils::read_partial_int(&mut stream, COPY_SIZE_BYTES, &mut nonzero_bytes)
                        .unwrap();
                if size == 0 {
                    // Copying 0 bytes doesn't make sense, so git assumes a different size
                    size = COPY_ZERO_SIZE;
                }
                // Copy bytes from the base object
                let base_data = base_info.get(offset..(offset + size)).ok_or_else(|| {
                    GitError::DeltaObjectError("Invalid copy instruction".to_string())
                });

                match base_data {
                    Ok(data) => result.extend_from_slice(data),
                    Err(e) => panic!("{}", e),
                }
            }
        }
        assert_eq!(result_size, result.len(), "Result size mismatch");

        let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
        // create new obj from `delta_obj` & `result` instead of modifying `delta_obj` for heap-size recording
        CacheObject {
            info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
            offset: delta_obj.offset,
            data_decompressed: result,
            mem_recorder: None,
            is_delta_in_pack: delta_obj.is_delta_in_pack,
        } // Canonical form (Complete Object)
        // Memory recording will happen after this function returns. See `process_delta`
    }
713    pub fn rebuild_zstdelta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
714        let result = zstdelta::apply(&base_obj.data_decompressed, &delta_obj.data_decompressed)
715            .expect("Failed to apply zstdelta");
716        let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
717        CacheObject {
718            info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
719            offset: delta_obj.offset,
720            data_decompressed: result,
721            mem_recorder: None,
722            is_delta_in_pack: delta_obj.is_delta_in_pack,
723        } // Canonical form (Complete Object)
724        // Memory recording will happen after this function returns. See `process_delta`
725    }
726}
727
#[cfg(test)]
mod tests {
    //! Tests for `Pack` decoding. Most cases read a fixture pack from
    //! `tests/data/packs/` and decode it under various thread-count /
    //! memory-limit / hash-kind configurations; each SHA-256 test mirrors a
    //! SHA-1 counterpart against an equivalent fixture pack.
    use std::fs;
    use std::io::BufReader;
    use std::io::Cursor;
    use std::io::prelude::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::{env, path::PathBuf};

    use flate2::Compression;
    use flate2::write::ZlibEncoder;
    use tokio_util::io::ReaderStream;

    use crate::hash::{HashKind, ObjectHash, set_hash_kind};
    use crate::internal::pack::Pack;
    use crate::internal::pack::tests::init_logger;
    use futures_util::TryStreamExt;

    /// `check_header` must report the object count stored in the pack header.
    #[tokio::test]
    async fn test_pack_check_header() {
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");

        let f = fs::File::open(source).unwrap();
        let mut buf_reader = BufReader::new(f);
        let (object_num, _) = Pack::check_header(&mut buf_reader).unwrap();

        // Known object count of this fixture pack.
        assert_eq!(object_num, 358109);
    }

    /// Round-trip: zlib-compress a buffer, then `decompress_data` must restore
    /// it exactly and report how many compressed bytes were consumed.
    #[test]
    fn test_decompress_data() {
        let data = b"Hello, world!"; // Sample data to compress and then decompress
        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(data).unwrap();
        let compressed_data = encoder.finish().unwrap();
        let compressed_size = compressed_data.len();

        // Create a cursor for the compressed data to simulate a BufRead source
        let mut cursor: Cursor<Vec<u8>> = Cursor::new(compressed_data);
        let expected_size = data.len();

        // Decompress the data and assert correctness
        let result = Pack::decompress_data(&mut cursor, expected_size);
        match result {
            Ok((decompressed_data, bytes_read)) => {
                assert_eq!(bytes_read, compressed_size);
                assert_eq!(decompressed_data, data);
            }
            Err(e) => panic!("Decompression failed: {e:?}"),
        }
    }

    /// Decode a pack containing no delta objects (default SHA-1 hashing).
    #[test]
    fn test_pack_decode_without_delta() {
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/pack-1d0e6c14760c956c173ede71cb28f33d921e232f.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        // 20 MiB memory cache; `true` cleans the tmp cache dir on drop.
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }
    /// SHA-256 variant of `test_pack_decode_without_delta`.
    #[test]
    fn test_pack_decode_without_delta_sha256() {
        let _guard = set_hash_kind(HashKind::Sha256);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/pack-78047853c60a1a3bb587f59598bdeb773fefc821f6f60f4f4797644ad43dad3d.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }

    /// Decode a pack whose deltas reference base objects by hash (ref-delta).
    #[test]
    // #[traced_test]
    fn test_pack_decode_with_ref_delta() {
        init_logger();

        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/ref-delta-65d47638aa7cb7c39f1bd1d5011a415439b887a8.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }
    /// SHA-256 variant of `test_pack_decode_with_ref_delta`.
    #[test]
    fn test_pack_decode_with_ref_delta_sha256() {
        let _guard = set_hash_kind(HashKind::Sha256);
        init_logger();
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/ref-delta-0e26651d43b149c9baef6035c19cca140f82bb0d0cc5b12fda0ae89ff6a25195.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }

    /// Decode with an unbounded memory cache (`mem_limit = None`).
    #[test]
    fn test_pack_decode_no_mem_limit() {
        let _guard = set_hash_kind(HashKind::Sha1);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/pack-1d0e6c14760c956c173ede71cb28f33d921e232f.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, None, Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }
    /// SHA-256 variant of `test_pack_decode_no_mem_limit`.
    #[test]
    fn test_pack_decode_no_mem_limit_sha256() {
        let _guard = set_hash_kind(HashKind::Sha256);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/pack-78047853c60a1a3bb587f59598bdeb773fefc821f6f60f4f4797644ad43dad3d.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, None, Some(tmp), true);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
    }

    /// Decode a large pack (offset-deltas, no ref-deltas) with 20 worker
    /// threads; cleans up the tmp dir before panicking on failure.
    #[tokio::test]
    async fn test_pack_decode_with_large_file_with_delta_without_ref() {
        let _guard = set_hash_kind(HashKind::Sha1);
        init_logger();
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(
            Some(20),
            Some(1024 * 1024 * 1024 * 1), //try to avoid dead lock on CI servers with low memory
            Some(tmp.clone()),
            true,
        );
        let rt = p.decode(
            &mut buffered,
            |_obj| {
                // println!("{:?} {}", obj.hash.to_string(), offset);
            },
            None::<fn(ObjectHash)>,
        );
        if let Err(e) = rt {
            fs::remove_dir_all(tmp).unwrap();
            panic!("Error: {e:?}");
        }
    } // it will be stuck on dropping `Pack` on Windows if `mem_size` is None, so we need `mimalloc`
    /// SHA-256 variant of the large-pack decode above (tight 1 MiB cache).
    #[tokio::test]
    async fn test_pack_decode_with_large_file_with_delta_without_ref_sha256() {
        let _guard = set_hash_kind(HashKind::Sha256);
        init_logger();
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/git-large-sha256-f6455f09d816f54d115724975da6e7edfb100d746ad145bfd0d2ddc0e0261f5d.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(Some(20), Some(1024 * 1024), Some(tmp.clone()), true);
        let rt = p.decode(
            &mut buffered,
            |_obj| {
                // println!("{:?} {}", obj.hash.to_string(), offset);
            },
            None::<fn(ObjectHash)>,
        );
        if let Err(e) = rt {
            fs::remove_dir_all(tmp).unwrap();
            panic!("Error: {e:?}");
        }
    } // it will be stuck on dropping `Pack` on Windows if `mem_size` is None, so we need `mimalloc`

    /// Stream-decode a large pack via `decode_stream`; every decoded entry is
    /// counted from the mpsc channel and must match `Pack::number`.
    #[tokio::test]
    async fn test_decode_large_file_stream() {
        init_logger();
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");
        let f = tokio::fs::File::open(source).await.unwrap();
        let stream = ReaderStream::new(f).map_err(axum::Error::new);
        let p = Pack::new(
            Some(20),
            Some(1024 * 1024 * 1024 * 1),
            Some(tmp.clone()),
            true,
        );

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = tokio::spawn(async move { p.decode_stream(stream, tx, None).await });
        let count = Arc::new(AtomicUsize::new(0));
        let count_c = count.clone();
        // in tests, RUNTIME is single-threaded, so `sync code` will block the tokio runtime
        let consume = tokio::spawn(async move {
            let mut cnt = 0;
            while let Some(_entry) = rx.recv().await {
                cnt += 1;
            }
            tracing::info!("Received: {}", cnt);
            count_c.store(cnt, Ordering::Release);
        });
        let p = handle.await.unwrap();
        consume.await.unwrap();
        assert_eq!(count.load(Ordering::Acquire), p.number);
        assert_eq!(p.number, 358109);
    }
    /// SHA-256 variant of `test_decode_large_file_stream` (26-object fixture).
    #[tokio::test]
    async fn test_decode_large_file_stream_sha256() {
        let _guard = set_hash_kind(HashKind::Sha256);
        init_logger();
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/git-large-sha256-f6455f09d816f54d115724975da6e7edfb100d746ad145bfd0d2ddc0e0261f5d.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");
        let f = tokio::fs::File::open(source).await.unwrap();
        let stream = ReaderStream::new(f).map_err(axum::Error::new);
        let p = Pack::new(Some(20), Some(1024 * 1024), Some(tmp.clone()), true);

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = tokio::spawn(async move { p.decode_stream(stream, tx, None).await });
        let count = Arc::new(AtomicUsize::new(0));
        let count_c = count.clone();
        // in tests, RUNTIME is single-threaded, so `sync code` will block the tokio runtime
        let consume = tokio::spawn(async move {
            let mut cnt = 0;
            while let Some(_entry) = rx.recv().await {
                cnt += 1;
            }
            tracing::info!("Received: {}", cnt);
            count_c.store(cnt, Ordering::Release);
        });
        let p = handle.await.unwrap();
        consume.await.unwrap();
        assert_eq!(count.load(Ordering::Acquire), p.number);
        assert_eq!(p.number, 26);
    }

    /// Decode via `decode_async` (dedicated OS thread feeding an mpsc channel);
    /// the consumed entry count must equal `Pack::number`.
    #[tokio::test]
    async fn test_decode_large_file_async() {
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");
        let f = fs::File::open(source).unwrap();
        let buffered = BufReader::new(f);
        let p = Pack::new(
            Some(20),
            Some(1024 * 1024 * 1024 * 1),
            Some(tmp.clone()),
            true,
        );

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = p.decode_async(buffered, tx); // new thread
        let mut cnt = 0;
        while let Some(_entry) = rx.recv().await {
            cnt += 1; //use entry here
        }
        let p = handle.join().unwrap();
        assert_eq!(cnt, p.number);
    }
    /// SHA-256 variant of `test_decode_large_file_async`.
    #[tokio::test]
    async fn test_decode_large_file_async_sha256() {
        let _guard = set_hash_kind(HashKind::Sha256);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/git-large-sha256-f6455f09d816f54d115724975da6e7edfb100d746ad145bfd0d2ddc0e0261f5d.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");
        let f = fs::File::open(source).unwrap();
        let buffered = BufReader::new(f);
        let p = Pack::new(Some(20), Some(1024 * 1024), Some(tmp.clone()), true);

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let handle = p.decode_async(buffered, tx); // new thread
        let mut cnt = 0;
        while let Some(_entry) = rx.recv().await {
            cnt += 1; //use entry here
        }
        let p = handle.join().unwrap();
        assert_eq!(cnt, p.number);
    }

    /// Decode a pack with offset-deltas only; also exercises the pack
    /// `signature` before and after decoding.
    #[test]
    fn test_pack_decode_with_delta_without_ref() {
        let _guard = set_hash_kind(HashKind::Sha1);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/pack-d50df695086eea6253a237cb5ac44af1629e7ced.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        print!("pack_id: {:?}", p.signature);
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
        print!("pack_id: {:?}", p.signature.to_string());
    }
    /// SHA-256 variant of `test_pack_decode_with_delta_without_ref`.
    #[test]
    fn test_pack_decode_with_delta_without_ref_sha256() {
        let _guard = set_hash_kind(HashKind::Sha256);
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/pack-delta-sha256-3662654057d1adedf50f2a80bfdec752e00ac72798859d850ab1dcfd801beedd.pack");

        let tmp = PathBuf::from("/tmp/.cache_temp");

        let f = fs::File::open(source).unwrap();
        let mut buffered = BufReader::new(f);
        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
        print!("pack_id: {:?}", p.signature); //default sha1
        p.decode(&mut buffered, |_| {}, None::<fn(ObjectHash)>)
            .unwrap();
        print!("pack_id: {:?}", p.signature.to_string());
    }

    /// Run two large-pack decodes concurrently on separate OS threads to check
    /// that independent `Pack` instances don't interfere with each other.
    #[test] // Take too long time
    fn test_pack_decode_multi_task_with_large_file_with_delta_without_ref() {
        let _guard = set_hash_kind(HashKind::Sha1);
        let task1 = std::thread::spawn(|| {
            test_pack_decode_with_large_file_with_delta_without_ref();
        });
        let task2 = std::thread::spawn(|| {
            test_pack_decode_with_large_file_with_delta_without_ref();
        });

        task1.join().unwrap();
        task2.join().unwrap();
    }
    /// SHA-256 variant of the concurrent multi-task decode test.
    #[test]
    fn test_pack_decode_multi_task_with_large_file_with_delta_without_ref_sha256() {
        let _guard = set_hash_kind(HashKind::Sha256);
        let task1 = std::thread::spawn(|| {
            test_pack_decode_with_large_file_with_delta_without_ref_sha256();
        });
        let task2 = std::thread::spawn(|| {
            test_pack_decode_with_large_file_with_delta_without_ref_sha256();
        });

        task1.join().unwrap();
        task2.join().unwrap();
    }
}