use std::{ffi::OsStr, io, path::Path, str::FromStr, time::Instant};

use anyhow::anyhow;
use gix::{
    hash, hash::ObjectId, interrupt, objs::bstr::ByteVec, odb::pack, parallel::InOrderIter, prelude::Finalize,
    progress, traverse, Count, NestedProgress, Progress,
};

use crate::OutputFormat;

pub const PROGRESS_RANGE: std::ops::RangeInclusive<u8> = 1..=2;

#[derive(Default, Eq, PartialEq, Debug, Clone)]
pub enum ObjectExpansion {
    #[default]
    None,
    TreeTraversal,
    TreeDiff,
}

impl ObjectExpansion {
    pub fn variants() -> &'static [&'static str] {
        &["none", "tree-traversal", "tree-diff"]
    }
}

impl FromStr for ObjectExpansion {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        use ObjectExpansion::*;
        let slc = s.to_ascii_lowercase();
        Ok(match slc.as_str() {
            "none" => None,
            "tree-traversal" => TreeTraversal,
            "tree-diff" => TreeDiff,
            _ => return Err(format!("invalid value: {s:?}")),
        })
    }
}

impl From<ObjectExpansion> for pack::data::output::count::objects::ObjectExpansion {
    fn from(v: ObjectExpansion) -> Self {
        use pack::data::output::count::objects::ObjectExpansion::*;
        match v {
            ObjectExpansion::None => AsIs,
            ObjectExpansion::TreeTraversal => TreeContents,
            ObjectExpansion::TreeDiff => TreeAdditionsComparedToAncestor,
        }
    }
}

pub struct Context<W> {
    /// How input objects should be expanded into the set of objects to pack.
    pub expansion: ObjectExpansion,
    /// If set, allow multi-threaded counting with this many threads even though the resulting object order becomes non-deterministic.
    pub nondeterministic_thread_count: Option<usize>,
    /// If true, allow a thin pack whose ref-delta objects refer to bases that are not contained in the pack.
    pub thin: bool,
    /// An optional limit on the amount of threads used for counting and entry generation.
    pub thread_limit: Option<usize>,
    /// If set, print statistics about the created pack in the given output format.
    pub statistics: Option<OutputFormat>,
    /// The total amount of memory for the pack data cache, shared across all threads.
    pub pack_cache_size_in_bytes: usize,
    /// The total amount of memory for the object cache, shared across all threads and used only with tree-diff expansion.
    pub object_cache_size_in_bytes: usize,
    /// Where the pack name (if no output directory is used) and optional statistics are written.
    pub out: W,
}

pub fn create<W, P>(
    repository_path: impl AsRef<Path>,
    tips: impl IntoIterator<Item = impl AsRef<OsStr>>,
    input: Option<impl io::BufRead + Send + 'static>,
    output_directory: Option<impl AsRef<Path>>,
    mut progress: P,
    Context {
        expansion,
        nondeterministic_thread_count,
        thin,
        thread_limit,
        statistics,
        pack_cache_size_in_bytes,
        object_cache_size_in_bytes,
        mut out,
    }: Context<W>,
) -> anyhow::Result<()>
where
    W: std::io::Write,
    P: NestedProgress,
    P::SubProgress: 'static,
{
    type ObjectIdIter = dyn Iterator<Item = Result<ObjectId, Box<dyn std::error::Error + Send + Sync>>> + Send;

    let repo = gix::discover(repository_path)?.into_sync();
    progress.init(Some(2), progress::steps());
    let tips = tips.into_iter();
    let make_cancellation_err = || anyhow!("Cancelled by user");
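    // The objects to pack come either from a commit traversal starting at the given tips
    // (hex hashes or reference names), or from object hashes read line by line from `input`.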
    let (mut handle, mut input): (_, Box<ObjectIdIter>) = match input {
        None => {
            let mut progress = progress.add_child("traversing");
            progress.init(None, progress::count("commits"));
            let tips = tips
                .map({
                    let easy = repo.to_thread_local();
                    move |tip| {
                        ObjectId::from_hex(&Vec::from_os_str_lossy(tip.as_ref())).or_else(|_| {
                            easy.find_reference(tip.as_ref())
                                .map_err(anyhow::Error::from)
                                .and_then(|r| r.into_fully_peeled_id().map(gix::Id::detach).map_err(Into::into))
                        })
                    }
                })
                .collect::<Result<Vec<_>, _>>()?;
            let handle = repo.objects.into_shared_arc().to_cache_arc();
            let iter = Box::new(
                traverse::commit::Simple::new(tips, handle.clone())
                    .map(|res| res.map_err(|err| Box::new(err) as Box<_>).map(|c| c.id))
                    .inspect(move |_| progress.inc()),
            );
            (handle, iter)
        }
        Some(input) => {
            let mut progress = progress.add_child("iterating");
            progress.init(None, progress::count("objects"));
            let handle = repo.objects.into_shared_arc().to_cache_arc();
            (
                handle,
                Box::new(
                    input
                        .lines()
                        .map(|hex_id| {
                            hex_id
                                .map_err(|err| Box::new(err) as Box<_>)
                                .and_then(|hex_id| ObjectId::from_hex(hex_id.as_bytes()).map_err(Into::into))
                        })
                        .inspect(move |_| progress.inc()),
                ),
            )
        }
    };

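    // Phase 1: count the objects that will go into the pack, expanding each input object
    // according to the chosen `ObjectExpansion` mode.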
    let mut stats = Statistics::default();
    let chunk_size = 1000;
    let counts = {
        let mut progress = progress.add_child("counting");
        progress.init(None, progress::count("objects"));
        let may_use_multiple_threads =
            nondeterministic_thread_count.is_some() || matches!(expansion, ObjectExpansion::None);
        let thread_limit = if may_use_multiple_threads {
            nondeterministic_thread_count.or(thread_limit)
        } else {
            Some(1)
        };
        if nondeterministic_thread_count.is_some() && !may_use_multiple_threads {
            progress.fail("Cannot use multi-threaded counting in tree-diff object expansion mode as it may yield way too many objects.".into());
        }
        let (_, _, thread_count) = gix::parallel::optimize_chunk_size_and_thread_limit(50, None, thread_limit, None);
        let progress = progress::ThroughputOnDrop::new(progress);

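        // Size the pack and object caches per thread from the configured totals; the pack cache
        // is skipped entirely if the per-thread share falls below a minimal useful size.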
        {
            let per_thread_object_pack_size = pack_cache_size_in_bytes / thread_count;
            if per_thread_object_pack_size >= 10_000 {
                handle.set_pack_cache(move || {
                    Box::new(pack::cache::lru::MemoryCappedHashmap::new(per_thread_object_pack_size))
                });
            }
            if matches!(expansion, ObjectExpansion::TreeDiff) {
                handle.set_object_cache(move || {
                    let per_thread_object_cache_size = object_cache_size_in_bytes / thread_count;
                    Box::new(pack::cache::object::MemoryCappedHashmap::new(
                        per_thread_object_cache_size,
                    ))
                });
            }
        }
        let input_object_expansion = expansion.into();
        handle.prevent_pack_unload();
        handle.ignore_replacements = true;
        let (mut counts, count_stats) = if may_use_multiple_threads {
            pack::data::output::count::objects(
                handle.clone(),
                input,
                &progress,
                &interrupt::IS_INTERRUPTED,
                pack::data::output::count::objects::Options {
                    thread_limit,
                    chunk_size,
                    input_object_expansion,
                },
            )?
        } else {
            pack::data::output::count::objects_unthreaded(
                &handle,
                &mut input,
                &progress,
                &interrupt::IS_INTERRUPTED,
                input_object_expansion,
            )?
        };
        stats.counts = count_stats;
        counts.shrink_to_fit();
        counts
    };

    progress.inc();
    let num_objects = counts.len();
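    // Phase 2: turn the counts into pack entries while preserving input order via `InOrderIter`,
    // copying entries from existing packs where possible and storing base objects otherwise.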
    let mut in_order_entries = {
        let progress = progress.add_child("creating entries");
        InOrderIter::from(pack::data::output::entry::iter_from_counts(
            counts,
            handle,
            Box::new(progress),
            pack::data::output::entry::iter_from_counts::Options {
                thread_limit,
                mode: pack::data::output::entry::iter_from_counts::Mode::PackCopyAndBaseObjects,
                allow_thin_pack: thin,
                chunk_size,
                version: Default::default(),
            },
        ))
    };

    let mut entries_progress = progress.add_child("consuming");
    entries_progress.init(Some(num_objects), progress::count("entries"));
    let mut write_progress = progress.add_child("writing");
    write_progress.init(None, progress::bytes());
    let start = Instant::now();

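    // Write the pack either to a named temporary file in the output directory, or discard the
    // bytes entirely and only report the pack name when no directory was given.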
    let mut named_tempfile_store: Option<tempfile::NamedTempFile> = None;
    let mut sink_store: std::io::Sink;
    let (mut pack_file, output_directory): (&mut dyn std::io::Write, Option<_>) = match output_directory {
        Some(dir) => {
            named_tempfile_store = Some(tempfile::NamedTempFile::new_in(dir.as_ref())?);
            (named_tempfile_store.as_mut().expect("packfile just set"), Some(dir))
        }
        None => {
            sink_store = std::io::sink();
            (&mut sink_store, None)
        }
    };
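    // Stream the entries into the pack writer, checking for user interruption between chunks
    // and keeping the entry and byte counters up to date.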
    let mut interruptible_output_iter = interrupt::Iter::new(
        pack::data::output::bytes::FromEntriesIter::new(
            in_order_entries.by_ref().inspect(|e| {
                if let Ok(entries) = e {
                    entries_progress.inc_by(entries.len());
                }
            }),
            &mut pack_file,
            num_objects as u32,
            pack::data::Version::default(),
            hash::Kind::default(),
        ),
        make_cancellation_err,
    );
    for io_res in interruptible_output_iter.by_ref() {
        let written = io_res??;
        write_progress.inc_by(written as usize);
    }

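    // The trailing hash of the written pack determines its file name: persist the temporary file
    // under that name, or print the name if nothing was written to disk.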
    let hash = interruptible_output_iter
        .into_inner()
        .digest()
        .expect("iteration is done");
    let pack_name = format!("{hash}.pack");
    if let (Some(pack_file), Some(dir)) = (named_tempfile_store.take(), output_directory) {
        pack_file.persist(dir.as_ref().join(pack_name))?;
    } else {
        writeln!(out, "{pack_name}")?;
    }
    stats.entries = in_order_entries.inner.finalize()?;

    write_progress.show_throughput(start);
    entries_progress.show_throughput(start);

    if let Some(format) = statistics {
        print(stats, format, out)?;
    }
    progress.inc();
    Ok(())
}

fn print(stats: Statistics, format: OutputFormat, out: impl std::io::Write) -> anyhow::Result<()> {
    match format {
        OutputFormat::Human => human_output(stats, out).map_err(Into::into),
        #[cfg(feature = "serde")]
        OutputFormat::Json => serde_json::to_writer_pretty(out, &stats).map_err(Into::into),
    }
}

fn human_output(
    Statistics {
        counts:
            pack::data::output::count::objects::Outcome {
                input_objects,
                expanded_objects,
                decoded_objects,
                total_objects,
            },
        entries:
            pack::data::output::entry::iter_from_counts::Outcome {
                decoded_and_recompressed_objects,
                missing_objects,
                objects_copied_from_pack,
                ref_delta_objects,
            },
    }: Statistics,
    mut out: impl std::io::Write,
) -> std::io::Result<()> {
    let width = 30;
    writeln!(out, "counting phase")?;
    #[rustfmt::skip]
    writeln!(
        out,
        "\t{:<width$} {}\n\t{:<width$} {}\n\t{:<width$} {}\n\t{:<width$} {}",
        "input objects", input_objects,
        "expanded objects", expanded_objects,
        "decoded objects", decoded_objects,
        "total objects", total_objects,
        width = width
    )?;
    writeln!(out, "generation phase")?;
    #[rustfmt::skip]
    writeln!(
        out,
        "\t{:<width$} {}\n\t{:<width$} {}\n\t{:<width$} {}\n\t{:<width$} {}",
        "decoded and recompressed", decoded_and_recompressed_objects,
        "pack-to-pack copies", objects_copied_from_pack,
        "ref-delta-objects", ref_delta_objects,
        "missing objects", missing_objects,
        width = width
    )?;
    Ok(())
}

#[derive(Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
struct Statistics {
    counts: pack::data::output::count::objects::Outcome,
    entries: pack::data::output::entry::iter_from_counts::Outcome,
}

pub mod input_iteration {
    use gix::{hash, traverse};
    #[derive(Debug, thiserror::Error)]
    pub enum Error {
        #[error("input objects couldn't be iterated completely")]
        Iteration(#[from] traverse::commit::simple::Error),
        #[error("An error occurred while reading hashes from standard input")]
        InputLinesIo(#[from] std::io::Error),
        #[error("Could not decode hex hash provided on standard input")]
        HashDecode(#[from] hash::decode::Error),
    }
}