git_pack/multi_index/write.rs

1use std::{
2    convert::TryInto,
3    path::PathBuf,
4    sync::atomic::{AtomicBool, Ordering},
5    time::{Instant, SystemTime},
6};
7
8use git_features::progress::Progress;
9
10use crate::multi_index;
11
mod error {
    /// The error returned by [multi_index::File::write_from_index_paths()][super::multi_index::File::write_from_index_paths()].
    #[derive(Debug, thiserror::Error)]
    #[allow(missing_docs)]
    pub enum Error {
        /// An IO error occurred while reading pack indices or writing the multi-index.
        #[error(transparent)]
        Io(#[from] std::io::Error),
        /// The operation was aborted because the `should_interrupt` flag was set.
        #[error("Interrupted")]
        Interrupted,
        /// One of the input pack index files could not be opened.
        #[error(transparent)]
        OpenIndex(#[from] crate::index::init::Error),
    }
}
pub use error::Error;
26
/// An entry suitable for sorting and writing
pub(crate) struct Entry {
    /// The id of the object this entry refers to.
    pub(crate) id: git_hash::ObjectId,
    /// The zero-based position of the containing pack within the sorted set of input indices.
    pub(crate) pack_index: u32,
    /// The offset at which the object can be found within its pack.
    pub(crate) pack_offset: crate::data::Offset,
    /// Used for sorting in case of duplicates
    // Modification time of the index file this entry came from; newer wins on dedup.
    index_mtime: SystemTime,
}
35
/// Options for use in [`multi_index::File::write_from_index_paths()`].
pub struct Options {
    /// The kind of hash to use for objects and to expect in the input files.
    ///
    /// It also determines the hash used for the trailing multi-index checksum.
    pub object_hash: git_hash::Kind,
}
41
/// The result of [`multi_index::File::write_from_index_paths()`].
pub struct Outcome<P> {
    /// The calculated multi-index checksum of the file at `multi_index_path`.
    pub multi_index_checksum: git_hash::ObjectId,
    /// The input progress instance, returned to the caller for reuse.
    pub progress: P,
}
49
/// The progress ids used in [`write_from_index_paths()`][multi_index::File::write_from_index_paths()].
///
/// Use this information to selectively extract the progress of interest in case the parent application has custom visualization.
#[derive(Debug, Copy, Clone)]
pub enum ProgressId {
    /// Counts each path in the input set whose entries we enumerate and write into the multi-index
    FromPathsCollectingEntries,
    /// The amount of bytes written as part of the multi-index.
    BytesWritten,
}
60
61impl From<ProgressId> for git_features::progress::Id {
62    fn from(v: ProgressId) -> Self {
63        match v {
64            ProgressId::FromPathsCollectingEntries => *b"MPCE",
65            ProgressId::BytesWritten => *b"MPBW",
66        }
67    }
68}
69
impl multi_index::File {
    // Magic bytes identifying a multi-pack-index file.
    pub(crate) const SIGNATURE: &'static [u8] = b"MIDX";
    pub(crate) const HEADER_LEN: usize = 4 /*signature*/ +
        1 /*version*/ +
        1 /*object id version*/ +
        1 /*num chunks */ +
        1 /*num base files */ +
        4 /*num pack files*/;

