git_pack/index/traverse/
mod.rs

1use std::sync::atomic::AtomicBool;
2
3use git_features::{parallel, progress::Progress};
4
5use crate::index;
6
7mod reduce;
8///
9pub mod with_index;
10///
11pub mod with_lookup;
12use reduce::Reducer;
13
14mod error;
15pub use error::Error;
16
17mod types;
18pub use types::{Algorithm, ProgressId, SafetyCheck, Statistics};
19
/// Traversal options for [`index::File::traverse()`].
///
/// `F` is the type of the cache factory stored in `make_pack_lookup_cache`. The [`Default`] implementation
/// uses a plain function pointer that produces [`crate::cache::Never`].
#[derive(Debug, Clone)]
pub struct Options<F> {
    /// The algorithm to employ.
    pub traversal: Algorithm,
    /// If `Some`, only use the given amount of threads. Otherwise, the amount of threads to use will be selected based on
    /// the amount of available logical cores.
    pub thread_limit: Option<usize>,
    /// The kinds of safety checks to perform.
    pub check: SafetyCheck,
    /// A function to create a pack cache.
    ///
    /// [`index::File::traverse()`] only forwards it to the traversal when `traversal` is [`Algorithm::Lookup`];
    /// it is not passed on for [`Algorithm::DeltaTreeLookup`].
    pub make_pack_lookup_cache: F,
}
33
34impl Default for Options<fn() -> crate::cache::Never> {
35    fn default() -> Self {
36        Options {
37            check: Default::default(),
38            traversal: Default::default(),
39            thread_limit: None,
40            make_pack_lookup_cache: || crate::cache::Never,
41        }
42    }
43}
44
/// The outcome of the [`traverse()`][index::File::traverse()] method.
///
/// `P` is the type of the progress instance that was passed to the traversal.
pub struct Outcome<P> {
    /// The checksum obtained when hashing the file, which matched the checksum contained within the file.
    pub actual_index_checksum: git_hash::ObjectId,
    /// The statistics obtained during traversal.
    pub statistics: Statistics,
    /// The input progress, handed back to the caller to allow reuse.
    pub progress: P,
}
54
55/// Traversal of pack data files using an index file
56impl index::File {
57    /// Iterate through all _decoded objects_ in the given `pack` and handle them with a `Processor`.
58    /// The return value is (pack-checksum, [`Outcome`], `progress`), thus the pack traversal will always verify
59    /// the whole packs checksum to assure it was correct. In case of bit-rod, the operation will abort early without
60    /// verifying all objects using the [interrupt mechanism][git_features::interrupt] mechanism.
61    ///
62    /// # Algorithms
63    ///
64    /// Using the [`Options::traversal`] field one can chose between two algorithms providing different tradeoffs. Both invoke
65    /// `new_processor()` to create functions receiving decoded objects, their object kind, index entry and a progress instance to provide
66    /// progress information.
67    ///
68    /// * [`Algorithm::DeltaTreeLookup`] builds an index to avoid any unnecessary computation while resolving objects, avoiding
69    ///   the need for a cache entirely, rendering `new_cache()` unused.
70    ///   One could also call [`traverse_with_index()`][index::File::traverse_with_index()] directly.
71    /// * [`Algorithm::Lookup`] uses a cache created by `new_cache()` to avoid having to re-compute all bases of a delta-chain while
72    ///   decoding objects.
73    ///   One could also call [`traverse_with_lookup()`][index::File::traverse_with_lookup()] directly.
74    ///
75    /// Use [`thread_limit`][Options::thread_limit] to further control parallelism and [`check`][SafetyCheck] to define how much the passed
76    /// objects shall be verified beforehand.
77    pub fn traverse<P, C, Processor, E, F>(
78        &self,
79        pack: &crate::data::File,
80        progress: P,
81        should_interrupt: &AtomicBool,
82        new_processor: impl Fn() -> Processor + Send + Clone,
83        Options {
84            traversal,
85            thread_limit,
86            check,
87            make_pack_lookup_cache,
88        }: Options<F>,
89    ) -> Result<Outcome<P>, Error<E>>
90    where
91        P: Progress,
92        C: crate::cache::DecodeEntry,
93        E: std::error::Error + Send + Sync + 'static,
94        Processor: FnMut(
95            git_object::Kind,
96            &[u8],
97            &index::Entry,
98            &mut <P::SubProgress as Progress>::SubProgress,
99        ) -> Result<(), E>,
100        F: Fn() -> C + Send + Clone,
101    {
102        match traversal {
103            Algorithm::Lookup => self.traverse_with_lookup(
104                new_processor,
105                pack,
106                progress,
107                should_interrupt,
108                with_lookup::Options {
109                    thread_limit,
110                    check,
111                    make_pack_lookup_cache,
112                },
113            ),
114            Algorithm::DeltaTreeLookup => self.traverse_with_index(
115                pack,
116                new_processor,
117                progress,
118                should_interrupt,
119                crate::index::traverse::with_index::Options { check, thread_limit },
120            ),
121        }
122    }
123
124    fn possibly_verify<E>(
125        &self,
126        pack: &crate::data::File,
127        check: SafetyCheck,
128        pack_progress: impl Progress,
129        index_progress: impl Progress,
130        should_interrupt: &AtomicBool,
131    ) -> Result<git_hash::ObjectId, Error<E>>
132    where
133        E: std::error::Error + Send + Sync + 'static,
134    {
135        Ok(if check.file_checksum() {
136            if self.pack_checksum() != pack.checksum() {
137                return Err(Error::PackMismatch {
138                    actual: pack.checksum(),
139                    expected: self.pack_checksum(),
140                });
141            }
142            let (pack_res, id) = parallel::join(
143                move || pack.verify_checksum(pack_progress, should_interrupt),
144                move || self.verify_checksum(index_progress, should_interrupt),
145            );
146            pack_res?;
147            id?
148        } else {
149            self.index_checksum()
150        })
151    }
152
153    #[allow(clippy::too_many_arguments)]
154    fn decode_and_process_entry<C, P, E>(
155        &self,
156        check: SafetyCheck,
157        pack: &crate::data::File,
158        cache: &mut C,
159        buf: &mut Vec<u8>,
160        progress: &mut P,
161        index_entry: &crate::index::Entry,
162        processor: &mut impl FnMut(git_object::Kind, &[u8], &index::Entry, &mut P) -> Result<(), E>,
163    ) -> Result<crate::data::decode::entry::Outcome, Error<E>>
164    where
165        C: crate::cache::DecodeEntry,
166        P: Progress,
167        E: std::error::Error + Send + Sync + 'static,
168    {
169        let pack_entry = pack.entry(index_entry.pack_offset);
170        let pack_entry_data_offset = pack_entry.data_offset;
171        let entry_stats = pack
172            .decode_entry(
173                pack_entry,
174                buf,
175                |id, _| {
176                    self.lookup(id).map(|index| {
177                        crate::data::decode::entry::ResolvedBase::InPack(pack.entry(self.pack_offset_at_index(index)))
178                    })
179                },
180                cache,
181            )
182            .map_err(|e| Error::PackDecode {
183                source: e,
184                id: index_entry.oid,
185                offset: index_entry.pack_offset,
186            })?;
187        let object_kind = entry_stats.kind;
188        let header_size = (pack_entry_data_offset - index_entry.pack_offset) as usize;
189        let entry_len = header_size + entry_stats.compressed_size;
190
191        process_entry(
192            check,
193            object_kind,
194            buf,
195            progress,
196            index_entry,
197            || pack.entry_crc32(index_entry.pack_offset, entry_len),
198            processor,
199        )?;
200        Ok(entry_stats)
201    }
202}
203
204#[allow(clippy::too_many_arguments)]
205fn process_entry<P, E>(
206    check: SafetyCheck,
207    object_kind: git_object::Kind,
208    decompressed: &[u8],
209    progress: &mut P,
210    index_entry: &crate::index::Entry,
211    pack_entry_crc32: impl FnOnce() -> u32,
212    processor: &mut impl FnMut(git_object::Kind, &[u8], &index::Entry, &mut P) -> Result<(), E>,
213) -> Result<(), Error<E>>
214where
215    P: Progress,
216    E: std::error::Error + Send + Sync + 'static,
217{
218    if check.object_checksum() {
219        let mut hasher = git_features::hash::hasher(index_entry.oid.kind());
220        hasher.update(&git_object::encode::loose_header(object_kind, decompressed.len()));
221        hasher.update(decompressed);
222
223        let actual_oid = git_hash::ObjectId::from(hasher.digest());
224        if actual_oid != index_entry.oid {
225            return Err(Error::PackObjectMismatch {
226                actual: actual_oid,
227                expected: index_entry.oid,
228                offset: index_entry.pack_offset,
229                kind: object_kind,
230            });
231        }
232        if let Some(desired_crc32) = index_entry.crc32 {
233            let actual_crc32 = pack_entry_crc32();
234            if actual_crc32 != desired_crc32 {
235                return Err(Error::Crc32Mismatch {
236                    actual: actual_crc32,
237                    expected: desired_crc32,
238                    offset: index_entry.pack_offset,
239                    kind: object_kind,
240                });
241            }
242        }
243    }
244    processor(object_kind, decompressed, index_entry, progress).map_err(Error::Processor)
245}