use std::{cmp::Ordering, sync::Arc};
use git_features::{parallel, progress::Progress};
use crate::data::{output, output::ChunkId};
/// Resolve the pack locations of `counts` and turn them into an iterator over chunks of pack entries
/// (`output::Entry`), which are produced in parallel on up to `thread_limit` threads.
///
/// The work happens in three steps:
/// 1. Every count whose `entry_pack_location` is still `NotLookedUp` is resolved with `db.location_by_oid()`.
///    This step only runs in parallel if there are more than 4,000 counts.
/// 2. For `Mode::PackCopyAndBaseObjects`, counts are sorted so that those without a pack location come first,
///    followed by all others ordered by pack id and then by pack offset. From the sorted counts, an index of
///    `(pack_id, range of counts in that pack)` is built.
/// 3. The returned iterator processes chunks of counts, with the chunk size chosen by
///    `parallel::optimize_chunk_size_and_thread_limit()` from `chunk_size`. For each count:
///    - an entry is copied from its pack via `output::Entry::from_pack_entry()` where possible;
///    - otherwise the object is looked up with `db.try_find()` and an entry is created from its data;
///    - objects that cannot be found yield `output::Entry::invalid()` and are counted as missing.
///
/// Per-chunk statistics are summed into an `Outcome` by the `reduce::Statistics` reducer. That total is
/// reachable through the `parallel::reduce::Finalize` implementation of the returned iterator.
///
/// # Errors
///
/// Items yield `Error::FindExisting` if `db.try_find()` fails. They yield `Error::NewEntry` if creating an
/// entry from pack or object data fails.
///
/// # Panics
///
/// - If `version` is not `crate::data::Version::V2`.
/// - If a pack used to resolve a count's location is no longer returned by `db.bundle_by_pack_id()` when a
///   thin-pack base has to be resolved.
pub fn iter_from_counts<Find, Cache>(
mut counts: Vec<output::Count>,
db: Find,
make_cache: impl Fn() -> Cache + Send + Clone + Sync + 'static,
mut progress: impl Progress,
Options {
version,
mode,
allow_thin_pack,
thread_limit,
chunk_size,
}: Options,
) -> impl Iterator<Item = Result<(ChunkId, Vec<output::Entry>), Error<Find::Error>>>
+ parallel::reduce::Finalize<Reduce = reduce::Statistics<Error<Find::Error>>>
where
Find: crate::Find + Clone + Send + Sync + 'static,
<Find as crate::Find>::Error: Send,
Cache: crate::cache::DecodeEntry,
{
// Only version 2 pack entries can be produced at this time.
assert!(
matches!(version, crate::data::Version::V2),
"currently we can only write version 2"
);
// Possibly adjust chunk size and thread count to the amount of counts to process.
let (chunk_size, thread_limit, _) =
parallel::optimize_chunk_size_and_thread_limit(chunk_size, Some(counts.len()), thread_limit, None);
let chunks = util::ChunkRanges::new(chunk_size, counts.len());
// Step 1: resolve the pack location of every count that wasn't looked up yet.
{
let progress = Arc::new(parking_lot::Mutex::new(progress.add_child("resolving")));
progress
.lock()
.init(Some(counts.len()), git_features::progress::count("counts"));
// Only go parallel for more than 4,000 counts; presumably below that threads aren't worth their overhead.
let enough_counts_present = counts.len() > 4_000;
let start = std::time::Instant::now();
parallel::in_parallel_if(
|| enough_counts_present,
chunks.clone(),
thread_limit,
// Each thread gets its own scratch buffer, passed to `location_by_oid()`.
|_n| Vec::<u8>::new(),
{
let progress = Arc::clone(&progress);
let counts = &counts;
let db = &db;
move |chunk_range, buf| {
// NOTE(review): this casts a pointer obtained from a *shared* borrow of `counts` to `*mut` and
// writes through it below. `ChunkRanges` yields non-overlapping ranges, and each range is presumably
// handed to exactly one thread, so no element is written twice concurrently. However, deriving
// mutable access from a `&` borrow (without `UnsafeCell`) is undefined behavior under Rust's
// aliasing rules. TODO: hand out disjoint `&mut` chunks (e.g. via `chunks_mut`) instead.
let chunk = {
let c = &counts[chunk_range];
let mut_ptr = c.as_ptr() as *mut output::Count;
#[allow(unsafe_code)]
unsafe {
std::slice::from_raw_parts_mut(mut_ptr, c.len())
}
};
let chunk_size = chunk.len();
for count in chunk {
use crate::data::output::count::PackLocation::*;
// Counts whose location is already known are left untouched.
match count.entry_pack_location {
LookedUp(_) => continue,
NotLookedUp => count.entry_pack_location = LookedUp(db.location_by_oid(count.id, buf)),
}
}
// Progress is reported once per chunk rather than once per count.
progress.lock().inc_by(chunk_size);
// This closure never fails; `()` only serves as the error type.
Ok::<_, ()>(())
}
},
parallel::reduce::IdentityWithResult::<(), ()>::default(),
)
.expect("infallible - we ignore none-existing objects");
progress.lock().show_throughput(start);
}
// Step 2: sort counts so that those of the same pack are adjacent, and index each pack's range of counts.
let counts_range_by_pack_id = match mode {
Mode::PackCopyAndBaseObjects => {
let mut progress = progress.add_child("sorting");
progress.init(Some(counts.len()), git_features::progress::count("counts"));
let start = std::time::Instant::now();
use crate::data::output::count::PackLocation::*;
// Counts without a pack location sort first; the rest are ordered by pack id, then by offset in the pack.
counts.sort_by(|lhs, rhs| match (&lhs.entry_pack_location, &rhs.entry_pack_location) {
(LookedUp(None), LookedUp(None)) => Ordering::Equal,
(LookedUp(Some(_)), LookedUp(None)) => Ordering::Greater,
(LookedUp(None), LookedUp(Some(_))) => Ordering::Less,
(LookedUp(Some(lhs)), LookedUp(Some(rhs))) => lhs
.pack_id
.cmp(&rhs.pack_id)
.then(lhs.pack_offset.cmp(&rhs.pack_offset)),
// Step 1 turned every `NotLookedUp` into `LookedUp`.
(_, _) => unreachable!("counts were resolved beforehand"),
});
// Entries are `(pack_id, range into counts)`, in ascending pack-id order because counts are sorted.
let mut index: Vec<(u32, std::ops::Range<usize>)> = Vec::new();
// Skip the leading counts without a pack location.
let mut chunks_pack_start = counts.partition_point(|e| e.entry_pack_location.is_none());
let mut slice = &counts[chunks_pack_start..];
// Each iteration splits off the run of counts belonging to the next pack id.
while !slice.is_empty() {
let current_pack_id = slice[0].entry_pack_location.as_ref().expect("packed object").pack_id;
// The slice is sorted by pack id, so all counts of `current_pack_id` form a prefix of it.
let pack_end = slice.partition_point(|e| {
e.entry_pack_location.as_ref().expect("packed object").pack_id == current_pack_id
});
index.push((current_pack_id, chunks_pack_start..chunks_pack_start + pack_end));
slice = &slice[pack_end..];
chunks_pack_start += pack_end;
}
progress.set(counts.len());
progress.show_throughput(start);
index
}
};
// Step 3: share the sorted counts and the progress with the workers of the returned iterator.
let counts = Arc::new(counts);
let progress = Arc::new(parking_lot::Mutex::new(progress));
parallel::reduce::Stepwise::new(
chunks.enumerate(),
thread_limit,
{
let progress = Arc::clone(&progress);
// Per-thread state: a scratch buffer, a cache created by `make_cache()`, and a child progress.
move |n| {
(
Vec::new(), make_cache(), progress.lock().add_child(format!("thread {}", n)),
)
}
},
{
let counts = Arc::clone(&counts);
move |(chunk_id, chunk_range): (ChunkId, std::ops::Range<usize>), (buf, cache, progress)| {
let mut out = Vec::new();
let chunk = &counts[chunk_range];
let mut stats = Outcome::default();
// Lazily built `(pack_id, sorted (pack offset, object id) pairs)` for one pack, used to resolve thin-pack bases.
let mut pack_offsets_to_id = None;
progress.init(Some(chunk.len()), git_features::progress::count("objects"));
// Use the pack entry at a count's location if it has one and `db` still provides that entry.
for count in chunk.iter() {
out.push(match count
.entry_pack_location
.as_ref()
.and_then(|l| db.entry_by_location(l).map(|pe| (l, pe)))
{
Some((location, pack_entry)) => {
// Drop the cached offset table if this count lives in a different pack.
if let Some((cached_pack_id, _)) = &pack_offsets_to_id {
if *cached_pack_id != location.pack_id {
pack_offsets_to_id = None;
}
}
// Find the range of counts belonging to this count's pack. The index is sorted by pack id.
let pack_range = counts_range_by_pack_id[counts_range_by_pack_id
.binary_search_by_key(&location.pack_id, |e| e.0)
.expect("pack-id always present")]
.1
.clone();
// Pass the pack's counts and their starting index within all `counts`, presumably so the entry can
// refer to base objects among them — confirm against `from_pack_entry`.
let base_index_offset = pack_range.start;
let counts_in_pack = &counts[pack_range];
match output::Entry::from_pack_entry(
pack_entry,
count,
counts_in_pack,
base_index_offset,
// Only for thin packs: map a base object's pack offset to its object id, binary-searching a
// table of the pack index's entries sorted by offset.
allow_thin_pack.then(|| {
|pack_id, base_offset| {
let (cached_pack_id, cache) = pack_offsets_to_id.get_or_insert_with(|| {
db.bundle_by_pack_id(pack_id)
.map(|b| {
let mut v = b
.index
.iter()
.map(|e| (e.pack_offset, e.oid))
.collect::<Vec<_>>();
v.sort_by_key(|e| e.0);
(pack_id, v)
})
.expect("pack used for counts is still available")
});
debug_assert_eq!(*cached_pack_id, pack_id);
cache
.binary_search_by_key(&base_offset, |e| e.0)
.ok()
.map(|idx| cache[idx].1)
}
}),
version,
) {
Some(entry) => {
stats.objects_copied_from_pack += 1;
entry
}
// The entry couldn't be copied from the pack: create it from the object's data, if it exists.
None => match db.try_find(count.id, buf, cache).map_err(Error::FindExisting)? {
Some(obj) => {
stats.decoded_and_recompressed_objects += 1;
output::Entry::from_data(count, &obj)
}
None => {
stats.missing_objects += 1;
Ok(output::Entry::invalid())
}
},
}
}
// No pack location, or no pack entry at it: create the entry from the object's data, if it exists.
None => match db.try_find(count.id, buf, cache).map_err(Error::FindExisting)? {
Some(obj) => {
stats.decoded_and_recompressed_objects += 1;
output::Entry::from_data(count, &obj)
}
None => {
stats.missing_objects += 1;
Ok(output::Entry::invalid())
}
},
// `?` converts an `entry::Error` into `Error::NewEntry`.
}?);
progress.inc();
}
Ok((chunk_id, out, stats))
}
},
reduce::Statistics::default(),
)
}
mod util {
    /// An iterator over consecutive, non-overlapping index ranges of at most `size` elements
    /// which together cover `0..len`.
    ///
    /// Only the last range may be shorter than `size`, and no empty ranges are produced.
    #[derive(Clone)]
    pub struct ChunkRanges {
        /// Start of the next range to yield.
        cursor: usize,
        /// Maximum length of each yielded range; always at least 1.
        size: usize,
        /// Total amount of indices to cover; iteration ends once `cursor` reaches it.
        len: usize,
    }

