git_pack/data/output/
bytes.rs

1use std::io::Write;
2
3use git_features::hash;
4
5use crate::data::output;
6
7/// The error returned by `next()` in the [`FromEntriesIter`] iterator.
8#[allow(missing_docs)]
9#[derive(Debug, thiserror::Error)]
10pub enum Error<E>
11where
12    E: std::error::Error + 'static,
13{
14    #[error(transparent)]
15    Io(#[from] std::io::Error),
16    #[error(transparent)]
17    Input(E),
18}
19
20/// An implementation of [`Iterator`] to write [encoded entries][output::Entry] to an inner implementation each time
21/// `next()` is called.
22pub struct FromEntriesIter<I, W> {
23    /// An iterator for input [`output::Entry`] instances
24    pub input: I,
25    /// A way of writing encoded bytes.
26    output: hash::Write<W>,
27    /// Our trailing hash when done writing all input entries
28    trailer: Option<git_hash::ObjectId>,
29    /// The amount of objects in the iteration and the version of the packfile to be written.
30    /// Will be `None` to signal the header was written already.
31    header_info: Option<(crate::data::Version, u32)>,
32    /// The pack data version with which pack entries should be written.
33    entry_version: crate::data::Version,
34    /// The amount of written bytes thus far
35    written: u64,
36    /// Required to quickly find offsets by object IDs, as future objects may refer to those in the past to become a delta offset base.
37    /// It stores the pack offsets at which objects begin.
38    /// Additionally we store if an object was invalid, and if so we will not write it nor will we allow delta objects to it.
39    pack_offsets_and_validity: Vec<(u64, bool)>,
40    /// If we are done, no additional writes will occur
41    is_done: bool,
42}
43
44impl<I, W, E> FromEntriesIter<I, W>
45where
46    I: Iterator<Item = Result<Vec<output::Entry>, E>>,
47    W: std::io::Write,
48    E: std::error::Error + 'static,
49{
50    /// Create a new instance reading [entries][output::Entry] from an `input` iterator and write pack data bytes to
51    /// `output` writer, resembling a pack of `version` with exactly `num_entries` amount of objects contained in it.
52    /// `object_hash` is the kind of hash to use for the pack checksum and maybe other places, depending on the version.
53    ///
54    /// The input chunks are expected to be sorted already. You can use the [InOrderIter][git_features::parallel::InOrderIter] to assure
55    /// this happens on the fly holding entire chunks in memory as long as needed for them to be dispensed in order.
56    ///
57    /// # Panics
58    ///
59    /// Not all combinations of `object_hash` and `version` are supported currently triggering assertion errors.
60    pub fn new(
61        input: I,
62        output: W,
63        num_entries: u32,
64        version: crate::data::Version,
65        object_hash: git_hash::Kind,
66    ) -> Self {
67        assert!(
68            matches!(version, crate::data::Version::V2),
69            "currently only pack version 2 can be written",
70        );
71        FromEntriesIter {
72            input,
73            output: hash::Write::new(output, object_hash),
74            trailer: None,
75            entry_version: version,
76            pack_offsets_and_validity: Vec::with_capacity(num_entries as usize),
77            written: 0,
78            header_info: Some((version, num_entries)),
79            is_done: false,
80        }
81    }
82
83    /// Consume this instance and return the `output` implementation.
84    ///
85    /// _Note_ that the `input` iterator can be moved out of this instance beforehand.
86    pub fn into_write(self) -> W {
87        self.output.inner
88    }
89
90    /// Returns the trailing hash over all written entries once done.
91    /// It's `None` if we are not yet done writing.
92    pub fn digest(&self) -> Option<git_hash::ObjectId> {
93        self.trailer
94    }
95
96    fn next_inner(&mut self) -> Result<u64, Error<E>> {
97        let previous_written = self.written;
98        if let Some((version, num_entries)) = self.header_info.take() {
99            let header_bytes = crate::data::header::encode(version, num_entries);
100            self.output.write_all(&header_bytes[..])?;
101            self.written += header_bytes.len() as u64;
102        }
103        match self.input.next() {
104            Some(entries) => {
105                for entry in entries.map_err(Error::Input)? {
106                    if entry.is_invalid() {
107                        self.pack_offsets_and_validity.push((0, false));
108                        continue;
109                    };
110                    self.pack_offsets_and_validity.push((self.written, true));
111                    let header = entry.to_entry_header(self.entry_version, |index| {
112                        let (base_offset, is_valid_object) = self.pack_offsets_and_validity[index];
113                        if !is_valid_object {
114                            unreachable!("if you see this the object database is correct as a delta refers to a non-existing object")
115                        }
116                        self.written - base_offset
117                    });
118                    self.written += header.write_to(entry.decompressed_size as u64, &mut self.output)? as u64;
119                    self.written += std::io::copy(&mut &*entry.compressed_data, &mut self.output)?;
120                }
121            }
122            None => {
123                let digest = self.output.hash.clone().digest();
124                self.output.inner.write_all(&digest[..])?;
125                self.written += digest.len() as u64;
126                self.output.inner.flush()?;
127                self.is_done = true;
128                self.trailer = Some(git_hash::ObjectId::from(digest));
129            }
130        };
131        Ok(self.written - previous_written)
132    }
133}
134
135impl<I, W, E> Iterator for FromEntriesIter<I, W>
136where
137    I: Iterator<Item = Result<Vec<output::Entry>, E>>,
138    W: std::io::Write,
139    E: std::error::Error + 'static,
140{
141    /// The amount of bytes written to `out` if `Ok` or the error `E` received from the input.
142    type Item = Result<u64, Error<E>>;
143
144    fn next(&mut self) -> Option<Self::Item> {
145        if self.is_done {
146            return None;
147        }
148        Some(match self.next_inner() {
149            Err(err) => {
150                self.is_done = true;
151                Err(err)
152            }
153            Ok(written) => Ok(written),
154        })
155    }
156}