use super::{Error, Reducer, SafetyCheck};
use crate::pack::{self, data, index, index::util};
use git_features::{
interrupt::ResetOnDrop,
parallel::{self, in_parallel_if},
progress::{self, unit, Progress},
};
impl index::File {
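    /// Decode every object referenced by this index from `pack` and hand it to a processor
    /// created by `new_processor()` (one processor per thread), verifying pack and index
    /// checksums as dictated by `check`. `new_cache()` provides each worker thread with its
    /// own decode cache, and `thread_limit` caps the number of threads used.
    ///
    /// On success, returns the verified checksum, the accumulated traversal outcome, and the
    /// `progress` instance passed in.
    ///
    /// A minimal usage sketch follows; it is `ignore`d as it needs real pack files on disk,
    /// and the file paths as well as the `NoopCache` type are hypothetical stand-ins.
    ///
    /// ```ignore
    /// use git_features::progress;
    ///
    /// let index = pack::index::File::at("pack-1234.idx")?;
    /// let data = pack::data::File::at("pack-1234.pack")?;
    /// let (checksum, outcome, _progress) = index.traverse_with_lookup(
    ///     SafetyCheck::All,
    ///     None, // let the implementation pick the thread count
    ///     || |_kind, _data, _entry, _progress| Ok::<_, std::io::Error>(()),
    ///     || NoopCache::default(), // hypothetical `pack::cache::DecodeEntry` impl
    ///     progress::Discard,
    ///     &data,
    /// )?;
    /// ```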
pub fn traverse_with_lookup<P, C, Processor, E>(
&self,
check: SafetyCheck,
thread_limit: Option<usize>,
new_processor: impl Fn() -> Processor + Send + Sync,
new_cache: impl Fn() -> C + Send + Sync,
mut progress: P,
pack: &pack::data::File,
) -> Result<(git_hash::ObjectId, index::traverse::Outcome, P), Error<E>>
where
P: Progress,
C: pack::cache::DecodeEntry,
E: std::error::Error + Send + Sync + 'static,
Processor: FnMut(
git_object::Kind,
&[u8],
&index::Entry,
&mut <<P as Progress>::SubProgress as Progress>::SubProgress,
) -> Result<(), E>,
{
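        // Reset the shared interrupt flag on drop, so an interrupt triggered below does not
        // leak into subsequent operations.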
let _reset_interrupt = ResetOnDrop::default();
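        // Verify pack and index checksums concurrently with the traversal; a verification
        // failure trips the interrupt flag so the traversal can stop early.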
let (verify_result, traversal_result) = parallel::join(
{
let pack_progress = progress.add_child("SHA1 of pack");
let index_progress = progress.add_child("SHA1 of index");
move || {
let res = self.possibly_verify(pack, check, pack_progress, index_progress);
if res.is_err() {
git_features::interrupt::trigger();
}
res
}
},
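            // Traversal task: decode all indexed objects chunk by chunk and feed them to the processors.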
|| {
let index_entries =
util::index_entries_sorted_by_offset_ascending(self, progress.add_child("collecting sorted index"));
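                // Derive a chunk size and effective thread count from the amount of work and the given limit.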
let (chunk_size, thread_limit, available_cores) =
parallel::optimize_chunk_size_and_thread_limit(1000, Some(index_entries.len()), thread_limit, None);
let there_are_enough_entries_to_process = || index_entries.len() > chunk_size * available_cores;
                // `.max(1)` guards against a zero chunk size, which would make `chunks()` panic.
                let input_chunks = index_entries.chunks(chunk_size.max(1));
let reduce_progress = parking_lot::Mutex::new({
let mut p = progress.add_child("Traversing");
p.init(Some(self.num_objects() as usize), progress::count("objects"));
p
});
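                // Every worker thread gets its own decode cache, processor, scratch buffer and progress line.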
let state_per_thread = |index| {
(
new_cache(),
new_processor(),
Vec::with_capacity(2048),
reduce_progress.lock().add_child(format!("thread {}", index)),
)
};
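                // Only fan out to threads if there is enough work to keep them busy; otherwise run serially.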
in_parallel_if(
there_are_enough_entries_to_process,
input_chunks,
thread_limit,
state_per_thread,
|entries: &[index::Entry],
(cache, ref mut processor, buf, progress)|
-> Result<Vec<data::decode_entry::Outcome>, Error<_>> {
progress.init(
Some(entries.len()),
Some(unit::dynamic(unit::Human::new(
unit::human::Formatter::new(),
"objects",
))),
);
let mut stats = Vec::with_capacity(entries.len());
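                        // Reusable scratch space for decoding entry headers.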
let mut header_buf = [0u8; 64];
for index_entry in entries.iter() {
let result = self.decode_and_process_entry(
check,
pack,
cache,
buf,
progress,
&mut header_buf,
index_entry,
processor,
);
progress.inc();
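                        // Skip non-fatal decode errors with a note; anything else aborts the traversal.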
let stat = match result {
Err(err @ Error::PackDecode { .. }) if !check.fatal_decode_error() => {
progress.info(format!("Ignoring decode error: {}", err));
continue;
}
res => res,
}?;
stats.push(stat);
}
Ok(stats)
},
Reducer::from_progress(&reduce_progress, pack.data_len(), check),
)
},
);
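        // Surface failures from either task; on success we have the verified id and the
        // accumulated per-object statistics.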
let id = verify_result?;
let res = traversal_result?;
Ok((id, res, progress))
}
}