gitoxide_core/repository/
odb.rs

1use std::{io, sync::atomic::Ordering};
2
3use anyhow::bail;
4
5use crate::OutputFormat;
6
7#[cfg_attr(not(feature = "serde"), allow(unused_variables))]
8pub fn info(
9    repo: gix::Repository,
10    format: OutputFormat,
11    out: impl io::Write,
12    mut err: impl io::Write,
13) -> anyhow::Result<()> {
14    if format == OutputFormat::Human {
15        writeln!(err, "Only JSON is implemented - using that instead")?;
16    }
17
18    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
19    pub struct Statistics {
20        pub path: std::path::PathBuf,
21        pub object_hash: String,
22        pub use_multi_pack_index: bool,
23        pub structure: Vec<gix::odb::store::structure::Record>,
24        pub metrics: gix::odb::store::Metrics,
25    }
26
27    let store = repo.objects.store_ref();
28    let stats = Statistics {
29        path: store.path().into(),
30        object_hash: store.object_hash().to_string(),
31        use_multi_pack_index: store.use_multi_pack_index(),
32        structure: store.structure()?,
33        metrics: store.metrics(),
34    };
35
36    #[cfg(feature = "serde")]
37    {
38        serde_json::to_writer_pretty(out, &stats)?;
39    }
40
41    Ok(())
42}
43
44pub mod statistics {
45    use crate::OutputFormat;
46
47    pub const PROGRESS_RANGE: std::ops::RangeInclusive<u8> = 0..=3;
48
49    #[derive(Debug, Copy, Clone)]
50    pub struct Options {
51        pub format: OutputFormat,
52        pub thread_limit: Option<usize>,
53        /// A debug-flag that triggers looking up the headers of all objects again, but without indices preloaded
54        pub extra_header_lookup: bool,
55    }
56}
57
58#[cfg_attr(not(feature = "serde"), allow(unused_variables))]
59pub fn statistics(
60    repo: gix::Repository,
61    mut progress: impl gix::Progress,
62    out: impl io::Write,
63    mut err: impl io::Write,
64    statistics::Options {
65        format,
66        thread_limit,
67        extra_header_lookup,
68    }: statistics::Options,
69) -> anyhow::Result<()> {
70    use bytesize::ByteSize;
71    use gix::odb::{find, HeaderExt};
72
73    if format == OutputFormat::Human {
74        writeln!(err, "Only JSON is implemented - using that instead")?;
75    }
76
77    progress.init(None, gix::progress::count("objects"));
78    progress.set_name("counting".into());
79    let counter = progress.counter();
80    let start = std::time::Instant::now();
81
82    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
83    #[derive(Default)]
84    struct Statistics {
85        /// All objects that were used to produce these statistics.
86        /// Only `Some` if we are doing an extra round of header queries on a repository without loaded indices.
87        #[cfg_attr(feature = "serde", serde(skip_serializing))]
88        ids: Option<Vec<gix::ObjectId>>,
89        total_objects: usize,
90        loose_objects: usize,
91        packed_objects: usize,
92        packed_delta_objects: usize,
93        total_delta_chain_length: u64,
94        trees: usize,
95        trees_size: ByteSize,
96        tags: usize,
97        tags_size: ByteSize,
98        commits: usize,
99        commits_size: ByteSize,
100        blobs: usize,
101        blobs_size: ByteSize,
102    }
103
104    impl Statistics {
105        fn count(&mut self, kind: gix::object::Kind, size: u64) {
106            use gix::object::Kind::*;
107            match kind {
108                Commit => {
109                    self.commits += 1;
110                    self.commits_size += size;
111                }
112                Tree => {
113                    self.trees += 1;
114                    self.trees_size += size;
115                }
116                Tag => {
117                    self.tags += 1;
118                    self.tags_size += size;
119                }
120                Blob => {
121                    self.blobs += 1;
122                    self.blobs_size += size;
123                }
124            }
125        }
126        fn consume(&mut self, item: gix::odb::find::Header) {
127            match item {
128                find::Header::Loose { size, kind } => {
129                    self.loose_objects += 1;
130                    self.count(kind, size);
131                }
132                find::Header::Packed(packed) => {
133                    self.packed_objects += 1;
134                    self.packed_delta_objects += usize::from(packed.num_deltas > 0);
135                    self.total_delta_chain_length += u64::from(packed.num_deltas);
136                    self.count(packed.kind, packed.object_size);
137                }
138            }
139        }
140    }
141
142    #[derive(Default)]
143    struct Reduce {
144        stats: Statistics,
145    }
146
147    impl gix::parallel::Reduce for Reduce {
148        type Input = Result<Vec<(gix::ObjectId, gix::odb::find::Header)>, anyhow::Error>;
149        type FeedProduce = ();
150        type Output = Statistics;
151        type Error = anyhow::Error;
152
153        fn feed(&mut self, items: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
154            for (id, item) in items? {
155                self.stats.consume(item);
156                if let Some(ids) = self.stats.ids.as_mut() {
157                    ids.push(id);
158                }
159            }
160            Ok(())
161        }
162
163        fn finalize(mut self) -> Result<Self::Output, Self::Error> {
164            self.stats.total_objects = self.stats.loose_objects + self.stats.packed_objects;
165            Ok(self.stats)
166        }
167    }
168
169    let cancelled = || anyhow::anyhow!("Cancelled by user");
170    let object_ids = repo.objects.iter()?.filter_map(Result::ok);
171    let chunk_size = 1_000;
172    let mut stats = if gix::parallel::num_threads(thread_limit) > 1 {
173        gix::parallel::in_parallel(
174            gix::interrupt::Iter::new(
175                gix::features::iter::Chunks {
176                    inner: object_ids,
177                    size: chunk_size,
178                },
179                cancelled,
180            ),
181            thread_limit,
182            {
183                let objects = repo.objects.clone();
184                move |_| (objects.clone().into_inner(), counter)
185            },
186            |ids, (handle, counter)| {
187                let ids = ids?;
188                counter.fetch_add(ids.len(), Ordering::Relaxed);
189                let out = ids
190                    .into_iter()
191                    .map(|id| handle.header(id).map(|hdr| (id, hdr)))
192                    .collect::<Result<Vec<_>, _>>()?;
193                Ok(out)
194            },
195            Reduce {
196                stats: Statistics {
197                    ids: extra_header_lookup.then(Vec::new),
198                    ..Default::default()
199                },
200            },
201        )?
202    } else {
203        if extra_header_lookup {
204            bail!("extra-header-lookup is only meaningful in threaded mode");
205        }
206        let mut stats = Statistics::default();
207
208        for (count, id) in object_ids.enumerate() {
209            if count % chunk_size == 0 && gix::interrupt::is_triggered() {
210                return Err(cancelled());
211            }
212            stats.consume(repo.objects.header(id)?);
213            progress.inc();
214        }
215        stats
216    };
217
218    progress.show_throughput(start);
219
220    if let Some(mut ids) = stats.ids.take() {
221        // Critical to re-open the repo to assure we don't have any ODB state and start fresh.
222        let start = std::time::Instant::now();
223        let repo = gix::open_opts(repo.git_dir(), repo.open_options().to_owned())?;
224        progress.set_name("re-counting".into());
225        progress.init(Some(ids.len()), gix::progress::count("objects"));
226        let counter = progress.counter();
227        counter.store(0, Ordering::Relaxed);
228        let errors = gix::parallel::in_parallel_with_slice(
229            &mut ids,
230            thread_limit,
231            {
232                let objects = repo.objects.clone();
233                move |_| (objects.clone().into_inner(), counter, false)
234            },
235            |id, (odb, counter, has_error), _threads_left, _stop_everything| -> anyhow::Result<()> {
236                counter.fetch_add(1, Ordering::Relaxed);
237                if let Err(_err) = odb.header(id) {
238                    *has_error = true;
239                    gix::trace::error!(err = ?_err, "Object that is known to be present wasn't found");
240                }
241                Ok(())
242            },
243            || Some(std::time::Duration::from_millis(100)),
244            |(_, _, has_error)| has_error,
245        )?;
246
247        progress.show_throughput(start);
248        if errors.contains(&true) {
249            bail!("At least one object couldn't be looked up even though it must exist");
250        }
251    }
252
253    #[cfg(feature = "serde")]
254    {
255        serde_json::to_writer_pretty(out, &stats)?;
256    }
257
258    Ok(())
259}
260
261pub fn entries(repo: gix::Repository, format: OutputFormat, mut out: impl io::Write) -> anyhow::Result<()> {
262    if format != OutputFormat::Human {
263        bail!("Only human output format is supported at the moment");
264    }
265
266    for object in repo.objects.iter()? {
267        let object = object?;
268        writeln!(out, "{object}")?;
269    }
270
271    Ok(())
272}