pub(crate) mod function {
    use std::{cmp::Ordering, sync::Arc};

    use git_features::{parallel, parallel::SequenceId, progress::Progress};

    use super::{reduce, util, Error, Mode, Options, Outcome, ProgressId};
    use crate::data::output;

    /// Turn the given `counts` into a stream of chunks of pack entries, in the order the
    /// counts were sorted into.
    ///
    /// * `counts` - the objects to write, each possibly carrying an already looked-up pack location.
    /// * `db` - used to look up pack locations, pack entries and object data; cloned per thread.
    /// * `progress` - parent progress to which child progress instances are attached for each phase.
    /// * `Options` - destructured into the pack `version`, iteration `mode`, `allow_thin_pack`,
    ///   `thread_limit` and `chunk_size`.
    ///
    /// Returns an iterator over `Result`s of `(SequenceId, Vec<output::Entry>)` chunks which can
    /// also be finalized into aggregated [`Outcome`] statistics via `reduce::Statistics`.
    ///
    /// # Panics
    ///
    /// If `version` is anything other than `crate::data::Version::V2`.
    pub fn iter_from_counts<Find>(
        mut counts: Vec<output::Count>,
        db: Find,
        mut progress: impl Progress + 'static,
        Options {
            version,
            mode,
            allow_thin_pack,
            thread_limit,
            chunk_size,
        }: Options,
    ) -> impl Iterator<Item = Result<(SequenceId, Vec<output::Entry>), Error<Find::Error>>>
           + parallel::reduce::Finalize<Reduce = reduce::Statistics<Error<Find::Error>>>
    where
        Find: crate::Find + Send + Clone + 'static,
        <Find as crate::Find>::Error: Send,
    {
        assert!(
            matches!(version, crate::data::Version::V2),
            "currently we can only write version 2"
        );
        let (chunk_size, thread_limit, _) =
            parallel::optimize_chunk_size_and_thread_limit(chunk_size, Some(counts.len()), thread_limit, None);
        {
            // Phase 1: resolve the pack location of every count that wasn't looked up yet.
            // Parallelism is only engaged (via `in_parallel_if`) when more than 4000 counts are present.
            let progress = Arc::new(parking_lot::Mutex::new(
                progress.add_child_with_id("resolving", ProgressId::ResolveCounts.into()),
            ));
            progress.lock().init(None, git_features::progress::count("counts"));
            let enough_counts_present = counts.len() > 4_000;
            let start = std::time::Instant::now();
            parallel::in_parallel_if(
                || enough_counts_present,
                counts.chunks_mut(chunk_size),
                thread_limit,
                // Per-thread scratch buffer for `location_by_oid` lookups.
                |_n| Vec::<u8>::new(),
                {
                    let progress = Arc::clone(&progress);
                    let db = db.clone();
                    move |chunk, buf| {
                        let chunk_size = chunk.len();
                        for count in chunk {
                            use crate::data::output::count::PackLocation::*;
                            match count.entry_pack_location {
                                LookedUp(_) => continue,
                                NotLookedUp => count.entry_pack_location = LookedUp(db.location_by_oid(count.id, buf)),
                            }
                        }
                        progress.lock().inc_by(chunk_size);
                        Ok::<_, ()>(())
                    }
                },
                parallel::reduce::IdentityWithResult::<(), ()>::default(),
            )
            .expect("infallible - we ignore none-existing objects");
            progress.lock().show_throughput(start);
        }
        let counts_range_by_pack_id = match mode {
            Mode::PackCopyAndBaseObjects => {
                // Phase 2: sort counts so that objects of the same pack are adjacent and
                // ordered by pack offset, then build an index of `(pack_id, range)` pairs
                // pointing back into `counts`.
                let mut progress = progress.add_child_with_id("sorting", ProgressId::SortEntries.into());
                progress.init(Some(counts.len()), git_features::progress::count("counts"));
                let start = std::time::Instant::now();

                use crate::data::output::count::PackLocation::*;
                // Counts without a pack location (`LookedUp(None)`) sort first, followed by
                // packed objects ordered by `(pack_id, pack_offset)`.
                counts.sort_by(|lhs, rhs| match (&lhs.entry_pack_location, &rhs.entry_pack_location) {
                    (LookedUp(None), LookedUp(None)) => Ordering::Equal,
                    (LookedUp(Some(_)), LookedUp(None)) => Ordering::Greater,
                    (LookedUp(None), LookedUp(Some(_))) => Ordering::Less,
                    (LookedUp(Some(lhs)), LookedUp(Some(rhs))) => lhs
                        .pack_id
                        .cmp(&rhs.pack_id)
                        .then(lhs.pack_offset.cmp(&rhs.pack_offset)),
                    // `NotLookedUp` was eliminated by the resolution phase above.
                    (_, _) => unreachable!("counts were resolved beforehand"),
                });

                let mut index: Vec<(u32, std::ops::Range<usize>)> = Vec::new();
                // Skip the leading run of counts that have no pack location at all.
                let mut chunks_pack_start = counts.partition_point(|e| e.entry_pack_location.is_none());
                let mut slice = &counts[chunks_pack_start..];
                while !slice.is_empty() {
                    let current_pack_id = slice[0].entry_pack_location.as_ref().expect("packed object").pack_id;
                    // `partition_point` works as the slice is sorted by pack-id (see sort above).
                    let pack_end = slice.partition_point(|e| {
                        e.entry_pack_location.as_ref().expect("packed object").pack_id == current_pack_id
                    });
                    index.push((current_pack_id, chunks_pack_start..chunks_pack_start + pack_end));
                    slice = &slice[pack_end..];
                    chunks_pack_start += pack_end;
                }

                progress.set(counts.len());
                progress.show_throughput(start);

                index
            }
        };

        // Phase 3: hand out chunks of counts to worker threads which turn them into entries.
        // `chunks.enumerate()` attaches the `SequenceId` that keeps chunk order reproducible.
        let counts = Arc::new(counts);
        let progress = Arc::new(parking_lot::Mutex::new(progress));
        let chunks = util::ChunkRanges::new(chunk_size, counts.len());

        parallel::reduce::Stepwise::new(
            chunks.enumerate(),
            thread_limit,
            {
                let progress = Arc::clone(&progress);
                // Per-thread state: a reusable object buffer and a dedicated progress child.
                move |n| {
                    (
                        Vec::new(),
                        progress
                            .lock()
                            .add_child_with_id(format!("thread {n}"), git_features::progress::UNKNOWN),
                    )
                }
            },
            {
                let counts = Arc::clone(&counts);
                move |(chunk_id, chunk_range): (SequenceId, std::ops::Range<usize>), (buf, progress)| {
                    let mut out = Vec::new();
                    let chunk = &counts[chunk_range];
                    let mut stats = Outcome::default();
                    // Per-pack cache of `(pack_offset, oid)` pairs, sorted by offset, used to
                    // resolve thin-pack base offsets to object ids. Lazily filled below.
                    let mut pack_offsets_to_id = None;
                    progress.init(Some(chunk.len()), git_features::progress::count("objects"));

                    for count in chunk.iter() {
                        out.push(match count
                            .entry_pack_location
                            .as_ref()
                            .and_then(|l| db.entry_by_location(l).map(|pe| (l, pe)))
                        {
                            Some((location, pack_entry)) => {
                                // Invalidate the offset cache when moving on to a different pack.
                                if let Some((cached_pack_id, _)) = &pack_offsets_to_id {
                                    if *cached_pack_id != location.pack_id {
                                        pack_offsets_to_id = None;
                                    }
                                }
                                // Find the range of counts belonging to this entry's pack - the
                                // index was built in phase 2 and is sorted by pack-id.
                                let pack_range = counts_range_by_pack_id[counts_range_by_pack_id
                                    .binary_search_by_key(&location.pack_id, |e| e.0)
                                    .expect("pack-id always present")]
                                .1
                                .clone();
                                let base_index_offset = pack_range.start;
                                let counts_in_pack = &counts[pack_range];
                                match output::Entry::from_pack_entry(
                                    pack_entry,
                                    count,
                                    counts_in_pack,
                                    base_index_offset,
                                    // Only when thin packs are allowed, provide a callback which
                                    // maps a base pack-offset to its object id (or `None` if the
                                    // offset isn't present in the cached pack data).
                                    allow_thin_pack.then_some({
                                        |pack_id, base_offset| {
                                            let (cached_pack_id, cache) = pack_offsets_to_id.get_or_insert_with(|| {
                                                db.pack_offsets_and_oid(pack_id)
                                                    .map(|mut v| {
                                                        // Sort by offset so binary search works below.
                                                        v.sort_by_key(|e| e.0);
                                                        (pack_id, v)
                                                    })
                                                    .expect("pack used for counts is still available")
                                            });
                                            debug_assert_eq!(*cached_pack_id, pack_id);
                                            stats.ref_delta_objects += 1;
                                            cache
                                                .binary_search_by_key(&base_offset, |e| e.0)
                                                .ok()
                                                .map(|idx| cache[idx].1)
                                        }
                                    }),
                                    version,
                                ) {
                                    Some(entry) => {
                                        stats.objects_copied_from_pack += 1;
                                        entry
                                    }
                                    // The entry couldn't be copied from the pack - fall back to
                                    // finding the object and encoding it from its data.
                                    None => match db.try_find(count.id, buf).map_err(Error::FindExisting)? {
                                        Some((obj, _location)) => {
                                            stats.decoded_and_recompressed_objects += 1;
                                            output::Entry::from_data(count, &obj)
                                        }
                                        None => {
                                            stats.missing_objects += 1;
                                            Ok(output::Entry::invalid())
                                        }
                                    },
                                }
                            }
                            // No usable pack location - encode the object from its data, or mark
                            // it invalid if it cannot be found at all.
                            None => match db.try_find(count.id, buf).map_err(Error::FindExisting)? {
                                Some((obj, _location)) => {
                                    stats.decoded_and_recompressed_objects += 1;
                                    output::Entry::from_data(count, &obj)
                                }
                                None => {
                                    stats.missing_objects += 1;
                                    Ok(output::Entry::invalid())
                                }
                            },
                        }?);
                        progress.inc();
                    }
                    Ok((chunk_id, out, stats))
                }
            },
            reduce::Statistics::default(),
        )
    }
}
239
mod util {
    /// An iterator over consecutive, non-overlapping `Range<usize>` chunks of at most
    /// `size` items each, together covering `0..len`. The final chunk may be shorter.
    #[derive(Clone)]
    pub struct ChunkRanges {
        /// Start of the next chunk to hand out.
        cursor: usize,
        /// Maximum length of each produced range.
        size: usize,
        /// Exclusive upper bound of the covered range.
        len: usize,
    }

