Skip to main content

gix_pack/index/traverse/with_lookup.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2
3use gix_features::{
4    parallel::{self, in_parallel_if},
5    progress::{self, Count, DynNestedProgress, Progress},
6    threading::{Mutable, OwnShared, lock},
7    zlib,
8};
9
10use super::{Error, Reducer};
11use crate::{
12    data, exact_vec, index,
13    index::{traverse::Outcome, util},
14};
15
16/// Traversal options for [`index::File::traverse_with_lookup()`]
17pub struct Options<F> {
18    /// If `Some`, only use the given number of threads. Otherwise, the number of threads to use will be selected based on
19    /// the number of available logical cores.
20    pub thread_limit: Option<usize>,
21    /// The kinds of safety checks to perform.
22    pub check: index::traverse::SafetyCheck,
23    /// A function to create a pack cache
24    pub make_pack_lookup_cache: F,
25}
26
27impl Default for Options<fn() -> crate::cache::Never> {
28    fn default() -> Self {
29        Options {
30            check: Default::default(),
31            thread_limit: None,
32            make_pack_lookup_cache: || crate::cache::Never,
33        }
34    }
35}
36
/// Identifiers of the progress items created by [`index::File::traverse_with_lookup()`].
///
/// Applications with their own progress visualization can use these to pick out the progress they care about.
#[derive(Clone, Copy, Debug)]
pub enum ProgressId {
    /// Bytes processed so far while computing the checksum of the *pack data file*.
    HashPackDataBytes,
    /// Bytes processed so far while computing the checksum of the *pack index file*.
    HashPackIndexBytes,
    /// Collecting all index entries into a vector and sorting them by their pack offset.
    CollectSortedIndexEntries,
    /// The number of objects decoded by brute-force.
    DecodedObjects,
}
51
52impl From<ProgressId> for gix_features::progress::Id {
53    fn from(v: ProgressId) -> Self {
54        match v {
55            ProgressId::HashPackDataBytes => *b"PTHP",
56            ProgressId::HashPackIndexBytes => *b"PTHI",
57            ProgressId::CollectSortedIndexEntries => *b"PTCE",
58            ProgressId::DecodedObjects => *b"PTRO",
59        }
60    }
61}
62
63/// Verify and validate the content of the index file
64impl<T> index::File<T>
65where
66    T: crate::FileData + Sync,
67{
68    /// Iterate through all _decoded objects_ in the given `pack` and handle them with a `Processor` using a cache to reduce the amount of
69    /// waste while decoding objects.
70    ///
71    /// For more details, see the documentation on the [`traverse()`][index::File::traverse()] method.
72    pub fn traverse_with_lookup<C, Processor, E, F, D>(
73        &self,
74        mut processor: Processor,
75        pack: &data::File<D>,
76        progress: &mut dyn DynNestedProgress,
77        should_interrupt: &AtomicBool,
78        Options {
79            thread_limit,
80            check,
81            make_pack_lookup_cache,
82        }: Options<F>,
83    ) -> Result<Outcome, Error<E>>
84    where
85        C: crate::cache::DecodeEntry,
86        E: std::error::Error + Send + Sync + 'static,
87        Processor: FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn Progress) -> Result<(), E> + Send + Clone,
88        F: Fn() -> C + Send + Clone,
89        D: crate::FileData + Send + Sync,
90    {
91        let (verify_result, traversal_result) = parallel::join(
92            {
93                let mut pack_progress = progress.add_child_with_id(
94                    format!("Hash of pack '{}'", crate::source_name(pack.path())),
95                    ProgressId::HashPackDataBytes.into(),
96                );
97                let mut index_progress = progress.add_child_with_id(
98                    format!("Hash of index '{}'", crate::source_name(&self.path)),
99                    ProgressId::HashPackIndexBytes.into(),
100                );
101                move || {
102                    let res =
103                        self.possibly_verify(pack, check, &mut pack_progress, &mut index_progress, should_interrupt);
104                    if res.is_err() {
105                        should_interrupt.store(true, Ordering::SeqCst);
106                    }
107                    res
108                }
109            },
110            || {
111                let index_entries = util::index_entries_sorted_by_offset_ascending(
112                    self,
113                    &mut progress.add_child_with_id(
114                        "collecting sorted index".into(),
115                        ProgressId::CollectSortedIndexEntries.into(),
116                    ),
117                );
118
119                let (chunk_size, thread_limit, available_cores) =
120                    parallel::optimize_chunk_size_and_thread_limit(1000, Some(index_entries.len()), thread_limit, None);
121                let there_are_enough_entries_to_process = || index_entries.len() > chunk_size * available_cores;
122                let input_chunks = index_entries.chunks(chunk_size);
123                let reduce_progress = OwnShared::new(Mutable::new({
124                    let mut p = progress.add_child_with_id("Traversing".into(), ProgressId::DecodedObjects.into());
125                    p.init(Some(self.num_objects() as usize), progress::count("objects"));
126                    p
127                }));
128                let state_per_thread = {
129                    let reduce_progress = reduce_progress.clone();
130                    move |index| {
131                        (
132                            make_pack_lookup_cache(),
133                            Vec::with_capacity(2048), // decode buffer
134                            zlib::Inflate::default(),
135                            lock(&reduce_progress)
136                                .add_child_with_id(format!("thread {index}"), gix_features::progress::UNKNOWN), // per thread progress
137                        )
138                    }
139                };
140
141                in_parallel_if(
142                    there_are_enough_entries_to_process,
143                    input_chunks,
144                    thread_limit,
145                    state_per_thread,
146                    move |entries: &[index::Entry],
147                          (cache, buf, inflate, progress)|
148                          -> Result<Vec<data::decode::entry::Outcome>, Error<_>> {
149                        progress.init(
150                            Some(entries.len()),
151                            gix_features::progress::count_with_decimals("objects", 2),
152                        );
153                        let mut stats = exact_vec(entries.len());
154                        progress.set(0);
155                        for index_entry in entries.iter() {
156                            let result = self.decode_and_process_entry(
157                                check,
158                                pack,
159                                cache,
160                                buf,
161                                inflate,
162                                progress,
163                                index_entry,
164                                &mut processor,
165                            );
166                            progress.inc();
167                            let stat = match result {
168                                Err(err @ Error::PackDecode { .. }) if !check.fatal_decode_error() => {
169                                    progress.info(format!("Ignoring decode error: {err}"));
170                                    continue;
171                                }
172                                res => res,
173                            }?;
174                            stats.push(stat);
175                            if should_interrupt.load(Ordering::Relaxed) {
176                                break;
177                            }
178                        }
179                        Ok(stats)
180                    },
181                    Reducer::from_progress(reduce_progress, pack.data_len(), check, should_interrupt),
182                )
183            },
184        );
185        Ok(Outcome {
186            actual_index_checksum: verify_result?,
187            statistics: traversal_result?,
188        })
189    }
190}