    impl ChunkRanges {
        /// Create an iterator over `0..total` in ranges of at most `size` elements.
        ///
        /// A `size` of 0 is treated as 1 so that iteration always advances and terminates.
        pub fn new(size: usize, total: usize) -> Self {
            ChunkRanges {
                cursor: 0,
                size: size.max(1),
                len: total,
            }
        }
    }

    impl Iterator for ChunkRanges {
        type Item = std::ops::Range<usize>;

        fn next(&mut self) -> Option<Self::Item> {
            if self.cursor >= self.len {
                return None;
            }
            // `saturating_add` keeps a huge `size` from overflowing; `min` clamps the range to `len`.
            let upper = self.cursor.saturating_add(self.size).min(self.len);
            let range = self.cursor..upper;
            self.cursor = upper;
            Some(range)
        }

        /// The exact amount of ranges still to be yielded.
        fn size_hint(&self) -> (usize, Option<usize>) {
            let remaining = self.len.saturating_sub(self.cursor);
            // Ceiling division written so it cannot overflow for large `size`.
            let chunks = remaining / self.size + usize::from(remaining % self.size != 0);
            (chunks, Some(chunks))
        }
    }

    impl ExactSizeIterator for ChunkRanges {}
}
mod reduce {
    use std::marker::PhantomData;