    impl ChunkRanges {
        /// Create an iterator producing ranges of at most `size` items covering `0..total`.
        ///
        /// A `size` of zero yields no ranges at all (previously this looped forever
        /// producing empty ranges).
        pub fn new(size: usize, total: usize) -> Self {
            ChunkRanges {
                cursor: 0,
                size,
                len: total,
            }
        }
    }

    impl Iterator for ChunkRanges {
        type Item = std::ops::Range<usize>;

        fn next(&mut self) -> Option<Self::Item> {
            // Guard `size == 0` to avoid an infinite stream of empty ranges.
            if self.size == 0 || self.cursor >= self.len {
                None
            } else {
                // `saturating_add` prevents overflow for pathological `cursor + size`.
                let upper = self.cursor.saturating_add(self.size).min(self.len);
                let range = self.cursor..upper;
                self.cursor = upper;
                Some(range)
            }
        }

        /// The exact number of remaining chunks is computable, so report it precisely.
        fn size_hint(&self) -> (usize, Option<usize>) {
            if self.size == 0 {
                return (0, Some(0));
            }
            let remaining = self.len.saturating_sub(self.cursor);
            // Ceiling division: every `size` items form one chunk, plus a final partial one.
            let chunks = remaining / self.size + usize::from(remaining % self.size != 0);
            (chunks, Some(chunks))
        }
    }
}
273
274mod reduce {
275 use std::marker::PhantomData;
276
277 use git_features::{parallel, parallel::SequenceId};
278
279 use super::Outcome;
280 use crate::data::output;
281
282 pub struct Statistics<E> {
283 total: Outcome,
284 _err: PhantomData<E>,
285 }
286
287 impl<E> Default for Statistics<E> {
288 fn default() -> Self {
289 Statistics {
290 total: Default::default(),
291 _err: PhantomData::default(),
292 }
293 }
294 }
295
296 impl<Error> parallel::Reduce for Statistics<Error> {
297 type Input = Result<(SequenceId, Vec<output::Entry>, Outcome), Error>;
298 type FeedProduce = (SequenceId, Vec<output::Entry>);
299 type Output = Outcome;
300 type Error = Error;
301
302 fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
303 item.map(|(cid, entries, stats)| {
304 self.total.aggregate(stats);
305 (cid, entries)
306 })
307 }
308
309 fn finalize(self) -> Result<Self::Output, Self::Error> {
310 Ok(self.total)
311 }
312 }
313}
314
mod types {
    use crate::data::output::entry;

