// git_pack/data/input/bytes_to_entries.rs

1use std::{fs, io};
2
3use git_features::{
4    hash,
5    hash::Sha1,
6    zlib::{stream::inflate::ReadBoxed, Decompress},
7};
8use git_hash::ObjectId;
9
10use crate::data::input;
11
/// An iterator over [`Entries`][input::Entry] in a byte stream.
///
/// The iterator used as part of [Bundle::write_to_directory(…)][crate::Bundle::write_to_directory()].
pub struct BytesToEntriesIter<BR> {
    /// The underlying stream positioned within pack data, past whatever was consumed so far.
    read: BR,
    /// A zlib decompressor kept between entries so its allocation is reused.
    decompressor: Option<Box<Decompress>>,
    /// The byte offset into the pack at which the next entry starts; begins at 12, right past the header.
    offset: u64,
    /// Set once `next_inner()` returned an error; ends iteration.
    had_error: bool,
    /// The pack version decoded from the header.
    version: crate::data::Version,
    /// The amount of entries that still need to be produced.
    objects_left: u32,
    /// A running hash over all consumed pack bytes; `None` in `Mode::AsIs`.
    hash: Option<Sha1>,
    /// Controls how the pack trailer is verified or restored.
    mode: input::Mode,
    /// Controls whether compressed bytes are kept and/or CRC32-checksummed.
    compressed: input::EntryDataMode,
    /// A scratch buffer for compressed bytes, retained for reuse when entries don't keep their data.
    compressed_buf: Option<Vec<u8>>,
    /// The length in bytes of a digest of `object_hash`.
    hash_len: usize,
    /// The kind of hash used for object ids and the pack trailer.
    object_hash: git_hash::Kind,
}
29
30/// Access
31impl<BR> BytesToEntriesIter<BR> {
32    /// The pack version currently being iterated
33    pub fn version(&self) -> crate::data::Version {
34        self.version
35    }
36
37    /// The kind of iteration
38    pub fn mode(&self) -> input::Mode {
39        self.mode
40    }
41}
42
/// Initialization
impl<BR> BytesToEntriesIter<BR>
where
    BR: io::BufRead,
{
    /// Obtain an iterator from a `read` stream to a pack data file and configure it using `mode` and `compressed`.
    /// `object_hash` specifies which hash is used for objects in ref-delta entries.
    ///
    /// Note that `read` is expected at the beginning of a valid pack data file with a header, entries and a trailer.
    pub fn new_from_header(
        mut read: BR,
        mode: input::Mode,
        compressed: input::EntryDataMode,
        object_hash: git_hash::Kind,
    ) -> Result<BytesToEntriesIter<BR>, input::Error> {
        // A pack header is always 12 bytes, which `decode()` below parses into version and object count.
        let mut header_data = [0u8; 12];
        read.read_exact(&mut header_data)?;

        let (version, num_objects) = crate::data::header::decode(&header_data)?;
        assert_eq!(
            version,
            crate::data::Version::V2,
            "let's stop here if we see undocumented pack formats"
        );
        Ok(BytesToEntriesIter {
            read,
            decompressor: None,
            compressed,
            // The 12 header bytes were already consumed above.
            offset: 12,
            had_error: false,
            version,
            objects_left: num_objects,
            // In `AsIs` mode no trailer verification or restoration happens, so no hash is kept.
            // Otherwise the hash covers every pack byte, starting with the header just read.
            hash: (mode != input::Mode::AsIs).then(|| {
                let mut hash = git_features::hash::hasher(object_hash);
                hash.update(&header_data);
                hash
            }),
            mode,
            compressed_buf: None,
            hash_len: object_hash.len_in_bytes(),
            object_hash,
        })
    }

    /// Read one entry from the stream, updating `offset`, the running hash and `objects_left`.
    fn next_inner(&mut self) -> Result<input::Entry, input::Error> {
        self.objects_left -= 1; // even an error counts as objects

        // Read header
        let entry = match self.hash.take() {
            Some(hash) => {
                // Tee the header bytes into the hasher so the eventual trailer
                // can be verified (or restored) over the full pack contents.
                let mut read = read_and_pass_to(
                    &mut self.read,
                    hash::Write {
                        inner: io::sink(),
                        hash,
                    },
                );
                let res = crate::data::Entry::from_read(&mut read, self.offset, self.hash_len);
                self.hash = Some(read.write.hash);
                res
            }
            None => crate::data::Entry::from_read(&mut self.read, self.offset, self.hash_len),
        }
        .map_err(input::Error::from)?;

        // Decompress object to learn its compressed bytes
        let mut decompressor = self
            .decompressor
            .take()
            .unwrap_or_else(|| Box::new(Decompress::new(true)));
        let compressed_buf = self.compressed_buf.take().unwrap_or_else(|| Vec::with_capacity(4096));
        decompressor.reset(true);
        let mut decompressed_reader = ReadBoxed {
            // Tee the compressed bytes either into a fresh buffer handed to the caller,
            // or into the reusable scratch buffer when they are not kept.
            inner: read_and_pass_to(
                &mut self.read,
                if self.compressed.keep() {
                    Vec::with_capacity(entry.decompressed_size as usize)
                } else {
                    compressed_buf
                },
            ),
            decompressor,
        };

        // Drain the entry into a sink — we only need side effects: bytes consumed,
        // bytes captured by the tee, and the decompressor's input count.
        let bytes_copied = io::copy(&mut decompressed_reader, &mut io::sink())?;
        if bytes_copied != entry.decompressed_size {
            return Err(input::Error::IncompletePack {
                actual: bytes_copied,
                expected: entry.decompressed_size,
            });
        }

        let pack_offset = self.offset;
        // `total_in()` is the amount of compressed bytes the decompressor consumed for this entry.
        let compressed_size = decompressed_reader.decompressor.total_in();
        self.offset += entry.header_size() as u64 + compressed_size;
        self.decompressor = Some(decompressed_reader.decompressor);

        let mut compressed = decompressed_reader.inner.write;
        debug_assert_eq!(
            compressed_size,
            compressed.len() as u64,
            "we must track exactly the same amount of bytes as read by the decompressor"
        );
        if let Some(hash) = self.hash.as_mut() {
            hash.update(&compressed);
        }

        let crc32 = if self.compressed.crc32() {
            // The CRC32 covers the re-encoded entry header followed by the compressed data.
            // The buffer is sized for the largest possible header: 12 bytes plus one hash digest.
            let mut header_buf = [0u8; 12 + git_hash::Kind::longest().len_in_bytes()];
            let header_len = entry.header.write_to(bytes_copied, header_buf.as_mut())?;
            let state = git_features::hash::crc32_update(0, &header_buf[..header_len]);
            Some(git_features::hash::crc32_update(state, &compressed))
        } else {
            None
        };

        let compressed = if self.compressed.keep() {
            Some(compressed)
        } else {
            // Return the scratch buffer for reuse by the next entry.
            compressed.clear();
            self.compressed_buf = Some(compressed);
            None
        };

        // Last objects gets trailer (which is potentially verified)
        let trailer = self.try_read_trailer()?;
        Ok(input::Entry {
            header: entry.header,
            header_size: entry.header_size() as u16,
            compressed,
            compressed_size,
            crc32,
            pack_offset,
            decompressed_size: bytes_copied,
            trailer,
        })
    }

    /// Read the pack trailer hash once the last object was consumed, and compare it to
    /// the hash computed over the consumed bytes. In `Restore` mode a missing or damaged
    /// trailer is replaced by the computed hash instead of causing an error.
    fn try_read_trailer(&mut self) -> Result<Option<ObjectId>, input::Error> {
        Ok(if self.objects_left == 0 {
            let mut id = git_hash::ObjectId::null(self.object_hash);
            if let Err(err) = self.read.read_exact(id.as_mut_slice()) {
                // In restore mode a truncated trailer is tolerated — we fall back to our own hash below.
                if self.mode != input::Mode::Restore {
                    return Err(err.into());
                }
            }

            if let Some(hash) = self.hash.take() {
                let actual_id = git_hash::ObjectId::from(hash.digest());
                if self.mode == input::Mode::Restore {
                    // Trust the computed hash over whatever (possibly damaged) trailer was read;
                    // this also makes the mismatch check below pass unconditionally.
                    id = actual_id;
                }
                if id != actual_id {
                    return Err(input::Error::ChecksumMismatch {
                        actual: actual_id,
                        expected: id,
                    });
                }
            }
            Some(id)
        } else if self.mode == input::Mode::Restore {
            // NOTE(review): presumably an intermediate hash so a truncated pack can be
            // finalized at any entry boundary — confirm against Bundle::write_to_directory.
            let hash = self.hash.clone().expect("in restore mode a hash is set");
            Some(git_hash::ObjectId::from(hash.digest()))
        } else {
            None
        })
    }
}
211
212fn read_and_pass_to<R: io::Read, W: io::Write>(read: &mut R, to: W) -> PassThrough<&mut R, W> {
213    PassThrough { read, write: to }
214}
215
216impl<R> Iterator for BytesToEntriesIter<R>
217where
218    R: io::BufRead,
219{
220    type Item = Result<input::Entry, input::Error>;
221
222    fn next(&mut self) -> Option<Self::Item> {
223        if self.had_error || self.objects_left == 0 {
224            return None;
225        }
226        let result = self.next_inner();
227        self.had_error = result.is_err();
228        if self.had_error {
229            self.objects_left = 0;
230        }
231        if self.mode == input::Mode::Restore && self.had_error {
232            None
233        } else {
234            Some(result)
235        }
236    }
237
238    fn size_hint(&self) -> (usize, Option<usize>) {
239        (self.objects_left as usize, Some(self.objects_left as usize))
240    }
241}
242
243impl<R> std::iter::ExactSizeIterator for BytesToEntriesIter<R> where R: io::BufRead {}
244
/// A reader which copies every byte it yields into a writer as well ("tee").
struct PassThrough<R, W> {
    /// The source of bytes.
    read: R,
    /// Receives a copy of everything read (or consumed) from `read`.
    write: W,
}
249
impl<R, W> io::BufRead for PassThrough<R, W>
where
    Self: io::Read,
    R: io::BufRead,
    W: io::Write,
{
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        self.read.fill_buf()
    }

    fn consume(&mut self, amt: usize) {
        // Copy the bytes being consumed into the writer before discarding them.
        // NOTE(review): relies on the `BufRead` contract that `consume(amt)` follows a
        // `fill_buf()` which returned at least `amt` bytes — otherwise the slice below panics.
        let buf = self
            .read
            .fill_buf()
            .expect("never fail as we called fill-buf before and this does nothing");
        self.write
            .write_all(&buf[..amt])
            .expect("a write to never fail - should be a memory buffer");
        self.read.consume(amt)
    }
}
271
272impl<R, W> io::Read for PassThrough<R, W>
273where
274    W: io::Write,
275    R: io::Read,
276{
277    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
278        let bytes_read = self.read.read(buf)?;
279        self.write.write_all(&buf[..bytes_read])?;
280        Ok(bytes_read)
281    }
282}
283
284impl crate::data::File {
285    /// Returns an iterator over [`Entries`][crate::data::input::Entry], without making use of the memory mapping.
286    pub fn streaming_iter(&self) -> Result<BytesToEntriesIter<impl io::BufRead>, input::Error> {
287        let reader = io::BufReader::with_capacity(4096 * 8, fs::File::open(&self.path)?);
288        BytesToEntriesIter::new_from_header(
289            reader,
290            input::Mode::Verify,
291            input::EntryDataMode::KeepAndCrc32,
292            self.object_hash,
293        )
294    }
295}