git_pack/index/traverse/with_lookup.rs

use std::sync::atomic::{AtomicBool, Ordering};

use git_features::{
    parallel::{self, in_parallel_if},
    progress::{self, unit, Progress},
    threading::{lock, Mutable, OwnShared},
};

use super::{Error, Reducer};
use crate::{
    data, index,
    index::{traverse::Outcome, util},
};

/// Traversal options for [`index::File::traverse_with_lookup()`]
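///
/// # Example
///
/// A sketch of a non-default configuration; it assumes the `pack-cache-lru-static` cargo feature
/// is enabled so that `cache::lru::StaticLinkedList` is available.
///
/// ```ignore
/// use git_pack::{cache, index};
///
/// let options = index::traverse::with_lookup::Options {
///     thread_limit: Some(4),
///     check: index::traverse::SafetyCheck::All,
///     make_pack_lookup_cache: || cache::lru::StaticLinkedList::<64>::default(),
/// };
/// ```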
pub struct Options<F> {
    /// If `Some`, use at most the given number of threads. Otherwise, the number of threads to use
    /// will be selected based on the number of available logical cores.
    pub thread_limit: Option<usize>,
    /// The kinds of safety checks to perform.
    pub check: index::traverse::SafetyCheck,
    /// A function to create a pack cache
    pub make_pack_lookup_cache: F,
}

impl Default for Options<fn() -> crate::cache::Never> {
    fn default() -> Self {
        Options {
            check: Default::default(),
            thread_limit: None,
            make_pack_lookup_cache: || crate::cache::Never,
        }
    }
}

/// The progress ids used in [`index::File::traverse_with_lookup()`].
///
/// Use this information to selectively extract the progress of interest in case the parent application has custom visualization.
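///
/// # Example
///
/// A sketch of recognizing one of these ids on the receiving end; the conversion target
/// `git_features::progress::Id` is a four-byte array.
///
/// ```
/// use git_pack::index::traverse::with_lookup::ProgressId;
///
/// let id: git_features::progress::Id = ProgressId::DecodedObjects.into();
/// assert_eq!(&id, b"PTRO");
/// ```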
#[derive(Debug, Copy, Clone)]
pub enum ProgressId {
    /// The number of bytes processed so far while generating a checksum of the *pack data file*.
    HashPackDataBytes,
    /// The number of bytes processed so far while generating a checksum of the *pack index file*.
    HashPackIndexBytes,
    /// Collect all object hashes into a vector and sort it by their pack offset.
    CollectSortedIndexEntries,
    /// The number of objects decoded by brute force so far.
    DecodedObjects,
}

impl From<ProgressId> for git_features::progress::Id {
    fn from(v: ProgressId) -> Self {
        match v {
            ProgressId::HashPackDataBytes => *b"PTHP",
            ProgressId::HashPackIndexBytes => *b"PTHI",
            ProgressId::CollectSortedIndexEntries => *b"PTCE",
            ProgressId::DecodedObjects => *b"PTRO",
        }
    }
}

/// Verify and validate the content of the index file
impl index::File {
    /// Iterate through all _decoded objects_ in the given `pack` and handle them with a `Processor`,
    /// using a cache to avoid repeated work while decoding objects.
    ///
    /// For more details, see the documentation on the [`traverse()`][index::File::traverse()] method.
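    ///
    /// # Example
    ///
    /// A minimal sketch of driving this traversal with a no-op processor and no progress reporting.
    /// The pack and index paths are placeholders, and the exact constructor signatures may differ
    /// between versions of this crate.
    ///
    /// ```no_run
    /// use std::sync::atomic::AtomicBool;
    /// use git_features::progress;
    ///
    /// let index = git_pack::index::File::at("pack-1234.idx", git_hash::Kind::Sha1)?;
    /// let pack = git_pack::data::File::at("pack-1234.pack", git_hash::Kind::Sha1)?;
    /// let should_interrupt = AtomicBool::new(false);
    /// let outcome = index.traverse_with_lookup(
    ///     || {
    ///         |_kind: git_object::Kind,
    ///          _data: &[u8],
    ///          _entry: &git_pack::index::Entry,
    ///          _progress: &mut progress::Discard|
    ///          -> Result<(), std::io::Error> { Ok(()) }
    ///     },
    ///     &pack,
    ///     progress::Discard,
    ///     &should_interrupt,
    ///     git_pack::index::traverse::with_lookup::Options::default(),
    /// )?;
    /// # Ok::<_, Box<dyn std::error::Error>>(())
    /// ```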
    pub fn traverse_with_lookup<P, C, Processor, E, F>(
        &self,
        new_processor: impl Fn() -> Processor + Send + Clone,
        pack: &crate::data::File,
        mut progress: P,
        should_interrupt: &AtomicBool,
        Options {
            thread_limit,
            check,
            make_pack_lookup_cache,
        }: Options<F>,
    ) -> Result<Outcome<P>, Error<E>>
    where
        P: Progress,
        C: crate::cache::DecodeEntry,
        E: std::error::Error + Send + Sync + 'static,
        Processor: FnMut(
            git_object::Kind,
            &[u8],
            &index::Entry,
            &mut <P::SubProgress as Progress>::SubProgress,
        ) -> Result<(), E>,
        F: Fn() -> C + Send + Clone,
    {
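        // Checksum verification and object traversal run concurrently via `parallel::join`;
        // a failure on the verification side raises `should_interrupt` so the traversal can stop early.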
        let (verify_result, traversal_result) = parallel::join(
            {
                let pack_progress = progress.add_child_with_id(
                    format!(
                        "Hash of pack '{}'",
                        pack.path().file_name().expect("pack has filename").to_string_lossy()
                    ),
                    ProgressId::HashPackDataBytes.into(),
                );
                let index_progress = progress.add_child_with_id(
                    format!(
                        "Hash of index '{}'",
                        self.path.file_name().expect("index has filename").to_string_lossy()
                    ),
                    ProgressId::HashPackIndexBytes.into(),
                );
                move || {
                    let res = self.possibly_verify(pack, check, pack_progress, index_progress, should_interrupt);
                    if res.is_err() {
                        should_interrupt.store(true, Ordering::SeqCst);
                    }
                    res
                }
            },
            || {
                let index_entries = util::index_entries_sorted_by_offset_ascending(
                    self,
                    progress.add_child_with_id("collecting sorted index", ProgressId::CollectSortedIndexEntries.into()),
                );

                let (chunk_size, thread_limit, available_cores) =
                    parallel::optimize_chunk_size_and_thread_limit(1000, Some(index_entries.len()), thread_limit, None);
                let there_are_enough_entries_to_process = || index_entries.len() > chunk_size * available_cores;
                let input_chunks = index_entries.chunks(chunk_size.max(1)); // `.max(1)` guards against a zero chunk size, which would make `chunks()` panic
                let reduce_progress = OwnShared::new(Mutable::new({
                    let mut p = progress.add_child_with_id("Traversing", ProgressId::DecodedObjects.into());
                    p.init(Some(self.num_objects() as usize), progress::count("objects"));
                    p
                }));
                let state_per_thread = {
                    let reduce_progress = reduce_progress.clone();
                    move |index| {
                        (
                            make_pack_lookup_cache(),
                            new_processor(),
                            Vec::with_capacity(2048), // decode buffer
                            lock(&reduce_progress)
                                .add_child_with_id(format!("thread {index}"), git_features::progress::UNKNOWN), // per thread progress
                        )
                    }
                };

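                // `in_parallel_if` only spawns threads when the workload is large enough;
                // otherwise the chunks are processed serially on the current thread.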
                in_parallel_if(
                    there_are_enough_entries_to_process,
                    input_chunks,
                    thread_limit,
                    state_per_thread,
                    |entries: &[index::Entry],
                     (cache, ref mut processor, buf, progress)|
                     -> Result<Vec<data::decode::entry::Outcome>, Error<_>> {
                        progress.init(
                            Some(entries.len()),
                            Some(unit::dynamic(unit::Human::new(
                                unit::human::Formatter::new(),
                                "objects",
                            ))),
                        );
                        let mut stats = Vec::with_capacity(entries.len());
                        progress.set(0);
                        for index_entry in entries.iter() {
                            let result = self.decode_and_process_entry(
                                check,
                                pack,
                                cache,
                                buf,
                                progress,
                                index_entry,
                                processor,
                            );
                            progress.inc();
                            let stat = match result {
                                Err(err @ Error::PackDecode { .. }) if !check.fatal_decode_error() => {
                                    progress.info(format!("Ignoring decode error: {err}"));
                                    continue;
                                }
                                res => res,
                            }?;
                            stats.push(stat);
                        }
                        Ok(stats)
                    },
                    Reducer::from_progress(reduce_progress, pack.data_len(), check, should_interrupt),
                )
            },
        );
        Ok(Outcome {
            actual_index_checksum: verify_result?,
            statistics: traversal_result?,
            progress,
        })
    }
}