use crate::zlib::stream::inflate::InflateReaderBoxed;
use crate::{hash, pack, zlib::stream::inflate::Inflate};
use git_features::hash::Sha1;
use git_object::owned;
use quick_error::quick_error;
use std::{fs, io};
quick_error! {
/// The error returned when creating the streaming pack iterator or when advancing it.
#[derive(Debug)]
pub enum Error {
/// An `io::Error` occurred while reading from the underlying pack stream.
Io(err: io::Error) {
display("An IO operation failed while streaming an entry")
from()
source(err)
}
/// Parsing of pack data failed, converted from `pack::data::parse::Error`.
PackParse(err: pack::data::parse::Error) {
display("The pack header could not be parsed")
from()
source(err)
}
/// The checksum stored in the pack trailer (`expected`) differs from the hash computed
/// over the bytes that were actually read (`actual`).
ChecksumMismatch { expected: owned::Id, actual: owned::Id } {
display("pack checksum in trailer was {}, but actual checksum was {}", expected, actual)
}
}
}
/// A single pack entry as yielded by `Iter`, along with information gathered while streaming it.
#[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct Entry {
/// The header of the entry as parsed from the pack stream.
pub header: pack::data::Header,
/// The size of the parsed entry header, as reported by `header_size()` of the parsed entry.
pub header_size: u16,
/// The offset into the pack at which this entry's header starts.
pub pack_offset: u64,
/// The compressed bytes of the entry (without the header), set only if the iterator's
/// `CompressedBytesMode` keeps them.
pub compressed: Option<Vec<u8>>,
/// The number of compressed bytes the decompressor consumed for this entry.
pub compressed_size: u64,
/// The CRC32 computed over the serialized entry header followed by the compressed bytes,
/// set only if the iterator's `CompressedBytesMode` requests it.
pub crc32: Option<u32>,
/// The number of bytes obtained by decompressing the entry.
pub decompressed_size: u64,
/// The pack trailer. It is set on the last entry of the pack; in `Mode::Restore` it is set on
/// every entry to the hash of all pack bytes read up to and including that entry.
pub trailer: Option<owned::Id>,
}
/// An iterator over the entries of a pack, reading them sequentially from `R`.
pub struct Iter<R> {
// The stream providing the pack bytes, positioned right after what was read so far.
read: R,
// The inflate state, kept between entries so it can be reset and reused.
decompressor: Option<Box<Inflate>>,
// The pack offset at which the next entry starts; begins at 12, right after the pack header.
offset: u64,
// Set once an entry failed to be read, after which iteration stops.
had_error: bool,
// The pack version as parsed from the pack header.
kind: pack::data::Kind,
// The number of entries that have not been read yet, initialized from the pack header.
objects_left: u32,
// The running hash over all pack bytes read so far; `None` in `Mode::AsIs` and after the
// trailer was processed.
hash: Option<Sha1>,
// Determines how the pack trailer and read errors are handled.
mode: Mode,
// Determines what to do with the compressed bytes of each entry.
compressed: CompressedBytesMode,
// A scratch buffer for compressed bytes, reused between entries if they are not kept.
compressed_buf: Option<Vec<u8>>,
}
/// Determines how the iterator treats the pack trailer and errors encountered while reading entries.
#[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub enum Mode {
/// Do not hash the pack bytes; the trailer of the last entry is returned as read from the stream.
AsIs,
/// Hash all pack bytes and fail with `Error::ChecksumMismatch` if the trailer read from the
/// stream differs from the computed hash.
Verify,
/// Hash all pack bytes and provide the computed hash as trailer with every entry. A trailer that
/// cannot be read is replaced by the computed hash, and an error while reading an entry ends
/// the iteration without yielding that error.
Restore,
}
/// Selects what the iterator does with the compressed bytes of each entry it reads.
#[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub enum CompressedBytesMode {
    /// Neither keep the compressed bytes nor compute their CRC32.
    Ignore,
    /// Compute the CRC32 of the entry, but do not keep the compressed bytes.
    CRC32,
    /// Keep the compressed bytes, but do not compute the CRC32.
    Keep,
    /// Keep the compressed bytes and compute the CRC32 as well.
    KeepAndCRC32,
}

