1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use std::{
    sync::atomic::{AtomicBool, Ordering},
    time::Instant,
};

use git_features::{parallel, progress::Progress};

use crate::{data, index::traverse};

/// Adds the numeric counters of `rhs` onto the matching counters of `lhs`.
///
/// Only `num_deltas`, `decompressed_size`, `compressed_size` and `object_size`
/// are summed; every other field of `lhs` is left as it was.
fn add_decode_result(lhs: &mut data::decode_entry::Outcome, rhs: data::decode_entry::Outcome) {
    // Pull the addends out first so the accumulation below reads as one unit.
    let (deltas, decompressed, compressed, object) = (
        rhs.num_deltas,
        rhs.decompressed_size,
        rhs.compressed_size,
        rhs.object_size,
    );

    lhs.num_deltas += deltas;
    lhs.decompressed_size += decompressed;
    lhs.compressed_size += compressed;
    lhs.object_size += object;
}

/// Divides the numeric counters of `lhs` by `div` in place, turning sums into
/// (truncated) per-item averages.
///
/// A `div` of zero leaves `lhs` untouched, so callers do not need to guard
/// against an empty input set themselves.
fn div_decode_result(lhs: &mut data::decode_entry::Outcome, div: usize) {
    if div == 0 {
        return;
    }
    let divisor = div as u64;

    // Divide in widened integer space instead of going through `f32`: a 24-bit
    // mantissa cannot represent delta sums or divisors beyond 2^24 exactly, which
    // skews the average for large packs. The quotient never exceeds the original
    // `u32` value, so narrowing it back cannot truncate.
    lhs.num_deltas = (u64::from(lhs.num_deltas) / divisor) as u32;
    lhs.decompressed_size /= divisor;
    lhs.compressed_size /= div;
    lhs.object_size /= divisor;
}

/// Collects the decode outcomes of traversed pack entries into a single
/// `traverse::Outcome`, reporting progress along the way.
///
/// `E` is only used to name the error type of the `parallel::Reduce`
/// implementation; no value of it is stored.
pub struct Reducer<'a, P, E> {
    // -- borrowed handles ---------------------------------------------------
    /// Progress handle behind a mutex; locked whenever a message or a new
    /// counter value is reported.
    progress: &'a parking_lot::Mutex<P>,
    /// Flag that is read after each fed chunk; when set, feeding fails with
    /// `traverse::Error::Interrupted`.
    should_interrupt: &'a AtomicBool,

    // -- policy -------------------------------------------------------------
    /// Consulted to decide whether a pack decode error aborts the reduction or
    /// is merely logged.
    check: traverse::SafetyCheck,

    // -- running bookkeeping ------------------------------------------------
    /// Instant at which the reducer was created, used for the throughput
    /// message emitted on finalization.
    then: Instant,
    /// Number of entries fed so far.
    entries_seen: usize,
    /// Statistics accumulated so far and returned on finalization.
    stats: traverse::Outcome,

    _error: std::marker::PhantomData<E>,
}

impl<'a, P, E> Reducer<'a, P, E>
where
    P: Progress,
{
    /// Creates a reducer with empty statistics and starts its clock.
    ///
    /// * `progress` - shared progress handle used for all reporting.
    /// * `pack_data_len_in_bytes` - recorded as `pack_size` in the resulting statistics.
    /// * `check` - policy deciding how decode errors are treated while feeding.
    /// * `should_interrupt` - flag polled while feeding to abort early.
    pub fn from_progress(
        progress: &'a parking_lot::Mutex<P>,
        pack_data_len_in_bytes: usize,
        check: traverse::SafetyCheck,
        should_interrupt: &'a AtomicBool,
    ) -> Self {
        Self {
            // Everything but the pack size starts out at its default value.
            stats: traverse::Outcome {
                pack_size: pack_data_len_in_bytes as u64,
                ..Default::default()
            },
            then: Instant::now(),
            entries_seen: 0,
            progress,
            check,
            should_interrupt,
            _error: std::marker::PhantomData,
        }
    }
}

impl<'a, P, E> parallel::Reduce for Reducer<'a, P, E>
where
    P: Progress,
    E: std::error::Error + Send + Sync + 'static,
{
    type Input = Result<Vec<data::decode_entry::Outcome>, traverse::Error<E>>;
    type FeedProduce = ();
    type Output = traverse::Outcome;
    type Error = traverse::Error<E>;

    fn feed(&mut self, input: Self::Input) -> Result<(), Self::Error> {
        let chunk_stats: Vec<_> = match input {
            Err(err @ traverse::Error::PackDecode { .. }) if !self.check.fatal_decode_error() => {
                self.progress.lock().info(format!("Ignoring decode error: {}", err));
                return Ok(());
            }
            res => res,
        }?;
        self.entries_seen += chunk_stats.len();

        let chunk_total = chunk_stats.into_iter().fold(
            data::decode_entry::Outcome::default_from_kind(git_object::Kind::Tree),
            |mut total, stats| {
                *self.stats.objects_per_chain_length.entry(stats.num_deltas).or_insert(0) += 1;
                self.stats.total_decompressed_entries_size += stats.decompressed_size;
                self.stats.total_compressed_entries_size += stats.compressed_size as u64;
                self.stats.total_object_size += stats.object_size as u64;
                use git_object::Kind::*;
                match stats.kind {
                    Commit => self.stats.num_commits += 1,
                    Tree => self.stats.num_trees += 1,
                    Blob => self.stats.num_blobs += 1,
                    Tag => self.stats.num_tags += 1,
                }
                add_decode_result(&mut total, stats);
                total
            },
        );

        add_decode_result(&mut self.stats.average, chunk_total);
        self.progress.lock().set(self.entries_seen);

        if self.should_interrupt.load(Ordering::SeqCst) {
            return Err(Self::Error::Interrupted);
        }
        Ok(())
    }

    fn finalize(mut self) -> Result<Self::Output, Self::Error> {
        div_decode_result(&mut self.stats.average, self.entries_seen as usize);

        let elapsed_s = self.then.elapsed().as_secs_f32();
        let objects_per_second = (self.entries_seen as f32 / elapsed_s) as u32;

        self.progress.lock().info(format!(
            "of {} objects done in {:.2}s ({} objects/s, ~{}/s)",
            self.entries_seen,
            elapsed_s,
            objects_per_second,
            bytesize::ByteSize(self.stats.average.object_size * objects_per_second as u64)
        ));
        Ok(self.stats)
    }
}