    /// Statistics gathered while generating pack entries from counts.
    #[derive(Default, PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
    #[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
    pub struct Outcome {
        /// Amount of objects whose entries were created from the object's data, rather than
        /// being copied from an existing pack.
        pub decoded_and_recompressed_objects: usize,
        /// Amount of objects which could not be found in the database at all; these produce
        /// invalid placeholder entries.
        pub missing_objects: usize,
        /// Amount of entries copied directly from an existing pack.
        pub objects_copied_from_pack: usize,
        /// Amount of base-offset lookups performed while producing thin-pack entries
        /// (incremented in the `allow_thin_pack` callback).
        pub ref_delta_objects: usize,
    }

    impl Outcome {
        /// Add all counters of the given (destructured) `Outcome` onto `self`.
        pub(in crate::data::output::entry) fn aggregate(
            &mut self,
            Outcome {
                decoded_and_recompressed_objects: decoded_objects,
                missing_objects,
                objects_copied_from_pack,
                ref_delta_objects,
            }: Self,
        ) {
            self.decoded_and_recompressed_objects += decoded_objects;
            self.missing_objects += missing_objects;
            self.objects_copied_from_pack += objects_copied_from_pack;
            self.ref_delta_objects += ref_delta_objects;
        }
    }

    /// The way the entry-generation iterator operates.
    #[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
    #[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
    pub enum Mode {
        /// Copy entries from existing packs where a pack location is known, and create
        /// entries from object data otherwise.
        PackCopyAndBaseObjects,
    }

