gitoxide_core/repository/
odb.rs

1use std::io;
2use std::sync::atomic::Ordering;
3
4use anyhow::bail;
5
6use crate::OutputFormat;
7
8#[cfg_attr(not(feature = "serde"), allow(unused_variables))]
9pub fn info(
10    repo: gix::Repository,
11    format: OutputFormat,
12    out: impl io::Write,
13    mut err: impl io::Write,
14) -> anyhow::Result<()> {
15    if format == OutputFormat::Human {
16        writeln!(err, "Only JSON is implemented - using that instead")?;
17    }
18
19    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
20    pub struct Statistics {
21        pub path: std::path::PathBuf,
22        pub object_hash: String,
23        pub use_multi_pack_index: bool,
24        pub structure: Vec<gix::odb::store::structure::Record>,
25        pub metrics: gix::odb::store::Metrics,
26    }
27
28    let store = repo.objects.store_ref();
29    let stats = Statistics {
30        path: store.path().into(),
31        object_hash: store.object_hash().to_string(),
32        use_multi_pack_index: store.use_multi_pack_index(),
33        structure: store.structure()?,
34        metrics: store.metrics(),
35    };
36
37    #[cfg(feature = "serde")]
38    {
39        serde_json::to_writer_pretty(out, &stats)?;
40    }
41
42    Ok(())
43}
44
45pub mod statistics {
46    use crate::OutputFormat;
47
48    pub const PROGRESS_RANGE: std::ops::RangeInclusive<u8> = 0..=3;
49
50    #[derive(Debug, Copy, Clone)]
51    pub struct Options {
52        pub format: OutputFormat,
53        pub thread_limit: Option<usize>,
54        /// A debug-flag that triggers looking up the headers of all objects again, but without indices preloaded
55        pub extra_header_lookup: bool,
56    }
57}
58
59#[cfg_attr(not(feature = "serde"), allow(unused_variables))]
60pub fn statistics(
61    repo: gix::Repository,
62    mut progress: impl gix::Progress,
63    out: impl io::Write,
64    mut err: impl io::Write,
65    statistics::Options {
66        format,
67        thread_limit,
68        extra_header_lookup,
69    }: statistics::Options,
70) -> anyhow::Result<()> {
71    use bytesize::ByteSize;
72    use gix::odb::{find, HeaderExt};
73
74    if format == OutputFormat::Human {
75        writeln!(err, "Only JSON is implemented - using that instead")?;
76    }
77
78    progress.init(None, gix::progress::count("objects"));
79    progress.set_name("counting".into());
80    let counter = progress.counter();
81    let start = std::time::Instant::now();
82
83    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
84    #[derive(Default)]
85    struct Statistics {
86        /// All objects that were used to produce these statistics.
87        /// Only `Some` if we are doing an extra round of header queries on a repository without loaded indices.
88        #[cfg_attr(feature = "serde", serde(skip_serializing))]
89        ids: Option<Vec<gix::ObjectId>>,
90        total_objects: usize,
91        loose_objects: usize,
92        packed_objects: usize,
93        packed_delta_objects: usize,
94        total_delta_chain_length: u64,
95        trees: usize,
96        trees_size: ByteSize,
97        tags: usize,
98        tags_size: ByteSize,
99        commits: usize,
100        commits_size: ByteSize,
101        blobs: usize,
102        blobs_size: ByteSize,
103    }
104
105    impl Statistics {
106        fn count(&mut self, kind: gix::object::Kind, size: u64) {
107            use gix::object::Kind::*;
108            match kind {
109                Commit => {
110                    self.commits += 1;
111                    self.commits_size += size;
112                }
113                Tree => {
114                    self.trees += 1;
115                    self.trees_size += size;
116                }
117                Tag => {
118                    self.tags += 1;
119                    self.tags_size += size;
120                }
121                Blob => {
122                    self.blobs += 1;
123                    self.blobs_size += size;
124                }
125            }
126        }
127        fn consume(&mut self, item: gix::odb::find::Header) {
128            match item {
129                find::Header::Loose { size, kind } => {
130                    self.loose_objects += 1;
131                    self.count(kind, size);
132                }
133                find::Header::Packed(packed) => {
134                    self.packed_objects += 1;
135                    self.packed_delta_objects += usize::from(packed.num_deltas > 0);
136                    self.total_delta_chain_length += u64::from(packed.num_deltas);
137                    self.count(packed.kind, packed.object_size);
138                }
139            }
140        }
141    }
142
143    #[derive(Default)]
144    struct Reduce {
145        stats: Statistics,
146    }
147
148    impl gix::parallel::Reduce for Reduce {
149        type Input = Result<Vec<(gix::ObjectId, gix::odb::find::Header)>, anyhow::Error>;
150        type FeedProduce = ();
151        type Output = Statistics;
152        type Error = anyhow::Error;
153
154        fn feed(&mut self, items: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
155            for (id, item) in items? {
156                self.stats.consume(item);
157                if let Some(ids) = self.stats.ids.as_mut() {
158                    ids.push(id);
159                }
160            }
161            Ok(())
162        }
163
164        fn finalize(mut self) -> Result<Self::Output, Self::Error> {
165            self.stats.total_objects = self.stats.loose_objects + self.stats.packed_objects;
166            Ok(self.stats)
167        }
168    }
169
170    let cancelled = || anyhow::anyhow!("Cancelled by user");
171    let object_ids = repo.objects.iter()?.filter_map(Result::ok);
172    let chunk_size = 1_000;
173    let mut stats = if gix::parallel::num_threads(thread_limit) > 1 {
174        gix::parallel::in_parallel(
175            gix::interrupt::Iter::new(
176                gix::features::iter::Chunks {
177                    inner: object_ids,
178                    size: chunk_size,
179                },
180                cancelled,
181            ),
182            thread_limit,
183            {
184                let objects = repo.objects.clone();
185                move |_| (objects.clone().into_inner(), counter)
186            },
187            |ids, (handle, counter)| {
188                let ids = ids?;
189                counter.fetch_add(ids.len(), Ordering::Relaxed);
190                let out = ids
191                    .into_iter()
192                    .map(|id| handle.header(id).map(|hdr| (id, hdr)))
193                    .collect::<Result<Vec<_>, _>>()?;
194                Ok(out)
195            },
196            Reduce {
197                stats: Statistics {
198                    ids: extra_header_lookup.then(Vec::new),
199                    ..Default::default()
200                },
201            },
202        )?
203    } else {
204        if extra_header_lookup {
205            bail!("extra-header-lookup is only meaningful in threaded mode");
206        }
207        let mut stats = Statistics::default();
208
209        for (count, id) in object_ids.enumerate() {
210            if count % chunk_size == 0 && gix::interrupt::is_triggered() {
211                return Err(cancelled());
212            }
213            stats.consume(repo.objects.header(id)?);
214            progress.inc();
215        }
216        stats
217    };
218
219    progress.show_throughput(start);
220
221    if let Some(mut ids) = stats.ids.take() {
222        // Critical to re-open the repo to assure we don't have any ODB state and start fresh.
223        let start = std::time::Instant::now();
224        let repo = gix::open_opts(repo.git_dir(), repo.open_options().to_owned())?;
225        progress.set_name("re-counting".into());
226        progress.init(Some(ids.len()), gix::progress::count("objects"));
227        let counter = progress.counter();
228        counter.store(0, Ordering::Relaxed);
229        let errors = gix::parallel::in_parallel_with_slice(
230            &mut ids,
231            thread_limit,
232            {
233                let objects = repo.objects.clone();
234                move |_| (objects.clone().into_inner(), counter, false)
235            },
236            |id, (odb, counter, has_error), _threads_left, _stop_everything| -> anyhow::Result<()> {
237                counter.fetch_add(1, Ordering::Relaxed);
238                if let Err(_err) = odb.header(id) {
239                    *has_error = true;
240                    gix::trace::error!(err = ?_err, "Object that is known to be present wasn't found");
241                }
242                Ok(())
243            },
244            || Some(std::time::Duration::from_millis(100)),
245            |(_, _, has_error)| has_error,
246        )?;
247
248        progress.show_throughput(start);
249        if errors.contains(&true) {
250            bail!("At least one object couldn't be looked up even though it must exist");
251        }
252    }
253
254    #[cfg(feature = "serde")]
255    {
256        serde_json::to_writer_pretty(out, &stats)?;
257    }
258
259    Ok(())
260}
261
262pub fn entries(repo: gix::Repository, format: OutputFormat, mut out: impl io::Write) -> anyhow::Result<()> {
263    if format != OutputFormat::Human {
264        bail!("Only human output format is supported at the moment");
265    }
266
267    for object in repo.objects.iter()? {
268        let object = object?;
269        writeln!(out, "{object}")?;
270    }
271
272    Ok(())
273}