git_internal/internal/pack/
decode.rs

1use std::io::{self, BufRead, Cursor, ErrorKind, Read};
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::thread::{self, JoinHandle};
6use std::time::Instant;
7
8use axum::Error;
9use bytes::Bytes;
10use flate2::bufread::ZlibDecoder;
11use futures_util::{Stream, StreamExt};
12use threadpool::ThreadPool;
13use tokio::sync::mpsc::UnboundedSender;
14use uuid::Uuid;
15
16use crate::errors::GitError;
17use crate::hash::SHA1;
18use crate::zstdelta;
19use crate::internal::object::types::ObjectType;
20
21use super::cache_object::CacheObjectInfo;
22use crate::internal::pack::cache::_Cache;
23use crate::internal::pack::cache::Caches;
24use crate::internal::pack::cache_object::{CacheObject, MemSizeRecorder};
25use crate::internal::pack::channel_reader::StreamBufReader;
26use crate::internal::pack::entry::Entry;
27use crate::internal::pack::waitlist::Waitlist;
28use crate::internal::pack::wrapper::Wrapper;
29use crate::internal::pack::{DEFAULT_TMP_DIR, Pack, utils};
30use crate::utils::CountingReader;
31
/// For the convenience of passing parameters
///
/// Bundles the shared decoding state behind `Arc`s so worker closures can
/// clone a single handle instead of five separate ones.
struct SharedParams {
    /// Thread pool that runs the delta-rebuild tasks (see `process_delta`).
    pub pool: Arc<ThreadPool>,
    /// Delta objects parked until their base object becomes available.
    pub waitlist: Arc<Waitlist>,
    /// Decoded-object caches, looked up by pack offset or by hash.
    pub caches: Arc<Caches>,
    /// Running total of heap memory held by live `CacheObject`s.
    pub cache_objs_mem_size: Arc<AtomicUsize>,
    /// User callback invoked with each decoded entry and its pack offset.
    pub callback: Arc<dyn Fn(Entry, usize) + Sync + Send>,
}
40
impl Drop for Pack {
    fn drop(&mut self) {
        // Best-effort cleanup: remove the per-Pack temp directory when the
        // caller opted in via the `clean_tmp` flag passed to `Pack::new`.
        if self.clean_tmp {
            self.caches.remove_tmp_dir();
        }
    }
}
48
49impl Pack {
50    /// # Parameters
51    /// - `thread_num`: The number of threads to use for decoding and cache, `None` mean use the number of logical CPUs.
52    ///   It can't be zero, or panic <br>
53    /// - `mem_limit`: The maximum size of the memory cache in bytes, or None for unlimited.
54    ///   The 80% of it will be used for [Caches]  <br>
55    ///   ​**Not very accurate, because of memory alignment and other reasons, overuse about 15%** <br>
56    /// - `temp_path`: The path to a directory for temporary files, default is "./.cache_temp" <br>
57    ///   For example, thread_num = 4 will use up to 8 threads (4 for decoding and 4 for cache) <br>
58    /// - `clean_tmp`: whether to remove temp directory when Pack is dropped
59    pub fn new(
60        thread_num: Option<usize>,
61        mem_limit: Option<usize>,
62        temp_path: Option<PathBuf>,
63        clean_tmp: bool,
64    ) -> Self {
65        let mut temp_path = temp_path.unwrap_or(PathBuf::from(DEFAULT_TMP_DIR));
66        // add 8 random characters as subdirectory, check if the directory exists
67        loop {
68            let sub_dir = Uuid::new_v4().to_string()[..8].to_string();
69            temp_path.push(sub_dir);
70            if !temp_path.exists() {
71                break;
72            }
73            temp_path.pop();
74        }
75        let thread_num = thread_num.unwrap_or_else(num_cpus::get);
76        let cache_mem_size = mem_limit.map(|mem_limit| mem_limit * 4 / 5);
77        Pack {
78            number: 0,
79            signature: SHA1::default(),
80            objects: Vec::new(),
81            pool: Arc::new(ThreadPool::new(thread_num)),
82            waitlist: Arc::new(Waitlist::new()),
83            caches: Arc::new(Caches::new(cache_mem_size, temp_path, thread_num)),
84            mem_limit,
85            cache_objs_mem: Arc::new(AtomicUsize::default()),
86            clean_tmp,
87        }
88    }
89
90    /// Checks and reads the header of a Git pack file.
91    ///
92    /// This function reads the first 12 bytes of a pack file, which include the b"PACK" magic identifier,
93    /// the version number, and the number of objects in the pack. It verifies that the magic identifier
94    /// is correct and that the version number is 2 (which is the version currently supported by Git).
95    /// It also collects these header bytes for later use, such as for hashing the entire pack file.
96    ///
97    /// # Parameters
98    /// * `pack` - A mutable reference to an object implementing the `Read` trait,
99    ///   representing the source of the pack file data (e.g., file, memory stream).
100    ///
101    /// # Returns
102    /// A `Result` which is:
103    /// * `Ok((u32, Vec<u8>))`: On successful reading and validation of the header, returns a tuple where:
104    ///     - The first element is the number of objects in the pack file (`u32`).
105    ///     - The second element is a vector containing the bytes of the pack file header (`Vec<u8>`).
106    /// * `Err(GitError)`: On failure, returns a [`GitError`] with a description of the issue.
107    ///
108    /// # Errors
109    /// This function can return an error in the following situations:
110    /// * If the pack file does not start with the "PACK" magic identifier.
111    /// * If the pack file's version number is not 2.
112    /// * If there are any issues reading from the provided `pack` source.
113    pub fn check_header(pack: &mut impl BufRead) -> Result<(u32, Vec<u8>), GitError> {
114        // A vector to store the header data for hashing later
115        let mut header_data = Vec::new();
116
117        // Read the first 4 bytes which should be "PACK"
118        let mut magic = [0; 4];
119        // Read the magic "PACK" identifier
120        let result = pack.read_exact(&mut magic);
121        match result {
122            Ok(_) => {
123                // Store these bytes for later
124                header_data.extend_from_slice(&magic);
125
126                // Check if the magic bytes match "PACK"
127                if magic != *b"PACK" {
128                    // If not, return an error indicating invalid pack header
129                    return Err(GitError::InvalidPackHeader(format!(
130                        "{},{},{},{}",
131                        magic[0], magic[1], magic[2], magic[3]
132                    )));
133                }
134            }
135            Err(e) => {
136                // If there is an error in reading, return a GitError
137                return Err(GitError::InvalidPackFile(format!(
138                    "Error reading magic identifier: {e}"
139                )));
140            }
141        }
142
143        // Read the next 4 bytes for the version number
144        let mut version_bytes = [0; 4];
145        let result = pack.read_exact(&mut version_bytes); // Read the version number
146        match result {
147            Ok(_) => {
148                // Store these bytes
149                header_data.extend_from_slice(&version_bytes);
150
151                // Convert the version bytes to an u32 integer
152                let version = u32::from_be_bytes(version_bytes);
153                if version != 2 {
154                    // Git currently supports version 2, so error if not version 2
155                    return Err(GitError::InvalidPackFile(format!(
156                        "Version Number is {version}, not 2"
157                    )));
158                }
159            }
160            Err(e) => {
161                // If there is an error in reading, return a GitError
162                return Err(GitError::InvalidPackFile(format!(
163                    "Error reading version number: {e}"
164                )));
165            }
166        }
167
168        // Read the next 4 bytes for the number of objects in the pack
169        let mut object_num_bytes = [0; 4];
170        // Read the number of objects
171        let result = pack.read_exact(&mut object_num_bytes);
172        match result {
173            Ok(_) => {
174                // Store these bytes
175                header_data.extend_from_slice(&object_num_bytes);
176                // Convert the object number bytes to an u32 integer
177                let object_num = u32::from_be_bytes(object_num_bytes);
178                // Return the number of objects and the header data for further processing
179                Ok((object_num, header_data))
180            }
181            Err(e) => {
182                // If there is an error in reading, return a GitError
183                Err(GitError::InvalidPackFile(format!(
184                    "Error reading object number: {e}"
185                )))
186            }
187        }
188    }
189
190    /// Decompresses data from a given Read and BufRead source using Zlib decompression.
191    ///
192    /// # Parameters
193    /// * `pack`: A source that implements both Read and BufRead traits (e.g., file, network stream).
194    /// * `expected_size`: The expected decompressed size of the data.
195    ///
196    /// # Returns
197    /// Returns a `Result` containing either:
198    /// * A tuple with a `Vec<u8>` of the decompressed data and the total number of input bytes processed,
199    /// * Or a `GitError` in case of a mismatch in expected size or any other reading error.
200    ///
201    pub fn decompress_data(
202        pack: &mut (impl BufRead + Send),
203        expected_size: usize,
204    ) -> Result<(Vec<u8>, usize), GitError> {
205        // Create a buffer with the expected size for the decompressed data
206        let mut buf = Vec::with_capacity(expected_size);
207
208        let mut counting_reader = CountingReader::new(pack);
209        // Create a new Zlib decoder with the original data
210        //let mut deflate = ZlibDecoder::new(pack);
211        let mut deflate = ZlibDecoder::new(&mut counting_reader);
212        // Attempt to read data to the end of the buffer
213        match deflate.read_to_end(&mut buf) {
214            Ok(_) => {
215                // Check if the length of the buffer matches the expected size
216                if buf.len() != expected_size {
217                    Err(GitError::InvalidPackFile(format!(
218                        "The object size {} does not match the expected size {}",
219                        buf.len(),
220                        expected_size
221                    )))
222                } else {
223                    // If everything is as expected, return the buffer, the original data, and the total number of input bytes processed
224                    let actual_input_bytes = counting_reader.bytes_read as usize;
225                    Ok((buf, actual_input_bytes))
226                }
227            }
228            Err(e) => {
229                // If there is an error in reading, return a GitError
230                Err(GitError::InvalidPackFile(format!(
231                    "Decompression error: {e}"
232                )))
233            }
234        }
235    }
236
237    /// Decodes a pack object from a given Read and BufRead source and returns the object as a [`CacheObject`].
238    ///
239    /// # Parameters
240    /// * `pack`: A source that implements both Read and BufRead traits.
241    /// * `offset`: A mutable reference to the current offset within the pack.
242    ///
243    /// # Returns
244    /// Returns a `Result` containing either:
245    /// * A tuple of the next offset in the pack and the original compressed data as `Vec<u8>`,
246    /// * Or a `GitError` in case of any reading or decompression error.
247    ///
248    pub fn decode_pack_object(
249        pack: &mut (impl BufRead + Send),
250        offset: &mut usize,
251    ) -> Result<CacheObject, GitError> {
252        let init_offset = *offset;
253
254        // Attempt to read the type and size, handle potential errors
255        let (type_bits, size) = match utils::read_type_and_varint_size(pack, offset) {
256            Ok(result) => result,
257            Err(e) => {
258                // Handle the error e.g., by logging it or converting it to GitError
259                // and then return from the function
260                return Err(GitError::InvalidPackFile(format!("Read error: {e}")));
261            }
262        };
263
264        // Check if the object type is valid
265        let t = ObjectType::from_u8(type_bits)?;
266
267        match t {
268            ObjectType::Commit | ObjectType::Tree | ObjectType::Blob | ObjectType::Tag => {
269                let (data, raw_size) = Pack::decompress_data(pack, size)?;
270                *offset += raw_size;
271                Ok(CacheObject::new_for_undeltified(t, data, init_offset))
272            }
273            ObjectType::OffsetDelta | ObjectType::OffsetZstdelta => {
274                let (delta_offset, bytes) = utils::read_offset_encoding(pack).unwrap();
275                *offset += bytes;
276
277                let (data, raw_size) = Pack::decompress_data(pack, size)?;
278                *offset += raw_size;
279
280                // Count the base object offset: the current offset - delta offset
281                let base_offset = init_offset
282                    .checked_sub(delta_offset as usize)
283                    .ok_or_else(|| {
284                        GitError::InvalidObjectInfo("Invalid OffsetDelta offset".to_string())
285                    })
286                    .unwrap();
287
288                let mut reader = Cursor::new(&data);
289                let (_, final_size) = utils::read_delta_object_size(&mut reader)?;
290
291                let obj_info = match t {
292                    ObjectType::OffsetDelta => {
293                        CacheObjectInfo::OffsetDelta(base_offset, final_size)
294                    }
295                    ObjectType::OffsetZstdelta => {
296                        CacheObjectInfo::OffsetZstdelta(base_offset, final_size)
297                    }
298                    _ => unreachable!(),
299                };
300                Ok(CacheObject {
301                    info: obj_info,
302                    offset: init_offset,
303                    data_decompressed: data,
304                    mem_recorder: None,
305                })
306            }
307            ObjectType::HashDelta => {
308                // Read 20 bytes to get the reference object SHA1 hash
309                let ref_sha1 = SHA1::from_stream(pack).unwrap();
310                // Offset is incremented by 20 bytes
311                *offset += SHA1::SIZE;
312
313                let (data, raw_size) = Pack::decompress_data(pack, size)?;
314                *offset += raw_size;
315
316                let mut reader = Cursor::new(&data);
317                let (_, final_size) = utils::read_delta_object_size(&mut reader)?;
318
319                Ok(CacheObject {
320                    info: CacheObjectInfo::HashDelta(ref_sha1, final_size),
321                    offset: init_offset,
322                    data_decompressed: data,
323                    mem_recorder: None,
324                })
325            }
326        }
327    }
328
    /// Decodes a pack file from a given Read and BufRead source, for each object in the pack,
    /// it decodes the object and processes it using the provided callback function.
    ///
    /// # Parameters
    /// * `pack`: buffered source of the raw pack bytes.
    /// * `callback`: invoked for every decoded [`Entry`] with its pack offset;
    ///   it runs on worker threads, hence the `Sync + Send + 'static` bounds.
    ///
    /// # Errors
    /// Returns a [`GitError`] if the header, any object, or the trailing
    /// checksum is invalid, or if trailing garbage follows the pack.
    pub fn decode<F>(
        &mut self,
        pack: &mut (impl BufRead + Send),
        callback: F,
    ) -> Result<(), GitError>
    where
        F: Fn(Entry, usize) + Sync + Send + 'static,
    {
        let time = Instant::now();
        let mut last_update_time = time.elapsed().as_millis();
        // Progress logging: decoded count, queue depths and memory usage.
        let log_info = |_i: usize, pack: &Pack| {
            tracing::info!(
                "time {:.2} s \t decode: {:?} \t dec-num: {} \t cah-num: {} \t Objs: {} MB \t CacheUsed: {} MB",
                time.elapsed().as_millis() as f64 / 1000.0,
                _i,
                pack.pool.queued_count(),
                pack.caches.queued_tasks(),
                pack.cache_objs_mem_used() / 1024 / 1024,
                pack.caches.memory_used() / 1024 / 1024
            );
        };
        let callback = Arc::new(callback);

        let caches = self.caches.clone();
        // Wrapper hashes every byte it reads so the trailer checksum can be verified.
        let mut reader = Wrapper::new(io::BufReader::new(pack));

        let result = Pack::check_header(&mut reader);
        match result {
            Ok((object_num, _)) => {
                self.number = object_num as usize;
            }
            Err(e) => {
                return Err(e);
            }
        }
        tracing::info!("The pack file has {} objects", self.number);
        // The 12-byte header has already been consumed.
        let mut offset: usize = 12;
        let mut i = 0;
        while i < self.number {
            // log per 1000 objects and 1 second
            if i % 1000 == 0 {
                let time_now = time.elapsed().as_millis();
                if time_now - last_update_time > 1000 {
                    log_info(i, self);
                    last_update_time = time_now;
                }
            }
            // 3 parts: Waitlist + TheadPool + Caches
            // hardcode the limit of the tasks of threads_pool queue, to limit memory
            while self.pool.queued_count() > 2000
                || self
                    .mem_limit
                    .map(|limit| self.memory_used() > limit)
                    .unwrap_or(false)
            {
                thread::yield_now();
            }
            let r: Result<CacheObject, GitError> =
                Pack::decode_pack_object(&mut reader, &mut offset);
            match r {
                Ok(mut obj) => {
                    // Track this object's heap usage in the shared counter.
                    obj.set_mem_recorder(self.cache_objs_mem.clone());
                    obj.record_mem_size();

                    // Wrapper of Arc Params, for convenience to pass
                    let params = Arc::new(SharedParams {
                        pool: self.pool.clone(),
                        waitlist: self.waitlist.clone(),
                        caches: self.caches.clone(),
                        cache_objs_mem_size: self.cache_objs_mem.clone(),
                        callback: callback.clone(),
                    });

                    let caches = caches.clone();
                    let waitlist = self.waitlist.clone();
                    self.pool.execute(move || {
                        match obj.info {
                            CacheObjectInfo::BaseObject(_, _) => {
                                // Complete object: cache it and wake any waiting deltas.
                                Self::cache_obj_and_process_waitlist(params, obj);
                            }
                            CacheObjectInfo::OffsetDelta(base_offset, _)
                            | CacheObjectInfo::OffsetZstdelta(base_offset, _) => {
                                if let Some(base_obj) = caches.get_by_offset(base_offset) {
                                    Self::process_delta(params, obj, base_obj);
                                } else {
                                    // You can delete this 'if' block ↑, because there are Second check in 'else'
                                    // It will be more readable, but the performance will be slightly reduced
                                    waitlist.insert_offset(base_offset, obj);
                                    // Second check: prevent that the base_obj thread has finished before the waitlist insert
                                    if let Some(base_obj) = caches.get_by_offset(base_offset) {
                                        Self::process_waitlist(params, base_obj);
                                    }
                                }
                            }
                            CacheObjectInfo::HashDelta(base_ref, _) => {
                                // Same insert-then-recheck dance as the offset-delta case above.
                                if let Some(base_obj) = caches.get_by_hash(base_ref) {
                                    Self::process_delta(params, obj, base_obj);
                                } else {
                                    waitlist.insert_ref(base_ref, obj);
                                    if let Some(base_obj) = caches.get_by_hash(base_ref) {
                                        Self::process_waitlist(params, base_obj);
                                    }
                                }
                            }
                        }
                    });
                }
                Err(e) => {
                    return Err(e);
                }
            }
            i += 1;
        }
        log_info(i, self);
        // Compare the hash of everything read so far against the 20-byte trailer.
        let render_hash = reader.final_hash();
        self.signature = SHA1::from_stream(&mut reader).unwrap();

        if render_hash != self.signature {
            return Err(GitError::InvalidPackFile(format!(
                "The pack file hash {} does not match the trailer hash {}",
                render_hash, self.signature
            )));
        }

        let end = utils::is_eof(&mut reader);
        if !end {
            return Err(GitError::InvalidPackFile(
                "The pack file is not at the end".to_string(),
            ));
        }

        self.pool.join(); // wait for all threads to finish
        // !Attention: Caches threadpool may not stop, but it's not a problem (garbage file data)
        // So that files != self.number
        assert_eq!(self.waitlist.map_offset.len(), 0);
        assert_eq!(self.waitlist.map_ref.len(), 0);
        assert_eq!(self.number, caches.total_inserted());
        tracing::info!(
            "The pack file has been decoded successfully, takes: [ {:?} ]",
            time.elapsed()
        );
        self.caches.clear(); // clear cached objects & stop threads
        assert_eq!(self.cache_objs_mem_used(), 0); // all the objs should be dropped until here

        // impl in Drop Trait
        // if self.clean_tmp {
        //     self.caches.remove_tmp_dir();
        // }

        Ok(())
    }
482
483    /// Decode a Pack in a new thread and send the CacheObjects while decoding.
484    /// <br> Attention: It will consume the `pack` and return in a JoinHandle.
485    pub fn decode_async(
486        mut self,
487        mut pack: impl BufRead + Send + 'static,
488        sender: UnboundedSender<Entry>,
489    ) -> JoinHandle<Pack> {
490        thread::spawn(move || {
491            self.decode(&mut pack, move |entry, _| {
492                if let Err(e) = sender.send(entry) {
493                    eprintln!("Channel full, failed to send entry: {e:?}");
494                }
495            })
496            .unwrap();
497            self
498        })
499    }
500
501    /// Decodes a `Pack` from a `Stream` of `Bytes`, and sends the `Entry` while decoding.
502    pub async fn decode_stream(
503        mut self,
504        mut stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + Send + 'static,
505        sender: UnboundedSender<Entry>,
506    ) -> Self {
507        let (tx, rx) = std::sync::mpsc::channel();
508        let mut reader = StreamBufReader::new(rx);
509        tokio::spawn(async move {
510            while let Some(chunk) = stream.next().await {
511                let data = chunk.unwrap().to_vec();
512                if let Err(e) = tx.send(data) {
513                    eprintln!("Sending Error: {e:?}");
514                    break;
515                }
516            }
517        });
518        // CPU-bound task, so use spawn_blocking
519        // DO NOT use thread::spawn, because it will block tokio runtime (if single-threaded runtime, like in tests)
520        tokio::task::spawn_blocking(move || {
521            self.decode(&mut reader, move |entry: Entry, _| {
522                // as we used unbound channel here, it will never full so can be send with synchronous
523                if let Err(e) = sender.send(entry) {
524                    eprintln!("unbound channel Sending Error: {e:?}");
525                }
526            })
527            .unwrap();
528            self
529        })
530        .await
531        .unwrap()
532    }
533
    /// CacheObjects + Index size of Caches
    ///
    /// Compared against `mem_limit` in [`Pack::decode`] to throttle decoding.
    fn memory_used(&self) -> usize {
        self.cache_objs_mem_used() + self.caches.memory_used_index()
    }
538
    /// The total memory used by the CacheObjects of this Pack
    ///
    /// Reads the shared counter that worker threads update via `MemSizeRecorder`.
    fn cache_objs_mem_used(&self) -> usize {
        self.cache_objs_mem.load(Ordering::Acquire)
    }
543
544    /// Rebuild the Delta Object in a new thread & process the objects waiting for it recursively.
545    /// <br> This function must be *static*, because [&self] can't be moved into a new thread.
546    fn process_delta(
547        shared_params: Arc<SharedParams>,
548        delta_obj: CacheObject,
549        base_obj: Arc<CacheObject>,
550    ) {
551        shared_params.pool.clone().execute(move || {
552            let mut new_obj = match delta_obj.info {
553                CacheObjectInfo::OffsetDelta(_, _) | CacheObjectInfo::HashDelta(_, _) => {
554                    Pack::rebuild_delta(delta_obj, base_obj)
555                }
556                CacheObjectInfo::OffsetZstdelta(_, _) => {
557                    Pack::rebuild_zstdelta(delta_obj, base_obj)
558                }
559                _ => unreachable!(),
560            };
561
562            new_obj.set_mem_recorder(shared_params.cache_objs_mem_size.clone());
563            new_obj.record_mem_size();
564            Self::cache_obj_and_process_waitlist(shared_params, new_obj); //Indirect Recursion
565        });
566    }
567
568    /// Cache the new object & process the objects waiting for it (in multi-threading).
569    fn cache_obj_and_process_waitlist(shared_params: Arc<SharedParams>, new_obj: CacheObject) {
570        (shared_params.callback)(new_obj.to_entry(), new_obj.offset);
571        let new_obj = shared_params.caches.insert(
572            new_obj.offset,
573            new_obj.base_object_hash().unwrap(),
574            new_obj,
575        );
576        Self::process_waitlist(shared_params, new_obj);
577    }
578
579    fn process_waitlist(shared_params: Arc<SharedParams>, base_obj: Arc<CacheObject>) {
580        let wait_objs = shared_params
581            .waitlist
582            .take(base_obj.offset, base_obj.base_object_hash().unwrap());
583        for obj in wait_objs {
584            // Process the objects waiting for the new object(base_obj = new_obj)
585            Self::process_delta(shared_params.clone(), obj, base_obj.clone());
586        }
587    }
588
    /// Reconstruct the Delta Object based on the "base object"
    /// and return the new object.
    ///
    /// Interprets the git delta instruction stream in
    /// `delta_obj.data_decompressed` against `base_obj.data_decompressed`.
    ///
    /// # Panics
    /// Panics on a malformed delta stream (bad instruction, out-of-range copy)
    /// or when the declared base/result sizes do not match the actual data.
    pub fn rebuild_delta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
        // High bit set => copy instruction; clear => literal-data instruction.
        const COPY_INSTRUCTION_FLAG: u8 = 1 << 7;
        const COPY_OFFSET_BYTES: u8 = 4;
        const COPY_SIZE_BYTES: u8 = 3;
        // A copy size of 0 encodes 0x10000 bytes in the git delta format.
        const COPY_ZERO_SIZE: usize = 0x10000;

        let mut stream = Cursor::new(&delta_obj.data_decompressed);

        // Read the base object size
        // (Size Encoding)
        let (base_size, result_size) = utils::read_delta_object_size(&mut stream).unwrap();

        // Get the base object data
        let base_info = &base_obj.data_decompressed;
        assert_eq!(base_info.len(), base_size, "Base object size mismatch");

        let mut result = Vec::with_capacity(result_size);

        loop {
            // Check if the stream has ended, meaning the new object is done
            let instruction = match utils::read_bytes(&mut stream) {
                Ok([instruction]) => instruction,
                Err(err) if err.kind() == ErrorKind::UnexpectedEof => break,
                Err(err) => {
                    panic!(
                        "{}",
                        GitError::DeltaObjectError(format!("Wrong instruction in delta :{err}"))
                    );
                }
            };

            if instruction & COPY_INSTRUCTION_FLAG == 0 {
                // Data instruction; the instruction byte specifies the number of data bytes
                if instruction == 0 {
                    // Appending 0 bytes doesn't make sense, so git disallows it
                    panic!(
                        "{}",
                        GitError::DeltaObjectError(String::from("Invalid data instruction"))
                    );
                }

                // Append the provided bytes
                let mut data = vec![0; instruction as usize];
                stream.read_exact(&mut data).unwrap();
                result.extend_from_slice(&data);
            } else {
                // Copy instruction
                // +----------+---------+---------+---------+---------+-------+-------+-------+
                // | 1xxxxxxx | offset1 | offset2 | offset3 | offset4 | size1 | size2 | size3 |
                // +----------+---------+---------+---------+---------+-------+-------+-------+
                // The low 7 bits say which of the offset/size bytes are present.
                let mut nonzero_bytes = instruction;
                let offset =
                    utils::read_partial_int(&mut stream, COPY_OFFSET_BYTES, &mut nonzero_bytes)
                        .unwrap();
                let mut size =
                    utils::read_partial_int(&mut stream, COPY_SIZE_BYTES, &mut nonzero_bytes)
                        .unwrap();
                if size == 0 {
                    // Copying 0 bytes doesn't make sense, so git assumes a different size
                    size = COPY_ZERO_SIZE;
                }
                // Copy bytes from the base object
                let base_data = base_info.get(offset..(offset + size)).ok_or_else(|| {
                    GitError::DeltaObjectError("Invalid copy instruction".to_string())
                });

                match base_data {
                    Ok(data) => result.extend_from_slice(data),
                    Err(e) => panic!("{}", e),
                }
            }
        }
        assert_eq!(result_size, result.len(), "Result size mismatch");

        let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
        // create new obj from `delta_obj` & `result` instead of modifying `delta_obj` for heap-size recording
        CacheObject {
            info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
            offset: delta_obj.offset,
            data_decompressed: result,
            mem_recorder: None,
        } // Canonical form (Complete Object)
        // Memory recording will happen after this function returns. See `process_delta`
    }