    use git_features::parallel;

    use super::{ChunkId, Outcome};
    use crate::data::output;

    /// A reducer that passes each chunk's entries through unchanged while summing up the
    /// per-chunk [`Outcome`] statistics, which it returns when finalized.
    pub struct Statistics<E> {
        /// Running sum of the statistics of all successfully fed chunks.
        total: Outcome,
        /// Ties the reducer to the error type `E` it forwards, without storing a value of it.
        _err: PhantomData<E>,
    }

    impl<E> Default for Statistics<E> {
        fn default() -> Self {
            Self {
                total: Outcome::default(),
                _err: PhantomData,
            }
        }
    }

    impl<Error> parallel::Reduce for Statistics<Error> {
        type Input = Result<(ChunkId, Vec<output::Entry>, Outcome), Error>;
        type FeedProduce = (ChunkId, Vec<output::Entry>);
        type Output = Outcome;
        type Error = Error;

        /// Add the chunk's statistics to the running total and hand its entries on.
        /// Errors are forwarded as-is and leave the total untouched.
        fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
            let (chunk_id, entries, chunk_stats) = item?;
            self.total.aggregate(chunk_stats);
            Ok((chunk_id, entries))
        }

        /// Return the statistics accumulated over all fed chunks.
        fn finalize(self) -> Result<Self::Output, Self::Error> {
            Ok(self.total)
        }
    }
}
mod types {
    use crate::data::output::entry;