    /// Configuration options for the pack generation functions provided in this module.
    #[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
    #[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
    pub struct Options {
        /// The upper bound of threads to use; `None` lets
        /// `parallel::optimize_chunk_size_and_thread_limit(…)` choose.
        pub thread_limit: Option<usize>,
        /// The way the iterator operates.
        pub mode: Mode,
        /// If `true`, a base-offset resolution callback is handed to
        /// `output::Entry::from_pack_entry(…)`, allowing thin-pack entries to be produced.
        pub allow_thin_pack: bool,
        /// The amount of counts a thread processes per chunk; defaults to 10 (see `Default`).
        pub chunk_size: usize,
        /// The pack version to produce. Only `V2` is currently supported (asserted at runtime).
        pub version: crate::data::Version,
    }

    impl Default for Options {
        fn default() -> Self {
            Options {
                thread_limit: None,
                mode: Mode::PackCopyAndBaseObjects,
                allow_thin_pack: false,
                chunk_size: 10,
                version: Default::default(),
            }
        }
    }

    /// The error returned by the entry-generation iterator.
    #[derive(Debug, thiserror::Error)]
    #[allow(missing_docs)]
    pub enum Error<FindErr>
    where
        FindErr: std::error::Error + 'static,
    {
        #[error(transparent)]
        FindExisting(FindErr),
        #[error(transparent)]
        NewEntry(#[from] entry::Error),
    }

    /// The progress ids used by `iter_from_counts(…)`.
    #[derive(Debug, Copy, Clone)]
    pub enum ProgressId {
        /// Progress of resolving pack locations for counts (the "resolving" child progress).
        ResolveCounts,
        /// Progress of sorting counts by pack id and offset (the "sorting" child progress).
        SortEntries,
    }

    impl From<ProgressId> for git_features::progress::Id {
        fn from(v: ProgressId) -> Self {
            match v {
                ProgressId::ResolveCounts => *b"ECRC",
                ProgressId::SortEntries => *b"ECSE",
            }
        }
    }
}
428pub use types::{Error, Mode, Options, Outcome, ProgressId};