use std::sync::atomic::{AtomicBool, Ordering};

use git_features::{
    parallel::{self, in_parallel_if},
    progress::{self, unit, Progress},
    threading::{lock, Mutable, OwnShared},
};

use super::{Error, Reducer};
use crate::{
    data, index,
    index::{traverse::Outcome, util},
};

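/// Traversal options for [`traverse_with_lookup()`][index::File::traverse_with_lookup()].
///
/// A minimal construction sketch, assuming the default [`crate::cache::Never`] cache factory;
/// the thread count is illustrative:
///
/// ```ignore
/// let options = Options {
///     thread_limit: Some(4),
///     check: index::traverse::SafetyCheck::default(),
///     make_pack_lookup_cache: || crate::cache::Never,
/// };
/// ```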
pub struct Options<F> {
    /// If `Some`, use at most the given number of threads; otherwise, choose the number of threads automatically.
    pub thread_limit: Option<usize>,
    /// The kinds of safety checks to perform while traversing.
    pub check: index::traverse::SafetyCheck,
    /// A function to create a new pack cache for each worker thread.
    pub make_pack_lookup_cache: F,
}

impl Default for Options<fn() -> crate::cache::Never> {
    fn default() -> Self {
        Options {
            check: Default::default(),
            thread_limit: None,
            make_pack_lookup_cache: || crate::cache::Never,
        }
    }
}

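/// The progress ids used in [`traverse_with_lookup()`][index::File::traverse_with_lookup()],
/// useful for telling the child progress instances apart.
///
/// A hedged sketch of how a caller might recognize one of these ids; the conversion
/// is real, the surrounding usage is illustrative:
///
/// ```ignore
/// let id: git_features::progress::Id = ProgressId::DecodedObjects.into();
/// assert_eq!(id, *b"PTRO");
/// ```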
#[derive(Debug, Copy, Clone)]
pub enum ProgressId {
    /// The number of bytes hashed so far in the pack data file.
    HashPackDataBytes,
    /// The number of bytes hashed so far in the pack index file.
    HashPackIndexBytes,
    /// The number of objects whose index entries are being collected and sorted.
    CollectSortedIndexEntries,
    /// The number of objects decoded so far.
    DecodedObjects,
}

impl From<ProgressId> for git_features::progress::Id {
    fn from(v: ProgressId) -> Self {
        match v {
            ProgressId::HashPackDataBytes => *b"PTHP",
            ProgressId::HashPackIndexBytes => *b"PTHI",
            ProgressId::CollectSortedIndexEntries => *b"PTCE",
            ProgressId::DecodedObjects => *b"PTRO",
        }
    }
}

impl index::File {
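    /// Iterate through all decoded objects in the given `pack` in index order, applying `processor`
    /// to each of them while the pack and index checksums are verified on a separate thread.
    ///
    /// A hedged call sketch; the no-op processor, `progress::Discard`, and the surrounding
    /// bindings are illustrative assumptions, not prescribed usage:
    ///
    /// ```ignore
    /// use std::sync::atomic::AtomicBool;
    ///
    /// let outcome = index_file.traverse_with_lookup(
    ///     || |_kind, _data, _entry, _progress| Ok::<_, std::io::Error>(()),
    ///     &pack_file,
    ///     git_features::progress::Discard,
    ///     &AtomicBool::new(false),
    ///     Options::default(),
    /// )?;
    /// ```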
    pub fn traverse_with_lookup<P, C, Processor, E, F>(
        &self,
        new_processor: impl Fn() -> Processor + Send + Clone,
        pack: &crate::data::File,
        mut progress: P,
        should_interrupt: &AtomicBool,
        Options {
            thread_limit,
            check,
            make_pack_lookup_cache,
        }: Options<F>,
    ) -> Result<Outcome<P>, Error<E>>
    where
        P: Progress,
        C: crate::cache::DecodeEntry,
        E: std::error::Error + Send + Sync + 'static,
        Processor: FnMut(
            git_object::Kind,
            &[u8],
            &index::Entry,
            &mut <P::SubProgress as Progress>::SubProgress,
        ) -> Result<(), E>,
        F: Fn() -> C + Send + Clone,
    {
        let (verify_result, traversal_result) = parallel::join(
            {
                let pack_progress = progress.add_child_with_id(
                    format!(
                        "Hash of pack '{}'",
                        pack.path().file_name().expect("pack has filename").to_string_lossy()
                    ),
                    ProgressId::HashPackDataBytes.into(),
                );
                let index_progress = progress.add_child_with_id(
                    format!(
                        "Hash of index '{}'",
                        self.path.file_name().expect("index has filename").to_string_lossy()
                    ),
                    ProgressId::HashPackIndexBytes.into(),
                );
                // Verify pack and index checksums; on failure, signal the traversal side to stop early.
                move || {
                    let res = self.possibly_verify(pack, check, pack_progress, index_progress, should_interrupt);
                    if res.is_err() {
                        should_interrupt.store(true, Ordering::SeqCst);
                    }
                    res
                }
            },
            || {
                let index_entries = util::index_entries_sorted_by_offset_ascending(
                    self,
                    progress.add_child_with_id("collecting sorted index", ProgressId::CollectSortedIndexEntries.into()),
                );

                let (chunk_size, thread_limit, available_cores) =
                    parallel::optimize_chunk_size_and_thread_limit(1000, Some(index_entries.len()), thread_limit, None);
                let there_are_enough_entries_to_process = || index_entries.len() > chunk_size * available_cores;
                // Guard against a chunk size of zero, on which `chunks()` would panic.
                let input_chunks = index_entries.chunks(chunk_size.max(1));
                let reduce_progress = OwnShared::new(Mutable::new({
                    let mut p = progress.add_child_with_id("Traversing", ProgressId::DecodedObjects.into());
                    p.init(Some(self.num_objects() as usize), progress::count("objects"));
                    p
                }));
                // Each worker thread gets its own pack cache, processor, scratch buffer and progress instance.
                let state_per_thread = {
                    let reduce_progress = reduce_progress.clone();
                    move |index| {
                        (
                            make_pack_lookup_cache(),
                            new_processor(),
                            Vec::with_capacity(2048),
                            lock(&reduce_progress)
                                .add_child_with_id(format!("thread {index}"), git_features::progress::UNKNOWN),
                        )
                    }
                };

                in_parallel_if(
                    there_are_enough_entries_to_process,
                    input_chunks,
                    thread_limit,
                    state_per_thread,
                    |entries: &[index::Entry],
                     (cache, ref mut processor, buf, progress)|
                     -> Result<Vec<data::decode::entry::Outcome>, Error<_>> {
                        progress.init(
                            Some(entries.len()),
                            Some(unit::dynamic(unit::Human::new(
                                unit::human::Formatter::new(),
                                "objects",
                            ))),
                        );
                        let mut stats = Vec::with_capacity(entries.len());
                        progress.set(0);
                        for index_entry in entries.iter() {
                            let result = self.decode_and_process_entry(
                                check,
                                pack,
                                cache,
                                buf,
                                progress,
                                index_entry,
                                processor,
                            );
                            progress.inc();
                            let stat = match result {
                                // Decode errors are tolerated unless the safety check deems them fatal.
                                Err(err @ Error::PackDecode { .. }) if !check.fatal_decode_error() => {
                                    progress.info(format!("Ignoring decode error: {err}"));
                                    continue;
                                }
                                res => res,
                            }?;
                            stats.push(stat);
                        }
                        Ok(stats)
                    },
                    Reducer::from_progress(reduce_progress, pack.data_len(), check, should_interrupt),
                )
            },
        );
        Ok(Outcome {
            actual_index_checksum: verify_result?,
            statistics: traversal_result?,
            progress,
        })
    }
}