1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
use std::{ sync::atomic::{AtomicBool, Ordering}, time::Instant, }; use git_features::{parallel, progress::Progress}; use crate::{data, index::traverse}; fn add_decode_result(lhs: &mut data::decode_entry::Outcome, rhs: data::decode_entry::Outcome) { lhs.num_deltas += rhs.num_deltas; lhs.decompressed_size += rhs.decompressed_size; lhs.compressed_size += rhs.compressed_size; lhs.object_size += rhs.object_size; } fn div_decode_result(lhs: &mut data::decode_entry::Outcome, div: usize) { if div != 0 { lhs.num_deltas = (lhs.num_deltas as f32 / div as f32) as u32; lhs.decompressed_size /= div as u64; lhs.compressed_size /= div; lhs.object_size /= div as u64; } } pub struct Reducer<'a, P, E> { progress: &'a parking_lot::Mutex<P>, check: traverse::SafetyCheck, then: Instant, entries_seen: usize, stats: traverse::Outcome, should_interrupt: &'a AtomicBool, _error: std::marker::PhantomData<E>, } impl<'a, P, E> Reducer<'a, P, E> where P: Progress, { pub fn from_progress( progress: &'a parking_lot::Mutex<P>, pack_data_len_in_bytes: usize, check: traverse::SafetyCheck, should_interrupt: &'a AtomicBool, ) -> Self { let stats = traverse::Outcome { pack_size: pack_data_len_in_bytes as u64, ..Default::default() }; Reducer { progress, check, then: Instant::now(), entries_seen: 0, should_interrupt, stats, _error: Default::default(), } } } impl<'a, P, E> parallel::Reduce for Reducer<'a, P, E> where P: Progress, E: std::error::Error + Send + Sync + 'static, { type Input = Result<Vec<data::decode_entry::Outcome>, traverse::Error<E>>; type FeedProduce = (); type Output = traverse::Outcome; type Error = traverse::Error<E>; fn feed(&mut self, input: Self::Input) -> Result<(), Self::Error> { let chunk_stats: Vec<_> = match input { Err(err @ traverse::Error::PackDecode { .. }) if !self.check.fatal_decode_error() => { self.progress.lock().info(format!("Ignoring decode error: {}", err)); return Ok(()); } res => res, }?; self.entries_seen += chunk_stats.len(); let chunk_total = chunk_stats.into_iter().fold( data::decode_entry::Outcome::default_from_kind(git_object::Kind::Tree), |mut total, stats| { *self.stats.objects_per_chain_length.entry(stats.num_deltas).or_insert(0) += 1; self.stats.total_decompressed_entries_size += stats.decompressed_size; self.stats.total_compressed_entries_size += stats.compressed_size as u64; self.stats.total_object_size += stats.object_size as u64; use git_object::Kind::*; match stats.kind { Commit => self.stats.num_commits += 1, Tree => self.stats.num_trees += 1, Blob => self.stats.num_blobs += 1, Tag => self.stats.num_tags += 1, } add_decode_result(&mut total, stats); total }, ); add_decode_result(&mut self.stats.average, chunk_total); self.progress.lock().set(self.entries_seen); if self.should_interrupt.load(Ordering::SeqCst) { return Err(Self::Error::Interrupted); } Ok(()) } fn finalize(mut self) -> Result<Self::Output, Self::Error> { div_decode_result(&mut self.stats.average, self.entries_seen as usize); let elapsed_s = self.then.elapsed().as_secs_f32(); let objects_per_second = (self.entries_seen as f32 / elapsed_s) as u32; self.progress.lock().info(format!( "of {} objects done in {:.2}s ({} objects/s, ~{}/s)", self.entries_seen, elapsed_s, objects_per_second, bytesize::ByteSize(self.stats.average.object_size * objects_per_second as u64) )); Ok(self.stats) } }