impl CompressedBytesMode {
    /// Returns `true` if this mode asks for a CRC32 to be computed.
    pub fn crc32(&self) -> bool {
        use CompressedBytesMode::*;
        // Kept exhaustive on purpose so that a new variant forces a decision here.
        match *self {
            Ignore | Keep => false,
            CRC32 | KeepAndCRC32 => true,
        }
    }

    /// Returns `true` if this mode asks for the compressed bytes to be kept.
    pub fn keep(&self) -> bool {
        use CompressedBytesMode::*;
        // Kept exhaustive on purpose so that a new variant forces a decision here.
        match *self {
            Ignore | CRC32 => false,
            Keep | KeepAndCRC32 => true,
        }
    }
}
impl<R> Iter<R>
where
R: io::BufRead,
{
pub fn kind(&self) -> pack::data::Kind {
self.kind
}
pub fn mode(&self) -> Mode {
self.mode
}
pub fn new_from_header(mut read: R, trailer: Mode, compressed: CompressedBytesMode) -> Result<Iter<R>, Error> {
let mut header_data = [0u8; 12];
read.read_exact(&mut header_data)?;
let (kind, num_objects) = pack::data::parse::header(&header_data)?;
assert_eq!(
kind,
pack::data::Kind::V2,
"let's stop here if we see undocumented pack formats"
);
Ok(Iter {
read,
decompressor: None,
compressed,
offset: 12,
had_error: false,
kind,
objects_left: num_objects,
hash: if trailer != Mode::AsIs {
let mut hash = Sha1::default();
hash.update(&header_data);
Some(hash)
} else {
None
},
mode: trailer,
compressed_buf: None,
})
}
fn next_inner(&mut self) -> Result<Entry, Error> {
self.objects_left -= 1;
let entry = match self.hash.take() {
Some(hash) => {
let mut read = read_and_pass_to(
&mut self.read,
hash::Write {
inner: io::sink(),
hash,
},
);
let res = pack::data::Entry::from_read(&mut read, self.offset);
self.hash = Some(read.write.hash);
res
}
None => pack::data::Entry::from_read(&mut self.read, self.offset),
}
.map_err(Error::from)?;
let mut decompressor = self.decompressor.take().unwrap_or_default();
let compressed_buf = self.compressed_buf.take().unwrap_or_else(|| Vec::with_capacity(4096));
decompressor.reset();
let mut decompressed_reader = InflateReaderBoxed {
inner: read_and_pass_to(
&mut self.read,
if self.compressed.keep() {
Vec::with_capacity(entry.decompressed_size as usize)
} else {
compressed_buf
},
),
decompressor,
};
let bytes_copied = io::copy(&mut decompressed_reader, &mut io::sink())?;
debug_assert_eq!(
bytes_copied, entry.decompressed_size,
"We should have decompressed {} bytes, but got {} instead",
entry.decompressed_size, bytes_copied
);
let pack_offset = self.offset;
let compressed_size = decompressed_reader.decompressor.total_in;
self.offset += entry.header_size() as u64 + compressed_size;
self.decompressor = Some(decompressed_reader.decompressor);
let mut compressed = decompressed_reader.inner.write;
debug_assert_eq!(
compressed_size,
compressed.len() as u64,
"we must track exactly the same amount of bytes as read by the decompressor"
);
if let Some(hash) = self.hash.as_mut() {
hash.update(&compressed);
}
let crc32 = if self.compressed.crc32() {
let mut header_buf = [0u8; 32];
let header_len = entry.header.to_write(bytes_copied, header_buf.as_mut())?;
let state = git_features::hash::crc32_update(0, &header_buf[..header_len]);
Some(git_features::hash::crc32_update(state, &compressed))
} else {
None
};
let compressed = if self.compressed.keep() {
Some(compressed)
} else {
compressed.clear();
self.compressed_buf = Some(compressed);
None
};
let trailer = if self.objects_left == 0 {
let mut id = owned::Id::from([0; 20]);
if let Err(err) = self.read.read_exact(id.as_mut_slice()) {
if self.mode != Mode::Restore {
return Err(err.into());
}
}
if let Some(hash) = self.hash.take() {
let actual_id = owned::Id::from(hash.digest());
if self.mode == Mode::Restore {
id = actual_id;
}
if id != actual_id {
return Err(Error::ChecksumMismatch {
actual: actual_id,
expected: id,
});
}
}
Some(id)
} else if self.mode == Mode::Restore {
let hash = self.hash.clone().expect("in restore mode a hash is set");
Some(owned::Id::from(hash.digest()))
} else {
None
};
Ok(Entry {
header: entry.header,
header_size: entry.header_size() as u16,
compressed,
compressed_size,
crc32,
pack_offset,
decompressed_size: bytes_copied,
trailer,
})
}
}
/// Wraps `read` into an adapter which copies all bytes read or consumed through it into `to`.
fn read_and_pass_to<R, W>(read: &mut R, to: W) -> PassThrough<&mut R, W>
where
    R: io::Read,
    W: io::Write,
{
    let write = to;
    PassThrough { read, write }
}
impl<R> Iterator for Iter<R>
where
    R: io::BufRead,
{
    type Item = Result<Entry, Error>;

    /// Yields the next entry, or `None` once all entries were read or a previous call failed.
    ///
    /// The first error ends the iteration. It is yielded to the caller unless the iterator is
    /// in `Mode::Restore`, in which case the iteration ends silently.
    fn next(&mut self) -> Option<Self::Item> {
        if self.had_error || self.objects_left == 0 {
            return None;
        }
        match self.next_inner() {
            Ok(entry) => Some(Ok(entry)),
            Err(err) => {
                self.had_error = true;
                // Nothing more will be yielded, which is what `size_hint()` should report.
                self.objects_left = 0;
                if self.mode == Mode::Restore {
                    None
                } else {
                    Some(Err(err))
                }
            }
        }
    }

    /// Reports the number of entries that are still to be read as both lower and upper bound.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.objects_left as usize;
        (remaining, Some(remaining))
    }
}
/// `size_hint()` reports the number of entries left to read as both of its bounds.
impl<R: io::BufRead> std::iter::ExactSizeIterator for Iter<R> {}
/// A reader adapter which copies every byte read or consumed from `read` into `write`.
struct PassThrough<R, W> {
    /// The source of the bytes.
    read: R,
    /// The sink receiving a copy of all bytes that were read or consumed.
    write: W,
}

