git_pack/data/output/entry/iter_from_counts.rs

pub(crate) mod function {
    use std::{cmp::Ordering, sync::Arc};

    use git_features::{parallel, parallel::SequenceId, progress::Progress};

    use super::{reduce, util, Error, Mode, Options, Outcome, ProgressId};
    use crate::data::output;

    /// Given a known list of object `counts`, calculate entries ready to be put into a data pack.
    ///
    /// This allows objects to be written quite soon without having to wait for the entire pack to be built in memory.
    /// A chunk of objects is held in memory and compressed using DEFLATE, and serves as the output of this iterator.
    /// That way slow writers will naturally apply back pressure, and communicate to the implementation that more time can be
    /// spent compressing objects.
    ///
    /// * `counts`
    ///   * A list of previously counted objects to add to the pack. Duplication checks are not performed, no object is expected to be duplicated.
    /// * `progress`
    ///   * a way to obtain progress information
    /// * `options`
    ///   * more configuration
    ///
    /// _Returns_ the checksum of the pack
    ///
    /// ## Discussion
    ///
    /// ### Advantages
    ///
    /// * Begins writing immediately and supports back-pressure.
    /// * Abstract over object databases and how input is provided.
    ///
    /// ### Disadvantages
    ///
    /// * ~~currently there is no way to easily write the pack index, even though the state here is uniquely positioned to do
    ///   so with minimal overhead (especially compared to `gix index-from-pack`)~~ Probably works now by chaining Iterators
    ///   or keeping enough state to write a pack and then generate an index with recorded data.
    ///
    pub fn iter_from_counts<Find>(
        mut counts: Vec<output::Count>,
        db: Find,
        mut progress: impl Progress + 'static,
        Options {
            version,
            mode,
            allow_thin_pack,
            thread_limit,
            chunk_size,
        }: Options,
    ) -> impl Iterator<Item = Result<(SequenceId, Vec<output::Entry>), Error<Find::Error>>>
           + parallel::reduce::Finalize<Reduce = reduce::Statistics<Error<Find::Error>>>
    where
        Find: crate::Find + Send + Clone + 'static,
        <Find as crate::Find>::Error: Send,
    {
        assert!(
            matches!(version, crate::data::Version::V2),
            "currently we can only write version 2"
        );
        let (chunk_size, thread_limit, _) =
            parallel::optimize_chunk_size_and_thread_limit(chunk_size, Some(counts.len()), thread_limit, None);
        // Phase 1: resolve each count's pack location in parallel, so the phases below can
        // rely on `entry_pack_location` being `LookedUp(..)` for every count.
        {
            let progress = Arc::new(parking_lot::Mutex::new(
                progress.add_child_with_id("resolving", ProgressId::ResolveCounts.into()),
            ));
            progress.lock().init(None, git_features::progress::count("counts"));
            // Only go parallel when there is enough work to amortize thread setup.
            let enough_counts_present = counts.len() > 4_000;
            let start = std::time::Instant::now();
            parallel::in_parallel_if(
                || enough_counts_present,
                counts.chunks_mut(chunk_size),
                thread_limit,
                |_n| Vec::<u8>::new(),
                {
                    let progress = Arc::clone(&progress);
                    let db = db.clone();
                    move |chunk, buf| {
                        let chunk_size = chunk.len();
                        for count in chunk {
                            use crate::data::output::count::PackLocation::*;
                            match count.entry_pack_location {
                                // Already resolved by the caller - nothing to do.
                                LookedUp(_) => continue,
                                NotLookedUp => count.entry_pack_location = LookedUp(db.location_by_oid(count.id, buf)),
                            }
                        }
                        progress.lock().inc_by(chunk_size);
                        Ok::<_, ()>(())
                    }
                },
                parallel::reduce::IdentityWithResult::<(), ()>::default(),
            )
            .expect("infallible - we ignore none-existing objects");
            progress.lock().show_throughput(start);
        }
        // Phase 2: sort counts by pack location and record which contiguous range of
        // `counts` belongs to which pack, for fast lookup by pack-id later.
        let counts_range_by_pack_id = match mode {
            Mode::PackCopyAndBaseObjects => {
                let mut progress = progress.add_child_with_id("sorting", ProgressId::SortEntries.into());
                progress.init(Some(counts.len()), git_features::progress::count("counts"));
                let start = std::time::Instant::now();

                use crate::data::output::count::PackLocation::*;
                // Counts without a pack location sort first; the rest are ordered by
                // (pack_id, pack_offset) so each pack forms one contiguous, offset-sorted run.
                counts.sort_by(|lhs, rhs| match (&lhs.entry_pack_location, &rhs.entry_pack_location) {
                    (LookedUp(None), LookedUp(None)) => Ordering::Equal,
                    (LookedUp(Some(_)), LookedUp(None)) => Ordering::Greater,
                    (LookedUp(None), LookedUp(Some(_))) => Ordering::Less,
                    (LookedUp(Some(lhs)), LookedUp(Some(rhs))) => lhs
                        .pack_id
                        .cmp(&rhs.pack_id)
                        .then(lhs.pack_offset.cmp(&rhs.pack_offset)),
                    // Phase 1 turned every `NotLookedUp` into `LookedUp(..)`.
                    (_, _) => unreachable!("counts were resolved beforehand"),
                });

                // Build (pack_id, range-into-`counts`) pairs, one per pack, in ascending pack-id order.
                let mut index: Vec<(u32, std::ops::Range<usize>)> = Vec::new();
                // Skip the leading run of counts that have no pack location at all.
                let mut chunks_pack_start = counts.partition_point(|e| e.entry_pack_location.is_none());
                let mut slice = &counts[chunks_pack_start..];
                while !slice.is_empty() {
                    let current_pack_id = slice[0].entry_pack_location.as_ref().expect("packed object").pack_id;
                    // Find where the run of entries with `current_pack_id` ends within `slice`.
                    let pack_end = slice.partition_point(|e| {
                        e.entry_pack_location.as_ref().expect("packed object").pack_id == current_pack_id
                    });
                    index.push((current_pack_id, chunks_pack_start..chunks_pack_start + pack_end));
                    slice = &slice[pack_end..];
                    chunks_pack_start += pack_end;
                }

                progress.set(counts.len());
                progress.show_throughput(start);

                index
            }
        };

        // Phase 3: hand out chunks of counts to worker threads which turn them into pack
        // entries, yielding results in sequence order as the returned iterator is consumed.
        let counts = Arc::new(counts);
        let progress = Arc::new(parking_lot::Mutex::new(progress));
        let chunks = util::ChunkRanges::new(chunk_size, counts.len());

        parallel::reduce::Stepwise::new(
            chunks.enumerate(),
            thread_limit,
            {
                let progress = Arc::clone(&progress);
                // Per-thread state: a reusable object buffer and a dedicated progress child.
                move |n| {
                    (
                        Vec::new(), // object data buffer
                        progress
                            .lock()
                            .add_child_with_id(format!("thread {n}"), git_features::progress::UNKNOWN),
                    )
                }
            },
            {
                let counts = Arc::clone(&counts);
                move |(chunk_id, chunk_range): (SequenceId, std::ops::Range<usize>), (buf, progress)| {
                    let mut out = Vec::new();
                    let chunk = &counts[chunk_range];
                    let mut stats = Outcome::default();
                    // Cache of (pack_id, offset->oid pairs sorted by offset) for resolving
                    // ref-delta bases; valid for one pack at a time.
                    let mut pack_offsets_to_id = None;
                    progress.init(Some(chunk.len()), git_features::progress::count("objects"));

                    for count in chunk.iter() {
                        out.push(match count
                            .entry_pack_location
                            .as_ref()
                            .and_then(|l| db.entry_by_location(l).map(|pe| (l, pe)))
                        {
                            Some((location, pack_entry)) => {
                                // Invalidate the offset cache when this entry lives in a different pack.
                                if let Some((cached_pack_id, _)) = &pack_offsets_to_id {
                                    if *cached_pack_id != location.pack_id {
                                        pack_offsets_to_id = None;
                                    }
                                }
                                // Phase 2 guarantees the entry's pack-id is present in the sorted index.
                                let pack_range = counts_range_by_pack_id[counts_range_by_pack_id
                                    .binary_search_by_key(&location.pack_id, |e| e.0)
                                    .expect("pack-id always present")]
                                .1
                                .clone();
                                let base_index_offset = pack_range.start;
                                let counts_in_pack = &counts[pack_range];
                                match output::Entry::from_pack_entry(
                                    pack_entry,
                                    count,
                                    counts_in_pack,
                                    base_index_offset,
                                    // When thin packs are allowed, provide a lookup from a base
                                    // offset within the source pack to that base's object id.
                                    allow_thin_pack.then_some({
                                        |pack_id, base_offset| {
                                            let (cached_pack_id, cache) = pack_offsets_to_id.get_or_insert_with(|| {
                                                db.pack_offsets_and_oid(pack_id)
                                                    .map(|mut v| {
                                                        // Sort by offset so `binary_search_by_key` below works.
                                                        v.sort_by_key(|e| e.0);
                                                        (pack_id, v)
                                                    })
                                                    .expect("pack used for counts is still available")
                                            });
                                            debug_assert_eq!(*cached_pack_id, pack_id);
                                            stats.ref_delta_objects += 1;
                                            cache
                                                .binary_search_by_key(&base_offset, |e| e.0)
                                                .ok()
                                                .map(|idx| cache[idx].1)
                                        }
                                    }),
                                    version,
                                ) {
                                    Some(entry) => {
                                        stats.objects_copied_from_pack += 1;
                                        entry
                                    }
                                    // Copying from the pack was not possible - fall back to
                                    // decoding the object and recompressing it as a base object.
                                    None => match db.try_find(count.id, buf).map_err(Error::FindExisting)? {
                                        Some((obj, _location)) => {
                                            stats.decoded_and_recompressed_objects += 1;
                                            output::Entry::from_data(count, &obj)
                                        }
                                        None => {
                                            stats.missing_objects += 1;
                                            Ok(output::Entry::invalid())
                                        }
                                    },
                                }
                            }
                            // No pack location - the object must be decoded and recompressed.
                            None => match db.try_find(count.id, buf).map_err(Error::FindExisting)? {
                                Some((obj, _location)) => {
                                    stats.decoded_and_recompressed_objects += 1;
                                    output::Entry::from_data(count, &obj)
                                }
                                None => {
                                    stats.missing_objects += 1;
                                    Ok(output::Entry::invalid())
                                }
                            },
                        }?);
                        progress.inc();
                    }
                    Ok((chunk_id, out, stats))
                }
            },
            reduce::Statistics::default(),
        )
    }
}
239
mod util {
    /// An iterator yielding consecutive `Range<usize>` chunks of at most `size` items each,
    /// together covering `0..len` without gaps or overlap.
    #[derive(Clone)]
    pub struct ChunkRanges {
        cursor: usize,
        size: usize,
        len: usize,
    }