    /// Create a new multi-index file for writing to `out` from the pack index files at `index_paths`.
    ///
    /// Progress is sent to `progress` and interruptions checked via `should_interrupt`.
    ///
    /// Returns the trailing checksum of the written multi-index along with the input
    /// `progress` instance, or an [`Error`] on IO failure, interruption, or when an
    /// input index cannot be opened.
    pub fn write_from_index_paths<P>(
        mut index_paths: Vec<PathBuf>,
        out: impl std::io::Write,
        mut progress: P,
        should_interrupt: &AtomicBool,
        Options { object_hash }: Options,
    ) -> Result<Outcome<P>, Error>
    where
        P: Progress,
    {
        // Every byte written from here on is also fed into a hasher, so the trailing
        // checksum can be produced at the end without a second pass over the output.
        let out = git_features::hash::Write::new(out, object_hash);
        // Sort the input paths so the pack order (and thus each entry's `pack_index`)
        // is deterministic; keep just the file names for the index-names chunk.
        let (index_paths_sorted, index_filenames_sorted) = {
            index_paths.sort();
            let file_names = index_paths
                .iter()
                .map(|p| PathBuf::from(p.file_name().expect("file name present")))
                .collect::<Vec<_>>();
            (index_paths, file_names)
        };

        let entries = {
            let mut entries = Vec::new();
            let start = Instant::now();
            let mut progress =
                progress.add_child_with_id("Collecting entries", ProgressId::FromPathsCollectingEntries.into());
            progress.init(Some(index_paths_sorted.len()), git_features::progress::count("indices"));

            // This could be parallelized… but it's probably not worth it unless you have 500mio objects.
            for (index_id, index) in index_paths_sorted.iter().enumerate() {
                // The modification time resolves duplicate objects below; a missing or
                // unreadable mtime degrades gracefully to the epoch instead of failing.
                let mtime = index
                    .metadata()
                    .and_then(|m| m.modified())
                    .unwrap_or(SystemTime::UNIX_EPOCH);
                let index = crate::index::File::at(index, object_hash)?;

                entries.reserve(index.num_objects() as usize);
                entries.extend(index.iter().map(|e| Entry {
                    id: e.oid,
                    pack_index: index_id as u32,
                    pack_offset: e.pack_offset,
                    index_mtime: mtime,
                }));
                progress.inc();
                if should_interrupt.load(Ordering::Relaxed) {
                    return Err(Error::Interrupted);
                }
            }
            progress.show_throughput(start);

            let start = Instant::now();
            progress.set_name("Deduplicate");
            progress.init(Some(entries.len()), git_features::progress::count("entries"));
            // Sort by object id; among duplicates prefer the entry from the most recently
            // modified index (note the reversed mtime ordering), with the pack index as
            // the final tie-breaker for fully deterministic output.
            entries.sort_by(|l, r| {
                l.id.cmp(&r.id)
                    .then_with(|| l.index_mtime.cmp(&r.index_mtime).reverse())
                    .then_with(|| l.pack_index.cmp(&r.pack_index))
            });
            // `dedup_by_key` keeps the first of each run of equal ids, i.e. the entry
            // preferred by the ordering established above.
            entries.dedup_by_key(|e| e.id);
            progress.inc_by(entries.len());
            progress.show_throughput(start);
            if should_interrupt.load(Ordering::Relaxed) {
                return Err(Error::Interrupted);
            }
            entries
        };

        // Plan all chunks up-front — the chunk file format needs the table of contents
        // (ids and sizes) before any chunk data is written.
        let mut cf = git_chunk::file::Index::for_writing();
        cf.plan_chunk(
            multi_index::chunk::index_names::ID,
            multi_index::chunk::index_names::storage_size(&index_filenames_sorted),
        );
        cf.plan_chunk(multi_index::chunk::fanout::ID, multi_index::chunk::fanout::SIZE as u64);
        cf.plan_chunk(
            multi_index::chunk::lookup::ID,
            multi_index::chunk::lookup::storage_size(entries.len(), object_hash),
        );
        cf.plan_chunk(
            multi_index::chunk::offsets::ID,
            multi_index::chunk::offsets::storage_size(entries.len()),
        );

        // The large-offsets chunk is optional and only planned when any entry needs it.
        let num_large_offsets = multi_index::chunk::large_offsets::num_large_offsets(&entries);
        if let Some(num_large_offsets) = num_large_offsets {
            cf.plan_chunk(
                multi_index::chunk::large_offsets::ID,
                multi_index::chunk::large_offsets::storage_size(num_large_offsets),
            );
        }

        let mut write_progress = progress.add_child_with_id("Writing multi-index", ProgressId::BytesWritten.into());
        let write_start = Instant::now();
        write_progress.init(
            Some(cf.planned_storage_size() as usize + Self::HEADER_LEN),
            git_features::progress::bytes(),
        );
        // Wrap the hashing writer once more to report the amount of bytes written.
        let mut out = git_features::progress::Write {
            inner: out,
            progress: write_progress,
        };

        let bytes_written = Self::write_header(
            &mut out,
            cf.num_chunks().try_into().expect("BUG: wrote more than 256 chunks"),
            index_paths_sorted.len() as u32,
            object_hash,
        )?;

        {
            progress.set_name("Writing chunks");
            progress.init(Some(cf.num_chunks()), git_features::progress::count("chunks"));

            // `next_chunk()` yields the chunk ids in the order they were planned above;
            // each arm must write exactly the size that was planned for its chunk.
            let mut chunk_write = cf.into_write(&mut out, bytes_written)?;
            while let Some(chunk_to_write) = chunk_write.next_chunk() {
                match chunk_to_write {
                    multi_index::chunk::index_names::ID => {
                        multi_index::chunk::index_names::write(&index_filenames_sorted, &mut chunk_write)?
                    }
                    multi_index::chunk::fanout::ID => multi_index::chunk::fanout::write(&entries, &mut chunk_write)?,
                    multi_index::chunk::lookup::ID => multi_index::chunk::lookup::write(&entries, &mut chunk_write)?,
                    multi_index::chunk::offsets::ID => {
                        multi_index::chunk::offsets::write(&entries, num_large_offsets.is_some(), &mut chunk_write)?
                    }
                    multi_index::chunk::large_offsets::ID => multi_index::chunk::large_offsets::write(
                        &entries,
                        num_large_offsets.expect("available if planned"),
                        &mut chunk_write,
                    )?,
                    unknown => unreachable!("BUG: forgot to implement chunk {:?}", std::str::from_utf8(&unknown)),
                }
                progress.inc();
                if should_interrupt.load(Ordering::Relaxed) {
                    return Err(Error::Interrupted);
                }
            }
        }

        // write trailing checksum — the digest of everything written so far, appended
        // to the raw inner writer so the checksum doesn't hash (or count) itself.
        let multi_index_checksum: git_hash::ObjectId = out.inner.hash.digest().into();
        out.inner.inner.write_all(multi_index_checksum.as_slice())?;
        out.progress.show_throughput(write_start);

        Ok(Outcome {
            multi_index_checksum,
            progress,
        })
    }

    /// Write the fixed-size multi-index header to `out` and return the amount of bytes written,
    /// which is always [`Self::HEADER_LEN`].
    fn write_header(
        mut out: impl std::io::Write,
        num_chunks: u8,
        num_indices: u32,
        object_hash: git_hash::Kind,
    ) -> std::io::Result<usize> {
        out.write_all(Self::SIGNATURE)?;
        out.write_all(&[crate::multi_index::Version::V1 as u8])?;
        out.write_all(&[object_hash as u8])?;
        out.write_all(&[num_chunks])?;
        out.write_all(&[0])?; /* unused number of base files */
        out.write_all(&num_indices.to_be_bytes())?;

        Ok(Self::HEADER_LEN)
    }
}