gitoxide_core/pack/
create.rs

1use std::{ffi::OsStr, io, path::Path, str::FromStr, time::Instant};
2
3use anyhow::anyhow;
4use gix::{
5    hash, hash::ObjectId, interrupt, objs::bstr::ByteVec, odb::pack, parallel::InOrderIter, prelude::Finalize,
6    progress, traverse, Count, NestedProgress, Progress,
7};
8
9use crate::OutputFormat;
10
/// The range of top-level progress steps used here — `create` advances through two steps.
pub const PROGRESS_RANGE: std::ops::RangeInclusive<u8> = 1..=2;
12
/// How input objects should be expanded into the final set of objects placed into the pack.
#[derive(Default, Eq, PartialEq, Debug, Clone)]
pub enum ObjectExpansion {
    /// Pack exactly the objects given as input, without any expansion.
    #[default]
    None,
    /// Expand commits (and trees) into all objects reachable from their trees.
    TreeTraversal,
    /// Expand commits into the tree additions compared to their ancestor's tree.
    TreeDiff,
}
20
impl ObjectExpansion {
    /// All valid string representations, matching exactly what [`FromStr`] accepts
    /// (after ASCII lowercasing).
    pub fn variants() -> &'static [&'static str] {
        &["none", "tree-traversal", "tree-diff"]
    }
}
26
27impl FromStr for ObjectExpansion {
28    type Err = String;
29
30    fn from_str(s: &str) -> Result<Self, Self::Err> {
31        use ObjectExpansion::*;
32        let slc = s.to_ascii_lowercase();
33        Ok(match slc.as_str() {
34            "none" => None,
35            "tree-traversal" => TreeTraversal,
36            "tree-diff" => TreeDiff,
37            _ => return Err("invalid value".into()),
38        })
39    }
40}
41
/// Map the CLI-facing expansion mode onto the corresponding pack-engine option.
impl From<ObjectExpansion> for pack::data::output::count::objects::ObjectExpansion {
    fn from(v: ObjectExpansion) -> Self {
        use pack::data::output::count::objects::ObjectExpansion::*;
        match v {
            ObjectExpansion::None => AsIs,
            ObjectExpansion::TreeTraversal => TreeContents,
            ObjectExpansion::TreeDiff => TreeAdditionsComparedToAncestor,
        }
    }
}
52
/// A general purpose context for many operations provided here
pub struct Context<W> {
    /// The way input objects should be handled
    pub expansion: ObjectExpansion,
    /// If `Some(threads)`, use this amount of `threads` to accelerate the counting phase at the cost of losing
    /// determinism as the order of objects during expansion changes with multiple threads unless no expansion is performed.
    /// In the latter case, this flag has no effect.
    /// If `None`, counting will only use one thread and thus yield the same sequence of objects in any case.
    pub nondeterministic_thread_count: Option<usize>,
    /// If true, delta objects may refer to their base as reference, allowing it not to be included in the created pack.
    /// Otherwise these have to be recompressed in order to make the pack self-contained.
    pub thin: bool,
    /// If set, don't use more than this amount of threads.
    /// Otherwise, usually use as many threads as there are logical cores.
    /// A value of 0 is interpreted as no-limit
    pub thread_limit: Option<usize>,
    /// If set, statistics about the operation will be written to the output stream.
    pub statistics: Option<OutputFormat>,
    /// The size of the cache storing fully decoded delta objects. This can greatly speed up pack decoding by reducing the length of delta
    /// chains. Note that caches also incur a cost and poorly used caches may reduce overall performance.
    /// This is a total, shared among all threads if `thread_limit` permits.
    ///
    /// If 0, the cache is disabled entirely.
    pub pack_cache_size_in_bytes: usize,
    /// The size of the cache to store full objects by their ID, bypassing any lookup in the object database.
    /// Note that caches also incur a cost and poorly used caches may reduce overall performance.
    ///
    /// This is a total, shared among all threads if `thread_limit` permits.
    /// Only used when known to be effective, namely when `expansion == ObjectExpansion::TreeDiff`.
    pub object_cache_size_in_bytes: usize,
    /// The output stream for use of additional information
    pub out: W,
}
86
/// Create a pack from objects of the repository found at `repository_path`.
///
/// The objects to pack come either from a commit traversal starting at `tips`
/// (when `input` is `None`), or from hex object ids read line-by-line from `input`.
/// When `output_directory` is set, the pack is persisted there under its
/// hash-derived name; otherwise the pack data is discarded and only the would-be
/// pack name is written to `out`. Tuning knobs and output options come from `Context`.
pub fn create<W, P>(
    repository_path: impl AsRef<Path>,
    tips: impl IntoIterator<Item = impl AsRef<OsStr>>,
    input: Option<impl io::BufRead + Send + 'static>,
    output_directory: Option<impl AsRef<Path>>,
    mut progress: P,
    Context {
        expansion,
        nondeterministic_thread_count,
        thin,
        thread_limit,
        statistics,
        pack_cache_size_in_bytes,
        object_cache_size_in_bytes,
        mut out,
    }: Context<W>,
) -> anyhow::Result<()>
where
    W: std::io::Write,
    P: NestedProgress,
    P::SubProgress: 'static,
{
    // A fallible stream of object ids, regardless of whether they come from traversal or input lines.
    type ObjectIdIter = dyn Iterator<Item = Result<ObjectId, Box<dyn std::error::Error + Send + Sync>>> + Send;

    let repo = gix::discover(repository_path)?.into_sync();
    // Two top-level steps: counting, then entry creation and writing.
    progress.init(Some(2), progress::steps());
    let tips = tips.into_iter();
    let make_cancellation_err = || anyhow!("Cancelled by user");
    // Build the object-id iterator plus the object-database handle shared by later phases.
    let (mut handle, mut input): (_, Box<ObjectIdIter>) = match input {
        None => {
            let mut progress = progress.add_child("traversing");
            progress.init(None, progress::count("commits"));
            // Each tip may be a hex object id or a reference name; try the former, fall back to the latter.
            let tips = tips
                .map({
                    let easy = repo.to_thread_local();
                    move |tip| {
                        ObjectId::from_hex(&Vec::from_os_str_lossy(tip.as_ref())).or_else(|_| {
                            easy.find_reference(tip.as_ref())
                                .map_err(anyhow::Error::from)
                                .and_then(|r| r.into_fully_peeled_id().map(gix::Id::detach).map_err(Into::into))
                        })
                    }
                })
                .collect::<Result<Vec<_>, _>>()?;
            let handle = repo.objects.into_shared_arc().to_cache_arc();
            let iter = Box::new(
                traverse::commit::Simple::new(tips, handle.clone())
                    .map(|res| res.map_err(|err| Box::new(err) as Box<_>).map(|c| c.id))
                    .inspect(move |_| progress.inc()),
            );
            (handle, iter)
        }
        Some(input) => {
            let mut progress = progress.add_child("iterating");
            progress.init(None, progress::count("objects"));
            let handle = repo.objects.into_shared_arc().to_cache_arc();
            (
                handle,
                // One hex object id per input line.
                Box::new(
                    input
                        .lines()
                        .map(|hex_id| {
                            hex_id
                                .map_err(|err| Box::new(err) as Box<_>)
                                .and_then(|hex_id| ObjectId::from_hex(hex_id.as_bytes()).map_err(Into::into))
                        })
                        .inspect(move |_| progress.inc()),
                ),
            )
        }
    };

    let mut stats = Statistics::default();
    let chunk_size = 1000; // What's a good value for this?
    // Phase 1: count (and possibly expand) the input objects.
    let counts = {
        let mut progress = progress.add_child("counting");
        progress.init(None, progress::count("objects"));
        // Multi-threaded counting loses determinism unless no expansion happens,
        // so it must be requested explicitly via `nondeterministic_thread_count`.
        let may_use_multiple_threads =
            nondeterministic_thread_count.is_some() || matches!(expansion, ObjectExpansion::None);
        let thread_limit = if may_use_multiple_threads {
            nondeterministic_thread_count.or(thread_limit)
        } else {
            Some(1)
        };
        if nondeterministic_thread_count.is_some() && !may_use_multiple_threads {
            progress.fail("Cannot use multi-threaded counting in tree-diff object expansion mode as it may yield way too many objects.".into());
        }
        let (_, _, thread_count) = gix::parallel::optimize_chunk_size_and_thread_limit(50, None, thread_limit, None);
        let progress = progress::ThroughputOnDrop::new(progress);

        {
            // Caches are sized per thread; skip pack caches too small to pay off.
            let per_thread_object_pack_size = pack_cache_size_in_bytes / thread_count;
            if per_thread_object_pack_size >= 10_000 {
                handle.set_pack_cache(move || {
                    Box::new(pack::cache::lru::MemoryCappedHashmap::new(per_thread_object_pack_size))
                });
            }
            // The object cache is only effective in tree-diff expansion mode (see `Context` docs).
            if matches!(expansion, ObjectExpansion::TreeDiff) {
                handle.set_object_cache(move || {
                    let per_thread_object_cache_size = object_cache_size_in_bytes / thread_count;
                    Box::new(pack::cache::object::MemoryCappedHashmap::new(
                        per_thread_object_cache_size,
                    ))
                });
            }
        }
        let input_object_expansion = expansion.into();
        handle.prevent_pack_unload();
        handle.ignore_replacements = true;
        let (mut counts, count_stats) = if may_use_multiple_threads {
            pack::data::output::count::objects(
                handle.clone(),
                input,
                &progress,
                &interrupt::IS_INTERRUPTED,
                pack::data::output::count::objects::Options {
                    thread_limit,
                    chunk_size,
                    input_object_expansion,
                },
            )?
        } else {
            pack::data::output::count::objects_unthreaded(
                &handle,
                &mut input,
                &progress,
                &interrupt::IS_INTERRUPTED,
                input_object_expansion,
            )?
        };
        stats.counts = count_stats;
        counts.shrink_to_fit();
        counts
    };

    progress.inc();
    let num_objects = counts.len();
    // Phase 2: turn counts into pack entries while preserving input order.
    let mut in_order_entries = {
        let progress = progress.add_child("creating entries");
        InOrderIter::from(pack::data::output::entry::iter_from_counts(
            counts,
            handle,
            Box::new(progress),
            pack::data::output::entry::iter_from_counts::Options {
                thread_limit,
                mode: pack::data::output::entry::iter_from_counts::Mode::PackCopyAndBaseObjects,
                allow_thin_pack: thin,
                chunk_size,
                version: Default::default(),
            },
        ))
    };

    let mut entries_progress = progress.add_child("consuming");
    entries_progress.init(Some(num_objects), progress::count("entries"));
    let mut write_progress = progress.add_child("writing");
    write_progress.init(None, progress::bytes());
    let start = Instant::now();

    // Write either into a temp file (persisted under the pack's name once the hash
    // is known) or into a sink when only the resulting pack name is of interest.
    let mut named_tempfile_store: Option<tempfile::NamedTempFile> = None;
    let mut sink_store: std::io::Sink;
    let (mut pack_file, output_directory): (&mut dyn std::io::Write, Option<_>) = match output_directory {
        Some(dir) => {
            named_tempfile_store = Some(tempfile::NamedTempFile::new_in(dir.as_ref())?);
            (named_tempfile_store.as_mut().expect("packfile just set"), Some(dir))
        }
        None => {
            sink_store = std::io::sink();
            (&mut sink_store, None)
        }
    };
    // Stream entries into the pack writer, honoring user interrupts between items.
    let mut interruptible_output_iter = interrupt::Iter::new(
        pack::data::output::bytes::FromEntriesIter::new(
            in_order_entries.by_ref().inspect(|e| {
                if let Ok(entries) = e {
                    entries_progress.inc_by(entries.len());
                }
            }),
            &mut pack_file,
            num_objects as u32,
            pack::data::Version::default(),
            hash::Kind::default(),
        ),
        make_cancellation_err,
    );
    for io_res in interruptible_output_iter.by_ref() {
        // Outer `?`: cancellation; inner `?`: write error.
        let written = io_res??;
        write_progress.inc_by(written as usize);
    }

    // The pack's trailing hash also provides its canonical file name.
    let hash = interruptible_output_iter
        .into_inner()
        .digest()
        .expect("iteration is done");
    let pack_name = format!("{hash}.pack");
    if let (Some(pack_file), Some(dir)) = (named_tempfile_store.take(), output_directory) {
        pack_file.persist(dir.as_ref().join(pack_name))?;
    } else {
        writeln!(out, "{pack_name}")?;
    }
    stats.entries = in_order_entries.inner.finalize()?;

    write_progress.show_throughput(start);
    entries_progress.show_throughput(start);

    if let Some(format) = statistics {
        print(stats, format, out)?;
    }
    progress.inc();
    Ok(())
}
298
299fn print(stats: Statistics, format: OutputFormat, out: impl std::io::Write) -> anyhow::Result<()> {
300    match format {
301        OutputFormat::Human => human_output(stats, out).map_err(Into::into),
302        #[cfg(feature = "serde")]
303        OutputFormat::Json => serde_json::to_writer_pretty(out, &stats).map_err(Into::into),
304    }
305}
306
307fn human_output(
308    Statistics {
309        counts:
310            pack::data::output::count::objects::Outcome {
311                input_objects,
312                expanded_objects,
313                decoded_objects,
314                total_objects,
315            },
316        entries:
317            pack::data::output::entry::iter_from_counts::Outcome {
318                decoded_and_recompressed_objects,
319                missing_objects,
320                objects_copied_from_pack,
321                ref_delta_objects,
322            },
323    }: Statistics,
324    mut out: impl std::io::Write,
325) -> std::io::Result<()> {
326    let width = 30;
327    writeln!(out, "counting phase")?;
328    #[rustfmt::skip]
329    writeln!(
330        out,
331        "\t{:<width$} {}\n\t{:<width$} {}\n\t{:<width$} {}\n\t{:<width$} {}",
332        "input objects", input_objects,
333        "expanded objects", expanded_objects,
334        "decoded objects", decoded_objects,
335        "total objects", total_objects,
336        width = width
337    )?;
338    writeln!(out, "generation phase")?;
339    #[rustfmt::skip]
340    writeln!(
341        out,
342        "\t{:<width$} {}\n\t{:<width$} {}\n\t{:<width$} {}\n\t{:<width$} {}",
343        "decoded and recompressed", decoded_and_recompressed_objects,
344        "pack-to-pack copies", objects_copied_from_pack,
345        "ref-delta-objects", ref_delta_objects,
346        "missing objects", missing_objects,
347        width = width
348    )?;
349    Ok(())
350}
351
/// Aggregated outcomes of both pack-creation phases, for optional reporting.
#[derive(Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
struct Statistics {
    // Outcome of the object-counting phase.
    counts: pack::data::output::count::objects::Outcome,
    // Outcome of the pack-entry generation phase.
    entries: pack::data::output::entry::iter_from_counts::Outcome,
}
358
/// Errors that can occur while iterating the objects that serve as pack input.
pub mod input_iteration {
    use gix::{hash, traverse};
    /// The error returned when commit traversal or hash input cannot be processed.
    #[derive(Debug, thiserror::Error)]
    pub enum Error {
        #[error("input objects couldn't be iterated completely")]
        Iteration(#[from] traverse::commit::simple::Error),
        #[error("An error occurred while reading hashes from standard input")]
        InputLinesIo(#[from] std::io::Error),
        #[error("Could not decode hex hash provided on standard input")]
        HashDecode(#[from] hash::decode::Error),
    }
}
370}