    impl ChunkRanges {
        /// Create an iterator producing ranges of `size` indices each, covering `total` items.
        ///
        /// The final range may be shorter when `total` is not a multiple of `size`.
        pub fn new(size: usize, total: usize) -> Self {
            ChunkRanges {
                cursor: 0,
                size,
                len: total,
            }
        }
    }

    impl Iterator for ChunkRanges {
        type Item = std::ops::Range<usize>;

        fn next(&mut self) -> Option<Self::Item> {
            (self.cursor < self.len).then(|| {
                let start = self.cursor;
                // Clamp the upper bound so the last chunk never exceeds `len`.
                self.cursor = (start + self.size).min(self.len);
                start..self.cursor
            })
        }
    }
}
273
274mod reduce {
275    use std::marker::PhantomData;
276
277    use git_features::{parallel, parallel::SequenceId};
278
279    use super::Outcome;
280    use crate::data::output;
281
282    pub struct Statistics<E> {
283        total: Outcome,
284        _err: PhantomData<E>,
285    }
286
287    impl<E> Default for Statistics<E> {
288        fn default() -> Self {
289            Statistics {
290                total: Default::default(),
291                _err: PhantomData::default(),
292            }
293        }
294    }
295
296    impl<Error> parallel::Reduce for Statistics<Error> {
297        type Input = Result<(SequenceId, Vec<output::Entry>, Outcome), Error>;
298        type FeedProduce = (SequenceId, Vec<output::Entry>);
299        type Output = Outcome;
300        type Error = Error;
301
302        fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
303            item.map(|(cid, entries, stats)| {
304                self.total.aggregate(stats);
305                (cid, entries)
306            })
307        }
308
309        fn finalize(self) -> Result<Self::Output, Self::Error> {
310            Ok(self.total)
311        }
312    }
313}
314
mod types {
    use crate::data::output::entry;