675    pub fn rebuild_zstdelta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
676        let result = zstdelta::apply(&base_obj.data_decompressed, &delta_obj.data_decompressed)
677            .expect("Failed to apply zstdelta");
678        let hash = utils::calculate_object_hash(base_obj.object_type(), &result);
679        CacheObject {
680            info: CacheObjectInfo::BaseObject(base_obj.object_type(), hash),
681            offset: delta_obj.offset,
682            data_decompressed: result,
683            mem_recorder: None,
684        } // Canonical form (Complete Object)
685        // Memory recording will happen after this function returns. See `process_delta`
686    }
687}
688
689#[cfg(test)]
690mod tests {
691    use std::fs;
692    use std::io::BufReader;
693    use std::io::Cursor;
694    use std::io::prelude::*;
695    use std::sync::Arc;
696    use std::sync::atomic::{AtomicUsize, Ordering};
697    use std::{env, path::PathBuf};
698
699    use flate2::Compression;
700    use flate2::write::ZlibEncoder;
701    use tokio_util::io::ReaderStream;
702
703    use crate::internal::pack::Pack;
704    use crate::internal::pack::tests::init_logger;
705    use futures_util::TryStreamExt;
706
    #[tokio::test]
    async fn test_pack_check_header() {
        // Parse the 12-byte header of a real pack fixture and check the
        // object count recorded in it.
        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        source.push("tests/data/packs/git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");

        let f = fs::File::open(source).unwrap();
        let mut buf_reader = BufReader::new(f);
        let (object_num, _) = Pack::check_header(&mut buf_reader).unwrap();

        // Known object count of this fixture.
        assert_eq!(object_num, 358109);
    }
718
719    #[test]
720    fn test_decompress_data() {
721        let data = b"Hello, world!"; // Sample data to compress and then decompress
722        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
723        encoder.write_all(data).unwrap();
724        let compressed_data = encoder.finish().unwrap();
725        let compressed_size = compressed_data.len();
726
727        // Create a cursor for the compressed data to simulate a BufRead source
728        let mut cursor: Cursor<Vec<u8>> = Cursor::new(compressed_data);
729        let expected_size = data.len();
730
731        // Decompress the data and assert correctness
732        let result = Pack::decompress_data(&mut cursor, expected_size);
733        match result {
734            Ok((decompressed_data, bytes_read)) => {
735                assert_eq!(bytes_read, compressed_size);
736                assert_eq!(decompressed_data, data);
737            }
738            Err(e) => panic!("Decompression failed: {e:?}"),
739        }
740    }
741
742    #[test]
743    fn test_pack_decode_without_delta() {
744        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
745        source.push("tests/data/packs/pack-1d0e6c14760c956c173ede71cb28f33d921e232f.pack");
746
747        let tmp = PathBuf::from("/tmp/.cache_temp");
748
749        let f = fs::File::open(source).unwrap();
750        let mut buffered = BufReader::new(f);
751        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
752        p.decode(&mut buffered, |_, _| {}).unwrap();
753    }
754
755    #[test]
756    // #[traced_test]
757    fn test_pack_decode_with_ref_delta() {
758        init_logger();
759
760        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
761        source.push("tests/data/packs/ref-delta-65d47638aa7cb7c39f1bd1d5011a415439b887a8.pack");
762
763        let tmp = PathBuf::from("/tmp/.cache_temp");
764
765        let f = fs::File::open(source).unwrap();
766        let mut buffered = BufReader::new(f);
767        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
768        p.decode(&mut buffered, |_, _| {}).unwrap();
769    }
770
771    #[test]
772    fn test_pack_decode_no_mem_limit() {
773        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
774        source.push("tests/data/packs/pack-1d0e6c14760c956c173ede71cb28f33d921e232f.pack");
775
776        let tmp = PathBuf::from("/tmp/.cache_temp");
777
778        let f = fs::File::open(source).unwrap();
779        let mut buffered = BufReader::new(f);
780        let mut p = Pack::new(None, None, Some(tmp), true);
781        p.decode(&mut buffered, |_, _| {}).unwrap();
782    }
783
784    #[tokio::test]
785    async fn test_pack_decode_with_large_file_with_delta_without_ref() {
786        init_logger();
787        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
788        source.push("tests/data/packs/git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");
789
790        let tmp = PathBuf::from("/tmp/.cache_temp");
791
792        let f = fs::File::open(source).unwrap();
793        let mut buffered = BufReader::new(f);
794        let mut p = Pack::new(
795            Some(20),
796            Some(1024 * 1024 * 1024 * 2),
797            Some(tmp.clone()),
798            true,
799        );
800        let rt = p.decode(&mut buffered, |_obj, _offset| {
801            // println!("{:?} {}", obj.hash.to_string(), offset);
802        });
803        if let Err(e) = rt {
804            fs::remove_dir_all(tmp).unwrap();
805            panic!("Error: {e:?}");
806        }
807    } // it will be stuck on dropping `Pack` on Windows if `mem_size` is None, so we need `mimalloc`
808
809    #[tokio::test]
810    async fn test_decode_large_file_stream() {
811        init_logger();
812        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
813        source.push("tests/data/packs/git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");
814
815        let tmp = PathBuf::from("/tmp/.cache_temp");
816        let f = tokio::fs::File::open(source).await.unwrap();
817        let stream = ReaderStream::new(f).map_err(axum::Error::new);
818        let p = Pack::new(
819            Some(20),
820            Some(1024 * 1024 * 1024 * 4),
821            Some(tmp.clone()),
822            true,
823        );
824
825        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
826        let handle = tokio::spawn(async move { p.decode_stream(stream, tx).await });
827        let count = Arc::new(AtomicUsize::new(0));
828        let count_c = count.clone();
829        // in tests, RUNTIME is single-threaded, so `sync code` will block the tokio runtime
830        let consume = tokio::spawn(async move {
831            let mut cnt = 0;
832            while let Some(_entry) = rx.recv().await {
833                cnt += 1;
834            }
835            tracing::info!("Received: {}", cnt);
836            count_c.store(cnt, Ordering::Release);
837        });
838        let p = handle.await.unwrap();
839        consume.await.unwrap();
840        assert_eq!(count.load(Ordering::Acquire), p.number);
841        assert_eq!(p.number, 358109);
842    }
843
844    #[tokio::test]
845    async fn test_decode_large_file_async() {
846        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
847        source.push("tests/data/packs/git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");
848
849        let tmp = PathBuf::from("/tmp/.cache_temp");
850        let f = fs::File::open(source).unwrap();
851        let buffered = BufReader::new(f);
852        let p = Pack::new(
853            Some(20),
854            Some(1024 * 1024 * 1024 * 2),
855            Some(tmp.clone()),
856            true,
857        );
858
859        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
860        let handle = p.decode_async(buffered, tx); // new thread
861        let mut cnt = 0;
862        while let Some(_entry) = rx.recv().await {
863            cnt += 1; //use entry here
864        }
865        let p = handle.join().unwrap();
866        assert_eq!(cnt, p.number);
867    }
868
869    #[test]
870    fn test_pack_decode_with_delta_without_ref() {
871        let mut source = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
872        source.push("tests/data/packs/pack-d50df695086eea6253a237cb5ac44af1629e7ced.pack");
873
874        let tmp = PathBuf::from("/tmp/.cache_temp");
875
876        let f = fs::File::open(source).unwrap();
877        let mut buffered = BufReader::new(f);
878        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp), true);
879        p.decode(&mut buffered, |_, _| {}).unwrap();
880    }
881
882    #[test]// Take too long time
883    fn test_pack_decode_multi_task_with_large_file_with_delta_without_ref() {
884        let task1 = std::thread::spawn(|| {
885            test_pack_decode_with_large_file_with_delta_without_ref();
886        });
887        let task2 = std::thread::spawn(|| {
888            test_pack_decode_with_large_file_with_delta_without_ref();
889        });
890
891        task1.join().unwrap();
892        task2.join().unwrap();
893    }
894}