gitoxide_core/repository/
odb.rs

1use std::{io, sync::atomic::Ordering};
2
3use anyhow::bail;
4
5use crate::OutputFormat;
6
7#[cfg_attr(not(feature = "serde"), allow(unused_variables))]
8pub fn info(
9    repo: gix::Repository,
10    format: OutputFormat,
11    out: impl io::Write,
12    mut err: impl io::Write,
13) -> anyhow::Result<()> {
14    if format == OutputFormat::Human {
15        writeln!(err, "Only JSON is implemented - using that instead")?;
16    }
17
18    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
19    pub struct Statistics {
20        #[cfg_attr(not(feature = "serde"), allow(dead_code))]
21        pub path: std::path::PathBuf,
22        #[cfg_attr(not(feature = "serde"), allow(dead_code))]
23        pub object_hash: String,
24        #[cfg_attr(not(feature = "serde"), allow(dead_code))]
25        pub use_multi_pack_index: bool,
26        #[cfg_attr(not(feature = "serde"), allow(dead_code))]
27        pub structure: Vec<gix::odb::store::structure::Record>,
28        #[cfg_attr(not(feature = "serde"), allow(dead_code))]
29        pub metrics: gix::odb::store::Metrics,
30    }
31
32    let store = repo.objects.store_ref();
33    let stats = Statistics {
34        path: store.path().into(),
35        object_hash: store.object_hash().to_string(),
36        use_multi_pack_index: store.use_multi_pack_index(),
37        structure: store.structure()?,
38        metrics: store.metrics(),
39    };
40
41    #[cfg(feature = "serde")]
42    {
43        serde_json::to_writer_pretty(out, &stats)?;
44    }
45
46    Ok(())
47}
48
49pub mod statistics {
50    use crate::OutputFormat;
51
52    pub const PROGRESS_RANGE: std::ops::RangeInclusive<u8> = 0..=3;
53
54    #[derive(Debug, Copy, Clone)]
55    pub struct Options {
56        pub format: OutputFormat,
57        pub thread_limit: Option<usize>,
58        /// A debug-flag that triggers looking up the headers of all objects again, but without indices preloaded
59        pub extra_header_lookup: bool,
60    }
61}
62
63#[cfg_attr(not(feature = "serde"), allow(unused_variables))]
64pub fn statistics(
65    repo: gix::Repository,
66    mut progress: impl gix::Progress,
67    out: impl io::Write,
68    mut err: impl io::Write,
69    statistics::Options {
70        format,
71        thread_limit,
72        extra_header_lookup,
73    }: statistics::Options,
74) -> anyhow::Result<()> {
75    use bytesize::ByteSize;
76    use gix::odb::{find, HeaderExt};
77
78    if format == OutputFormat::Human {
79        writeln!(err, "Only JSON is implemented - using that instead")?;
80    }
81
82    progress.init(None, gix::progress::count("objects"));
83    progress.set_name("counting".into());
84    let counter = progress.counter();
85    let start = std::time::Instant::now();
86
87    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
88    #[derive(Default)]
89    struct Statistics {
90        /// All objects that were used to produce these statistics.
91        /// Only `Some` if we are doing an extra round of header queries on a repository without loaded indices.
92        #[cfg_attr(feature = "serde", serde(skip_serializing))]
93        ids: Option<Vec<gix::ObjectId>>,
94        total_objects: usize,
95        loose_objects: usize,
96        packed_objects: usize,
97        packed_delta_objects: usize,
98        total_delta_chain_length: u64,
99        trees: usize,
100        trees_size: ByteSize,
101        tags: usize,
102        tags_size: ByteSize,
103        commits: usize,
104        commits_size: ByteSize,
105        blobs: usize,
106        blobs_size: ByteSize,
107    }
108
109    impl Statistics {
110        fn count(&mut self, kind: gix::object::Kind, size: u64) {
111            use gix::object::Kind::*;
112            match kind {
113                Commit => {
114                    self.commits += 1;
115                    self.commits_size += size;
116                }
117                Tree => {
118                    self.trees += 1;
119                    self.trees_size += size;
120                }
121                Tag => {
122                    self.tags += 1;
123                    self.tags_size += size;
124                }
125                Blob => {
126                    self.blobs += 1;
127                    self.blobs_size += size;
128                }
129            }
130        }
131        fn consume(&mut self, item: gix::odb::find::Header) {
132            match item {
133                find::Header::Loose { size, kind } => {
134                    self.loose_objects += 1;
135                    self.count(kind, size);
136                }
137                find::Header::Packed(packed) => {
138                    self.packed_objects += 1;
139                    self.packed_delta_objects += usize::from(packed.num_deltas > 0);
140                    self.total_delta_chain_length += u64::from(packed.num_deltas);
141                    self.count(packed.kind, packed.object_size);
142                }
143            }
144        }
145    }
146
147    #[derive(Default)]
148    struct Reduce {
149        stats: Statistics,
150    }
151
152    impl gix::parallel::Reduce for Reduce {
153        type Input = Result<Vec<(gix::ObjectId, gix::odb::find::Header)>, anyhow::Error>;
154        type FeedProduce = ();
155        type Output = Statistics;
156        type Error = anyhow::Error;
157
158        fn feed(&mut self, items: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
159            for (id, item) in items? {
160                self.stats.consume(item);
161                if let Some(ids) = self.stats.ids.as_mut() {
162                    ids.push(id);
163                }
164            }
165            Ok(())
166        }
167
168        fn finalize(mut self) -> Result<Self::Output, Self::Error> {
169            self.stats.total_objects = self.stats.loose_objects + self.stats.packed_objects;
170            Ok(self.stats)
171        }
172    }
173
174    let cancelled = || anyhow::anyhow!("Cancelled by user");
175    let object_ids = repo.objects.iter()?.filter_map(Result::ok);
176    let chunk_size = 1_000;
177    let mut stats = if gix::parallel::num_threads(thread_limit) > 1 {
178        gix::parallel::in_parallel(
179            gix::interrupt::Iter::new(
180                gix::features::iter::Chunks {
181                    inner: object_ids,
182                    size: chunk_size,
183                },
184                cancelled,
185            ),
186            thread_limit,
187            {
188                let objects = repo.objects.clone();
189                move |_| (objects.clone().into_inner(), counter)
190            },
191            |ids, (handle, counter)| {
192                let ids = ids?;
193                counter.fetch_add(ids.len(), Ordering::Relaxed);
194                let out = ids
195                    .into_iter()
196                    .map(|id| handle.header(id).map(|hdr| (id, hdr)))
197                    .collect::<Result<Vec<_>, _>>()?;
198                Ok(out)
199            },
200            Reduce {
201                stats: Statistics {
202                    ids: extra_header_lookup.then(Vec::new),
203                    ..Default::default()
204                },
205            },
206        )?
207    } else {
208        if extra_header_lookup {
209            bail!("extra-header-lookup is only meaningful in threaded mode");
210        }
211        let mut stats = Statistics::default();
212
213        for (count, id) in object_ids.enumerate() {
214            if count % chunk_size == 0 && gix::interrupt::is_triggered() {
215                return Err(cancelled());
216            }
217            stats.consume(repo.objects.header(id)?);
218            progress.inc();
219        }
220        stats
221    };
222
223    progress.show_throughput(start);
224
225    if let Some(mut ids) = stats.ids.take() {
226        // Critical to re-open the repo to assure we don't have any ODB state and start fresh.
227        let start = std::time::Instant::now();
228        let repo = gix::open_opts(repo.git_dir(), repo.open_options().to_owned())?;
229        progress.set_name("re-counting".into());
230        progress.init(Some(ids.len()), gix::progress::count("objects"));
231        let counter = progress.counter();
232        counter.store(0, Ordering::Relaxed);
233        let errors = gix::parallel::in_parallel_with_slice(
234            &mut ids,
235            thread_limit,
236            {
237                let objects = repo.objects.clone();
238                move |_| (objects.clone().into_inner(), counter, false)
239            },
240            |id, (odb, counter, has_error), _threads_left, _stop_everything| -> anyhow::Result<()> {
241                counter.fetch_add(1, Ordering::Relaxed);
242                if let Err(_err) = odb.header(id) {
243                    *has_error = true;
244                    gix::trace::error!(err = ?_err, "Object that is known to be present wasn't found");
245                }
246                Ok(())
247            },
248            || Some(std::time::Duration::from_millis(100)),
249            |(_, _, has_error)| has_error,
250        )?;
251
252        progress.show_throughput(start);
253        if errors.contains(&true) {
254            bail!("At least one object couldn't be looked up even though it must exist");
255        }
256    }
257
258    #[cfg(feature = "serde")]
259    {
260        serde_json::to_writer_pretty(out, &stats)?;
261    }
262
263    Ok(())
264}
265
266pub fn entries(repo: gix::Repository, format: OutputFormat, mut out: impl io::Write) -> anyhow::Result<()> {
267    if format != OutputFormat::Human {
268        bail!("Only human output format is supported at the moment");
269    }
270
271    for object in repo.objects.iter()? {
272        let object = object?;
273        writeln!(out, "{object}")?;
274    }
275
276    Ok(())
277}