    /// Information gathered during the run of [`iter_from_counts()`][crate::data::output::entry::iter_from_counts()].
    #[derive(Default, PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
    #[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
    pub struct Outcome {
        /// The amount of fully decoded objects. These are the most expensive as they have to be decoded and recompressed.
        pub decoded_and_recompressed_objects: usize,
        /// The amount of objects that could not be located despite them being mentioned during iteration
        pub missing_objects: usize,
        /// The amount of base or delta objects that could be copied directly from the pack. These are cheapest as they
        /// only cost a memory copy for the most part.
        pub objects_copied_from_pack: usize,
        /// The amount of objects that refer to their base as ref-delta, an indication for a thin pack being created.
        pub ref_delta_objects: usize,
    }

    impl Outcome {
        /// Add all counters of `other` to ours, field by field.
        pub(in crate::data::output::entry) fn aggregate(
            &mut self,
            Outcome {
                decoded_and_recompressed_objects: decoded_objects,
                missing_objects,
                objects_copied_from_pack,
                ref_delta_objects,
            }: Self,
        ) {
            self.decoded_and_recompressed_objects += decoded_objects;
            self.missing_objects += missing_objects;
            self.objects_copied_from_pack += objects_copied_from_pack;
            self.ref_delta_objects += ref_delta_objects;
        }
    }

