1use std::sync::atomic::{AtomicBool, Ordering};
2
3use gix_features::{
4 parallel::{self, in_parallel_if},
5 progress::{self, Count, DynNestedProgress, Progress},
6 threading::{Mutable, OwnShared, lock},
7 zlib,
8};
9
10use super::{Error, Reducer};
11use crate::{
12 data, exact_vec, index,
13 index::{traverse::Outcome, util},
14};
15
/// Options for [`index::File::traverse_with_lookup()`].
pub struct Options<F> {
    /// Optional limit on the number of threads, forwarded to
    /// `parallel::optimize_chunk_size_and_thread_limit()`.
    /// `None` presumably leaves the choice to that helper (e.g. based on available cores) - confirm there.
    pub thread_limit: Option<usize>,
    /// The safety checks to perform. It is forwarded to index/pack verification and to the decoding of
    /// each entry, and it decides whether a pack decode error aborts the traversal or is merely logged.
    pub check: index::traverse::SafetyCheck,
    /// A function creating a pack lookup cache; it is called each time per-thread state is set up.
    pub make_pack_lookup_cache: F,
}
26
27impl Default for Options<fn() -> crate::cache::Never> {
28 fn default() -> Self {
29 Options {
30 check: Default::default(),
31 thread_limit: None,
32 make_pack_lookup_cache: || crate::cache::Never,
33 }
34 }
35}
36
/// The progress ids used in [`index::File::traverse_with_lookup()`].
///
/// Each variant converts into a four-byte `gix_features::progress::Id`, which allows callers to
/// tell the individual progress children apart.
#[derive(Debug, Copy, Clone)]
pub enum ProgressId {
    /// Attached to the progress child named "Hash of pack '<name>'", which is handed to verification.
    HashPackDataBytes,
    /// Attached to the progress child named "Hash of index '<name>'", which is handed to verification.
    HashPackIndexBytes,
    /// Attached to the progress child named "collecting sorted index", used while gathering the index
    /// entries sorted by pack offset.
    CollectSortedIndexEntries,
    /// Attached to the progress child named "Traversing", which counts objects up to the number of
    /// objects in the index.
    DecodedObjects,
}
51
52impl From<ProgressId> for gix_features::progress::Id {
53 fn from(v: ProgressId) -> Self {
54 match v {
55 ProgressId::HashPackDataBytes => *b"PTHP",
56 ProgressId::HashPackIndexBytes => *b"PTHI",
57 ProgressId::CollectSortedIndexEntries => *b"PTCE",
58 ProgressId::DecodedObjects => *b"PTRO",
59 }
60 }
61}
62
/// Traversal of all index entries, decoding each one from the pack with the help of a lookup cache.
impl<T> index::File<T>
where
    T: crate::FileData + Sync,
{
    /// Verify this index against `pack` and traverse all of its entries, handing each entry to
    /// `processor` via `decode_and_process_entry()`.
    ///
    /// Two jobs are submitted to `parallel::join()`:
    ///
    /// * **verification** - `possibly_verify()` is called with `check` and two hashing progress
    ///   children. If it fails, `should_interrupt` is set so the traversal can stop early.
    /// * **traversal** - the entries returned by `index_entries_sorted_by_offset_ascending()` are
    ///   split into chunks and passed to `in_parallel_if()`. Each thread state consists of a cache
    ///   created by `make_pack_lookup_cache`, a decode buffer, a zlib inflate state and a progress child.
    ///
    /// # Parameters
    ///
    /// * `processor` - receives an object kind, a byte slice, the index entry and a progress handle
    ///   (presumably the decoded object data - confirm in `decode_and_process_entry()`).
    ///   It must be `Clone + Send` so it can be distributed across threads.
    /// * `pack` - the pack data file belonging to this index.
    /// * `progress` - parent under which all progress children are created; see [`ProgressId`].
    /// * `should_interrupt` - checked after each successfully processed entry; once `true`, the current
    ///   chunk stops early. It is also set by this function if verification fails.
    /// * `Options` - destructured into `thread_limit`, `check` and `make_pack_lookup_cache`.
    ///
    /// # Errors
    ///
    /// Returns the verification error if there is one, otherwise the traversal error (the verification
    /// result is unwrapped first when building the `Outcome`). During traversal, `Error::PackDecode` is
    /// only logged and skipped if `check.fatal_decode_error()` is `false`; every other error aborts
    /// the chunk.
    pub fn traverse_with_lookup<C, Processor, E, F, D>(
        &self,
        mut processor: Processor,
        pack: &data::File<D>,
        progress: &mut dyn DynNestedProgress,
        should_interrupt: &AtomicBool,
        Options {
            thread_limit,
            check,
            make_pack_lookup_cache,
        }: Options<F>,
    ) -> Result<Outcome, Error<E>>
    where
        C: crate::cache::DecodeEntry,
        E: std::error::Error + Send + Sync + 'static,
        Processor: FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn Progress) -> Result<(), E> + Send + Clone,
        F: Fn() -> C + Send + Clone,
        D: crate::FileData + Send + Sync,
    {
        let (verify_result, traversal_result) = parallel::join(
            {
                // Create both hashing progress children up front, before the closure is built,
                // so the closure owns them and `progress` itself stays free for the second job.
                let mut pack_progress = progress.add_child_with_id(
                    format!("Hash of pack '{}'", crate::source_name(pack.path())),
                    ProgressId::HashPackDataBytes.into(),
                );
                let mut index_progress = progress.add_child_with_id(
                    format!("Hash of index '{}'", crate::source_name(&self.path)),
                    ProgressId::HashPackIndexBytes.into(),
                );
                move || {
                    let res =
                        self.possibly_verify(pack, check, &mut pack_progress, &mut index_progress, should_interrupt);
                    // A failed verification makes further traversal pointless, so ask it to stop.
                    if res.is_err() {
                        should_interrupt.store(true, Ordering::SeqCst);
                    }
                    res
                }
            },
            || {
                let index_entries = util::index_entries_sorted_by_offset_ascending(
                    self,
                    &mut progress.add_child_with_id(
                        "collecting sorted index".into(),
                        ProgressId::CollectSortedIndexEntries.into(),
                    ),
                );

                // `1000` is presumably the desired chunk size - confirm against the helper's signature.
                let (chunk_size, thread_limit, available_cores) =
                    parallel::optimize_chunk_size_and_thread_limit(1000, Some(index_entries.len()), thread_limit, None);
                // Predicate for `in_parallel_if()`: true only if there is more than one chunk per available core.
                let there_are_enough_entries_to_process = || index_entries.len() > chunk_size * available_cores;
                let input_chunks = index_entries.chunks(chunk_size);
                // Shared between the thread-state factory (to add per-thread children) and the reducer.
                let reduce_progress = OwnShared::new(Mutable::new({
                    let mut p = progress.add_child_with_id("Traversing".into(), ProgressId::DecodedObjects.into());
                    p.init(Some(self.num_objects() as usize), progress::count("objects"));
                    p
                }));
                let state_per_thread = {
                    let reduce_progress = reduce_progress.clone();
                    move |index| {
                        (
                            make_pack_lookup_cache(),
                            Vec::with_capacity(2048), // decode buffer
                            zlib::Inflate::default(),
                            lock(&reduce_progress)
                                .add_child_with_id(format!("thread {index}"), gix_features::progress::UNKNOWN), // per thread progress
                        )
                    }
                };

                in_parallel_if(
                    there_are_enough_entries_to_process,
                    input_chunks,
                    thread_limit,
                    state_per_thread,
                    // Note: `progress` below is the per-thread child from the state tuple, shadowing the parent.
                    move |entries: &[index::Entry],
                          (cache, buf, inflate, progress)|
                          -> Result<Vec<data::decode::entry::Outcome>, Error<_>> {
                        progress.init(
                            Some(entries.len()),
                            gix_features::progress::count_with_decimals("objects", 2),
                        );
                        let mut stats = exact_vec(entries.len());
                        // The thread's progress is reused across chunks, so restart its counter.
                        progress.set(0);
                        for index_entry in entries.iter() {
                            let result = self.decode_and_process_entry(
                                check,
                                pack,
                                cache,
                                buf,
                                inflate,
                                progress,
                                index_entry,
                                &mut processor,
                            );
                            // Count the entry whether or not it could be decoded.
                            progress.inc();
                            let stat = match result {
                                // Decode errors are tolerated if the safety check says so: log and skip the entry.
                                // NOTE(review): `continue` also skips the interrupt check below for this iteration.
                                Err(err @ Error::PackDecode { .. }) if !check.fatal_decode_error() => {
                                    progress.info(format!("Ignoring decode error: {err}"));
                                    continue;
                                }
                                res => res,
                            }?;
                            stats.push(stat);
                            // Stop early, still returning the statistics gathered so far for this chunk.
                            if should_interrupt.load(Ordering::Relaxed) {
                                break;
                            }
                        }
                        Ok(stats)
                    },
                    Reducer::from_progress(reduce_progress, pack.data_len(), check, should_interrupt),
                )
            },
        );
        // Field expressions are evaluated in order, so a verification error takes precedence.
        Ok(Outcome {
            actual_index_checksum: verify_result?,
            statistics: traversal_result?,
        })
    }
}