    /// Statistics gathered while turning counts into pack entries.
    #[derive(Default, PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
    #[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
    pub struct Outcome {
        /// The amount of objects looked up in the object database whose entry was created from their data.
        pub decoded_and_recompressed_objects: usize,
        /// The amount of objects that could not be found.
        pub missing_objects: usize,
        /// The amount of entries that were copied from an existing pack.
        pub objects_copied_from_pack: usize,
    }

    impl Outcome {
        /// Add every counter of `other` onto the matching counter of `self`.
        pub(in crate::data::output::entry) fn aggregate(&mut self, other: Self) {
            self.objects_copied_from_pack += other.objects_copied_from_pack;
            self.missing_objects += other.missing_objects;
            self.decoded_and_recompressed_objects += other.decoded_and_recompressed_objects;
        }
    }

    /// The way pack entries are produced from counts.
    #[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
    #[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
    pub enum Mode {
        /// Copy entries from existing packs where possible, and otherwise create them from the object's data.
        PackCopyAndBaseObjects,
    }

    /// Configuration for turning counts into pack entries.
    #[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
    #[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
    pub struct Options {
        /// The maximum amount of threads to use, or `None` to leave the choice to the parallel utilities.
        pub thread_limit: Option<usize>,
        /// The way entries are produced.
        pub mode: Mode,
        /// If `true`, base objects referenced by copied pack entries may be resolved to their object ids,
        /// presumably to allow producing thin packs.
        pub allow_thin_pack: bool,
        /// The amount of counts to process per unit of work, subject to adjustment based on the total amount.
        pub chunk_size: usize,
        /// The pack data version to produce. Only version 2 is supported at the moment.
        pub version: crate::data::Version,
    }

    impl Default for Options {
        fn default() -> Self {
            Self {
                thread_limit: None,
                mode: Mode::PackCopyAndBaseObjects,
                allow_thin_pack: false,
                chunk_size: 10,
                version: Default::default(),
            }
        }
    }

    /// The error returned while producing pack entries from counts.
    #[derive(Debug, thiserror::Error)]
    #[allow(missing_docs)]
    pub enum Error<FindErr>
    where
        FindErr: std::error::Error + 'static,
    {
        /// Looking up an object in the object database failed.
        #[error(transparent)]
        FindExisting(FindErr),
        /// An entry could not be created from pack or object data.
        #[error(transparent)]
        NewEntry(#[from] entry::Error),
    }
}
pub use types::{Error, Mode, Options, Outcome};