    /// The way the iterator operates.
    #[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
    #[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
    pub enum Mode {
        /// Copy base objects and deltas from packs, while non-packed objects will be treated as base objects
        /// (i.e. without trying to delta compress them). This is a fast way of obtaining a pack while benefiting
        /// from existing pack compression and spending the smallest possible time on compressing unpacked objects at
        /// the cost of bandwidth.
        PackCopyAndBaseObjects,
    }

    /// Configuration options for the pack generation functions provided in [`iter_from_counts()`][crate::data::output::entry::iter_from_counts()].
    #[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
    #[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
    pub struct Options {
        /// The amount of threads to use at most when resolving the pack. If `None`, all logical cores are used.
        pub thread_limit: Option<usize>,
        /// The algorithm to produce a pack
        pub mode: Mode,
        /// If set, the resulting pack can have deltas that refer to an object which is not in the pack. This can happen
        /// if the initial counted objects do not contain an object that an existing packed delta refers to, for example, because
        /// it wasn't part of the iteration, for instance when the iteration was performed on tree deltas or only a part of the
        /// commit graph. Please note that thin packs are not valid packs at rest, thus they are only valid for packs in transit.
        ///
        /// If set to false, delta objects will be decompressed and recompressed as base objects.
        pub allow_thin_pack: bool,
        /// The amount of objects per chunk or unit of work to be sent to threads for processing
        /// TODO: could this become the window size?
        pub chunk_size: usize,
        /// The pack data version to produce for each entry
        pub version: crate::data::Version,
    }

    impl Default for Options {
        fn default() -> Self {
            Options {
                thread_limit: None,
                mode: Mode::PackCopyAndBaseObjects,
                allow_thin_pack: false,
                chunk_size: 10,
                version: Default::default(),
            }
        }
    }

    /// The error returned by the pack generation function [`iter_from_counts()`][crate::data::output::entry::iter_from_counts()].
    #[derive(Debug, thiserror::Error)]
    #[allow(missing_docs)]
    pub enum Error<FindErr>
    where
        FindErr: std::error::Error + 'static,
    {
        #[error(transparent)]
        FindExisting(FindErr),
        #[error(transparent)]
        NewEntry(#[from] entry::Error),
    }

    /// The progress ids used in [`iter_from_counts()`][crate::data::output::entry::iter_from_counts()].
    ///
    /// Use this information to selectively extract the progress of interest in case the parent application has custom visualization.
    #[derive(Debug, Copy, Clone)]
    pub enum ProgressId {
        /// The amount of [`Count`][crate::data::output::Count] objects which are resolved to their pack location.
        ResolveCounts,
        /// Layout pack entries for placement into a pack (by pack-id and by offset).
        SortEntries,
    }

    impl From<ProgressId> for git_features::progress::Id {
        fn from(v: ProgressId) -> Self {
            match v {
                ProgressId::ResolveCounts => *b"ECRC",
                ProgressId::SortEntries => *b"ECSE",
            }
        }
    }
}
428pub use types::{Error, Mode, Options, Outcome, ProgressId};