impl<R, W> io::BufRead for PassThrough<R, W>
where
    Self: io::Read,
    R: io::BufRead,
    W: io::Write,
{
    /// Forwards to the inner reader without copying anything yet, as bytes only count once consumed.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        self.read.fill_buf()
    }

    /// Copies the first `amt` buffered bytes into `write`, then marks them consumed in the inner reader.
    ///
    /// # Panics
    ///
    /// Panics if re-obtaining the buffer or writing to `write` fails, or if `amt` exceeds the
    /// amount of buffered bytes.
    fn consume(&mut self, amt: usize) {
        // With nothing to copy there is no reason to ask for the buffer again. Doing so anyway
        // could trigger actual I/O if the buffer is empty, and a failure there would panic.
        if amt > 0 {
            let buf = self
                .read
                .fill_buf()
                .expect("never fail as we called fill-buf before and this does nothing");
            self.write
                .write_all(&buf[..amt])
                .expect("a write to never fail - should be a memory buffer");
        }
        self.read.consume(amt)
    }
}

impl<R, W> io::Read for PassThrough<R, W>
where
    W: io::Write,
    R: io::Read,
{
    /// Reads into `buf` from the inner reader and copies the bytes obtained that way into `write`.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let bytes_read = self.read.read(buf)?;
        self.write.write_all(&buf[..bytes_read])?;
        Ok(bytes_read)
    }
}
impl pack::data::File {
    /// Opens the file at `self.path` and returns an iterator over the entries of the pack it contains.
    ///
    /// The iterator verifies the pack trailer (`Mode::Verify`), keeps the compressed bytes of each
    /// entry and computes their CRC32 (`CompressedBytesMode::KeepAndCRC32`).
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened or if creating the iterator from the pack header fails.
    pub fn streaming_iter(&self) -> Result<Iter<impl io::BufRead>, Error> {
        let file = fs::File::open(&self.path)?;
        let buffered = io::BufReader::with_capacity(4096 * 8, file);
        Iter::new_from_header(buffered, Mode::Verify, CompressedBytesMode::KeepAndCRC32)
    }
}