Skip to main content

gix_pack/multi_index/
write.rs

1use std::time::SystemTime;
2
3use crate::multi_index;
4
mod error {
    /// The error returned by [`crate::multi_index::write_from_index_paths()`].
    #[derive(Debug, thiserror::Error)]
    #[allow(missing_docs)]
    pub enum Error {
        /// An I/O or hashing failure while writing the multi-index; displayed as the wrapped error.
        #[error(transparent)]
        Io(#[from] gix_hash::io::Error),
        /// The operation was aborted because the interrupt flag was observed to be set.
        #[error("Interrupted")]
        Interrupted,
        /// One of the input pack index files could not be opened; displayed as the wrapped error.
        #[error(transparent)]
        OpenIndex(#[from] crate::index::init::Error),
    }
}
18pub use error::Error;
19
/// An entry suitable for sorting and writing
pub(crate) struct Entry {
    /// The id of the object this entry refers to.
    pub(crate) id: gix_hash::ObjectId,
    /// The position of the originating index within the sorted list of input index paths.
    pub(crate) pack_index: u32,
    /// The pack offset of the object as reported by the originating index.
    pub(crate) pack_offset: crate::data::Offset,
    /// Used for sorting in case of duplicates
    index_mtime: SystemTime,
}
28
/// Options for use in [`multi_index::write_from_index_paths()`].
pub struct Options {
    /// The kind of hash to use for objects and to expect in the input files.
    ///
    /// It is also the hash used to compute the trailing checksum of the written multi-index.
    pub object_hash: gix_hash::Kind,
}
34
/// The result of [`multi_index::write_from_index_paths()`].
pub struct Outcome {
    /// The checksum calculated over all multi-index bytes written to the output,
    /// which is also appended to the output as trailer.
    pub multi_index_checksum: gix_hash::ObjectId,
}
40
/// The progress ids used in [`crate::multi_index::write_from_index_paths()`].
///
/// Use this information to selectively extract the progress of interest in case the parent application has custom visualization.
#[derive(Debug, Copy, Clone)]
pub enum ProgressId {
    /// Counts each index path of the input set while its entries are collected for writing into the multi-index.
    FromPathsCollectingEntries,
    /// The amount of bytes written as part of the multi-index.
    BytesWritten,
}
51
52impl From<ProgressId> for gix_features::progress::Id {
53    fn from(v: ProgressId) -> Self {
54        match v {
55            ProgressId::FromPathsCollectingEntries => *b"MPCE",
56            ProgressId::BytesWritten => *b"MPBW",
57        }
58    }
59}
60
61impl<T> multi_index::File<T> {
62    pub(crate) const SIGNATURE: &'static [u8] = b"MIDX";
63    pub(crate) const HEADER_LEN: usize = 4 /*signature*/ +
64        1 /*version*/ +
65        1 /*object id version*/ +
66        1 /*num chunks */ +
67        1 /*num base files */ +
68        4 /*num pack files*/;
69}
70
71pub(super) mod function {
72    use std::{
73        path::PathBuf,
74        sync::atomic::{AtomicBool, Ordering},
75        time::{Instant, SystemTime},
76    };
77
78    use gix_features::progress::{Count, DynNestedProgress, Progress};
79
80    use crate::{MMap, multi_index};
81
82    use super::{Entry, Error, Options, Outcome, ProgressId};
83
84    /// Create a new multi-index file for writing to `out` from the pack index files at `index_paths`.
85    ///
86    /// Progress is sent to `progress` and interruptions checked via `should_interrupt`.
87    pub fn write_from_index_paths(
88        mut index_paths: Vec<PathBuf>,
89        out: &mut dyn std::io::Write,
90        progress: &mut dyn DynNestedProgress,
91        should_interrupt: &AtomicBool,
92        Options { object_hash }: Options,
93    ) -> Result<Outcome, Error> {
94        let out = gix_hash::io::Write::new(out, object_hash);
95        let (index_paths_sorted, index_filenames_sorted) = {
96            index_paths.sort();
97            let file_names = index_paths
98                .iter()
99                .map(|p| PathBuf::from(p.file_name().expect("file name present")))
100                .collect::<Vec<_>>();
101            (index_paths, file_names)
102        };
103
104        let entries = {
105            let mut entries = Vec::new();
106            let start = Instant::now();
107            let mut progress = progress.add_child_with_id(
108                "Collecting entries".into(),
109                ProgressId::FromPathsCollectingEntries.into(),
110            );
111            progress.init(Some(index_paths_sorted.len()), gix_features::progress::count("indices"));
112
113            // This could be parallelized… but it's probably not worth it unless you have 500mio objects.
114            for (index_id, index) in index_paths_sorted.iter().enumerate() {
115                let mtime = index
116                    .metadata()
117                    .and_then(|m| m.modified())
118                    .unwrap_or(SystemTime::UNIX_EPOCH);
119                let index = crate::index::File::at(index, object_hash)?;
120
121                entries.reserve(index.num_objects() as usize);
122                entries.extend(index.iter().map(|e| Entry {
123                    id: e.oid,
124                    pack_index: index_id as u32,
125                    pack_offset: e.pack_offset,
126                    index_mtime: mtime,
127                }));
128                progress.inc();
129                if should_interrupt.load(Ordering::Relaxed) {
130                    return Err(Error::Interrupted);
131                }
132            }
133            progress.show_throughput(start);
134
135            let start = Instant::now();
136            progress.set_name("Deduplicate".into());
137            progress.init(Some(entries.len()), gix_features::progress::count("entries"));
138            entries.sort_by(|l, r| {
139                l.id.cmp(&r.id)
140                    .then_with(|| l.index_mtime.cmp(&r.index_mtime).reverse())
141                    .then_with(|| l.pack_index.cmp(&r.pack_index))
142            });
143            entries.dedup_by_key(|e| e.id);
144            progress.inc_by(entries.len());
145            progress.show_throughput(start);
146            if should_interrupt.load(Ordering::Relaxed) {
147                return Err(Error::Interrupted);
148            }
149            entries
150        };
151
152        let mut cf = gix_chunk::file::Index::for_writing();
153        cf.plan_chunk(
154            multi_index::chunk::index_names::ID,
155            multi_index::chunk::index_names::storage_size(&index_filenames_sorted),
156        );
157        cf.plan_chunk(multi_index::chunk::fanout::ID, multi_index::chunk::fanout::SIZE as u64);
158        cf.plan_chunk(
159            multi_index::chunk::lookup::ID,
160            multi_index::chunk::lookup::storage_size(entries.len(), object_hash),
161        );
162        cf.plan_chunk(
163            multi_index::chunk::offsets::ID,
164            multi_index::chunk::offsets::storage_size(entries.len()),
165        );
166
167        let num_large_offsets = multi_index::chunk::large_offsets::num_large_offsets(&entries);
168        if let Some(num_large_offsets) = num_large_offsets {
169            cf.plan_chunk(
170                multi_index::chunk::large_offsets::ID,
171                multi_index::chunk::large_offsets::storage_size(num_large_offsets),
172            );
173        }
174
175        let mut write_progress =
176            progress.add_child_with_id("Writing multi-index".into(), ProgressId::BytesWritten.into());
177        let write_start = Instant::now();
178        write_progress.init(
179            Some(cf.planned_storage_size() as usize + multi_index::File::<MMap>::HEADER_LEN),
180            gix_features::progress::bytes(),
181        );
182        let mut out = gix_features::progress::Write {
183            inner: out,
184            progress: write_progress,
185        };
186
187        let bytes_written = multi_index::File::<MMap>::write_header(
188            &mut out,
189            cf.num_chunks().try_into().expect("BUG: wrote more than 256 chunks"),
190            index_paths_sorted.len() as u32,
191            object_hash,
192        )
193        .map_err(gix_hash::io::Error::from)?;
194
195        {
196            progress.set_name("Writing chunks".into());
197            progress.init(Some(cf.num_chunks()), gix_features::progress::count("chunks"));
198
199            let mut chunk_write = cf
200                .into_write(&mut out, bytes_written)
201                .map_err(gix_hash::io::Error::from)?;
202            while let Some(chunk_to_write) = chunk_write.next_chunk() {
203                match chunk_to_write {
204                    multi_index::chunk::index_names::ID => {
205                        multi_index::chunk::index_names::write(&index_filenames_sorted, &mut chunk_write)
206                    }
207                    multi_index::chunk::fanout::ID => multi_index::chunk::fanout::write(&entries, &mut chunk_write),
208                    multi_index::chunk::lookup::ID => multi_index::chunk::lookup::write(&entries, &mut chunk_write),
209                    multi_index::chunk::offsets::ID => {
210                        multi_index::chunk::offsets::write(&entries, num_large_offsets.is_some(), &mut chunk_write)
211                    }
212                    multi_index::chunk::large_offsets::ID => multi_index::chunk::large_offsets::write(
213                        &entries,
214                        num_large_offsets.expect("available if planned"),
215                        &mut chunk_write,
216                    ),
217                    unknown => unreachable!("BUG: forgot to implement chunk {:?}", std::str::from_utf8(&unknown)),
218                }
219                .map_err(gix_hash::io::Error::from)?;
220                progress.inc();
221                if should_interrupt.load(Ordering::Relaxed) {
222                    return Err(Error::Interrupted);
223                }
224            }
225        }
226
227        // write trailing checksum
228        let multi_index_checksum = out.inner.hash.try_finalize().map_err(gix_hash::io::Error::from)?;
229        out.inner
230            .inner
231            .write_all(multi_index_checksum.as_slice())
232            .map_err(gix_hash::io::Error::from)?;
233        out.progress.show_throughput(write_start);
234
235        Ok(Outcome { multi_index_checksum })
236    }
237}
238
239impl multi_index::File<crate::MMap> {
240    fn write_header(
241        out: &mut dyn std::io::Write,
242        num_chunks: u8,
243        num_indices: u32,
244        object_hash: gix_hash::Kind,
245    ) -> std::io::Result<usize> {
246        out.write_all(Self::SIGNATURE)?;
247        out.write_all(&[crate::multi_index::Version::V1 as u8])?;
248        out.write_all(&[object_hash as u8])?;
249        out.write_all(&[num_chunks])?;
250        out.write_all(&[0])?; /* unused number of base files */
251        out.write_all(&num_indices.to_be_bytes())?;
252
253        Ok(Self::HEADER_LEN)
254    }
255}