gix_pack/data/output/
bytes.rs

1use std::io::Write;
2
3use crate::{data::output, exact_vec};
4
5/// The error returned by `next()` in the [`FromEntriesIter`] iterator.
6#[allow(missing_docs)]
7#[derive(Debug, thiserror::Error)]
8pub enum Error<E>
9where
10    E: std::error::Error + 'static,
11{
12    #[error(transparent)]
13    Io(#[from] gix_hash::io::Error),
14    #[error(transparent)]
15    Input(E),
16}
17
18/// An implementation of [`Iterator`] to write [encoded entries][output::Entry] to an inner implementation each time
19/// `next()` is called.
20pub struct FromEntriesIter<I, W> {
21    /// An iterator for input [`output::Entry`] instances
22    pub input: I,
23    /// A way of writing encoded bytes.
24    output: gix_hash::io::Write<W>,
25    /// Our trailing hash when done writing all input entries
26    trailer: Option<gix_hash::ObjectId>,
27    /// The amount of objects in the iteration and the version of the packfile to be written.
28    /// Will be `None` to signal the header was written already.
29    header_info: Option<(crate::data::Version, u32)>,
30    /// The pack data version with which pack entries should be written.
31    entry_version: crate::data::Version,
32    /// The amount of written bytes thus far
33    written: u64,
34    /// Required to quickly find offsets by object IDs, as future objects may refer to those in the past to become a delta offset base.
35    /// It stores the pack offsets at which objects begin.
36    /// Additionally we store if an object was invalid, and if so we will not write it nor will we allow delta objects to it.
37    pack_offsets_and_validity: Vec<(u64, bool)>,
38    /// If we are done, no additional writes will occur
39    is_done: bool,
40}
41
42impl<I, W, E> FromEntriesIter<I, W>
43where
44    I: Iterator<Item = Result<Vec<output::Entry>, E>>,
45    W: std::io::Write,
46    E: std::error::Error + 'static,
47{
48    /// Create a new instance reading [entries][output::Entry] from an `input` iterator and write pack data bytes to
49    /// `output` writer, resembling a pack of `version` with exactly `num_entries` amount of objects contained in it.
50    /// `object_hash` is the kind of hash to use for the pack checksum and maybe other places, depending on the version.
51    ///
52    /// The input chunks are expected to be sorted already. You can use the [`InOrderIter`][gix_features::parallel::InOrderIter] to assure
53    /// this happens on the fly holding entire chunks in memory as long as needed for them to be dispensed in order.
54    ///
55    /// # Panics
56    ///
57    /// Not all combinations of `object_hash` and `version` are supported currently triggering assertion errors.
58    pub fn new(
59        input: I,
60        output: W,
61        num_entries: u32,
62        version: crate::data::Version,
63        object_hash: gix_hash::Kind,
64    ) -> Self {
65        assert!(
66            matches!(version, crate::data::Version::V2),
67            "currently only pack version 2 can be written",
68        );
69        FromEntriesIter {
70            input,
71            output: gix_hash::io::Write::new(output, object_hash),
72            trailer: None,
73            entry_version: version,
74            pack_offsets_and_validity: exact_vec(num_entries as usize),
75            written: 0,
76            header_info: Some((version, num_entries)),
77            is_done: false,
78        }
79    }
80
81    /// Consume this instance and return the `output` implementation.
82    ///
83    /// _Note_ that the `input` iterator can be moved out of this instance beforehand.
84    pub fn into_write(self) -> W {
85        self.output.inner
86    }
87
88    /// Returns the trailing hash over all written entries once done.
89    /// It's `None` if we are not yet done writing.
90    pub fn digest(&self) -> Option<gix_hash::ObjectId> {
91        self.trailer
92    }
93
94    fn next_inner(&mut self) -> Result<u64, Error<E>> {
95        let previous_written = self.written;
96        if let Some((version, num_entries)) = self.header_info.take() {
97            let header_bytes = crate::data::header::encode(version, num_entries);
98            self.output
99                .write_all(&header_bytes[..])
100                .map_err(gix_hash::io::Error::from)?;
101            self.written += header_bytes.len() as u64;
102        }
103        match self.input.next() {
104            Some(entries) => {
105                for entry in entries.map_err(Error::Input)? {
106                    if entry.is_invalid() {
107                        self.pack_offsets_and_validity.push((0, false));
108                        continue;
109                    }
110                    self.pack_offsets_and_validity.push((self.written, true));
111                    let header = entry.to_entry_header(self.entry_version, |index| {
112                        let (base_offset, is_valid_object) = self.pack_offsets_and_validity[index];
113                        if !is_valid_object {
114                            unreachable!("if you see this the object database is correct as a delta refers to a non-existing object")
115                        }
116                        self.written - base_offset
117                    });
118                    self.written += header
119                        .write_to(entry.decompressed_size as u64, &mut self.output)
120                        .map_err(gix_hash::io::Error::from)? as u64;
121                    self.written += std::io::copy(&mut &*entry.compressed_data, &mut self.output)
122                        .map_err(gix_hash::io::Error::from)?;
123                }
124            }
125            None => {
126                let digest = self
127                    .output
128                    .hash
129                    .clone()
130                    .try_finalize()
131                    .map_err(gix_hash::io::Error::from)?;
132                self.output
133                    .inner
134                    .write_all(digest.as_slice())
135                    .map_err(gix_hash::io::Error::from)?;
136                self.written += digest.as_slice().len() as u64;
137                self.output.inner.flush().map_err(gix_hash::io::Error::from)?;
138                self.is_done = true;
139                self.trailer = Some(digest);
140            }
141        }
142        Ok(self.written - previous_written)
143    }
144}
145
146impl<I, W, E> Iterator for FromEntriesIter<I, W>
147where
148    I: Iterator<Item = Result<Vec<output::Entry>, E>>,
149    W: std::io::Write,
150    E: std::error::Error + 'static,
151{
152    /// The amount of bytes written to `out` if `Ok` or the error `E` received from the input.
153    type Item = Result<u64, Error<E>>;
154
155    fn next(&mut self) -> Option<Self::Item> {
156        if self.is_done {
157            return None;
158        }
159        Some(match self.next_inner() {
160            Err(err) => {
161                self.is_done = true;
162                Err(err)
163            }
164            Ok(written) => Ok(written),
165        })
166    }
167}