use std::{
collections::{BTreeMap, HashMap},
fs::File,
hash::Hash,
io::{BufRead, BufReader, Read, Seek, SeekFrom, Take, Write},
mem::MaybeUninit,
mem::size_of,
ops::Range,
sync::Arc,
};
use anyhow::{Context, Error, Result, bail, ensure};
use fn_error_context::context;
use rustix::{
buffer::spare_capacity,
io::{pread, read},
};
use tokio::task::JoinHandle;
use crate::repository::ObjectStoreMethod;
use zerocopy::{
FromBytes, Immutable, IntoBytes, KnownLayout,
little_endian::{I64, U16, U64},
};
use zstd::stream::{read::Decoder, write::Encoder};
use crate::{
fsverity::FsVerityHashValue,
repository::{Repository, WritableRepo},
util::read_exactish,
};
const SPLITSTREAM_MAGIC: [u8; 11] = *b"SplitStream";
const LG_BLOCKSIZE: u8 = 12;
#[repr(C)]
#[derive(Debug, Clone, Copy, FromBytes, Immutable, IntoBytes, KnownLayout)]
struct FileRange {
start: U64,
end: U64,
}
#[repr(C)]
#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)]
struct SplitstreamHeader {
pub magic: [u8; 11], pub version: u8, pub _flags: U16, pub algorithm: u8, pub lg_blocksize: u8, pub info: FileRange, }
#[repr(C)]
#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)]
struct SplitstreamInfo {
pub stream_refs: FileRange, pub object_refs: FileRange, pub stream: FileRange, pub named_refs: FileRange, pub content_type: U64, pub stream_size: U64, }
#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)]
struct OldSplitstreamHeader {
pub magic: [u8; 11],
pub version: u8,
pub _flags: U16,
pub algorithm: u8,
pub lg_blocksize: u8,
pub info: FileRange,
}
#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)]
struct OldSplitstreamInfo {
pub stream_refs: FileRange,
pub object_refs: FileRange,
pub stream: FileRange,
pub named_refs: FileRange,
pub content_type: U64,
pub stream_size: U64,
}
const _: () = {
assert!(std::mem::offset_of!(FileRange, start) == 0);
assert!(std::mem::offset_of!(FileRange, end) == 8);
assert!(std::mem::size_of::<FileRange>() == 16);
};
const _: () = {
assert!(std::mem::offset_of!(SplitstreamHeader, magic) == 0);
assert!(std::mem::offset_of!(SplitstreamHeader, info) == 16);
assert!(std::mem::size_of::<SplitstreamHeader>() == 32);
assert!(std::mem::offset_of!(OldSplitstreamHeader, info) == 0);
assert!(std::mem::offset_of!(OldSplitstreamHeader, _flags) == 16);
assert!(std::mem::offset_of!(OldSplitstreamHeader, magic) == 18);
assert!(std::mem::offset_of!(OldSplitstreamHeader, version) == 29);
assert!(std::mem::offset_of!(OldSplitstreamHeader, algorithm) == 30);
assert!(std::mem::offset_of!(OldSplitstreamHeader, lg_blocksize) == 31);
assert!(std::mem::size_of::<OldSplitstreamHeader>() == 32);
};
const _: () = {
assert!(
std::mem::offset_of!(SplitstreamInfo, stream_refs)
== std::mem::offset_of!(OldSplitstreamInfo, stream_refs)
);
assert!(
std::mem::offset_of!(SplitstreamInfo, object_refs)
== std::mem::offset_of!(OldSplitstreamInfo, object_refs)
);
assert!(
std::mem::offset_of!(SplitstreamInfo, stream)
== std::mem::offset_of!(OldSplitstreamInfo, stream)
);
assert!(
std::mem::offset_of!(SplitstreamInfo, named_refs)
== std::mem::offset_of!(OldSplitstreamInfo, named_refs)
);
assert!(
std::mem::offset_of!(SplitstreamInfo, content_type)
== std::mem::offset_of!(OldSplitstreamInfo, content_type)
);
assert!(
std::mem::offset_of!(SplitstreamInfo, stream_size)
== std::mem::offset_of!(OldSplitstreamInfo, stream_size)
);
assert!(std::mem::size_of::<SplitstreamInfo>() == std::mem::size_of::<OldSplitstreamInfo>());
};
impl From<OldSplitstreamHeader> for SplitstreamHeader {
fn from(old: OldSplitstreamHeader) -> Self {
Self {
magic: old.magic,
version: old.version,
_flags: old._flags,
algorithm: old.algorithm,
lg_blocksize: old.lg_blocksize,
info: old.info,
}
}
}
impl FileRange {
fn len(&self) -> Result<u64> {
self.end
.get()
.checked_sub(self.start.get())
.context("Negative-sized range in splitstream")
}
}
impl From<Range<u64>> for FileRange {
fn from(value: Range<u64>) -> Self {
Self {
start: U64::from(value.start),
end: U64::from(value.end),
}
}
}
#[context("Reading range from splitstream file")]
fn read_range(file: &mut File, range: FileRange) -> Result<Vec<u8>> {
let size: usize = (range.len()?.try_into())
.context("Unable to allocate buffer for implausibly large splitstream section")?;
let mut buffer = Vec::with_capacity(size);
if size > 0 {
pread(file, spare_capacity(&mut buffer), range.start.get())
.context("Unable to read section from splitstream file")?;
}
ensure!(
buffer.len() == size,
"Incomplete read from splitstream file"
);
Ok(buffer)
}
struct UniqueVec<T: Clone + Hash + Eq> {
items: Vec<T>,
index: HashMap<T, usize>,
}
impl<T: Clone + Hash + Eq + std::fmt::Debug> std::fmt::Debug for UniqueVec<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("UniqueVec")
.field("items", &self.items)
.field("index", &self.index)
.finish()
}
}
impl<T: Clone + Hash + Eq + IntoBytes + Immutable> UniqueVec<T> {
fn as_bytes(&self) -> &[u8] {
self.items.as_bytes()
}
}
impl<T: Clone + Hash + Eq> UniqueVec<T> {
fn new() -> Self {
Self {
items: Vec::new(),
index: HashMap::new(),
}
}
fn get(&self, item: &T) -> Option<usize> {
self.index.get(item).copied()
}
fn ensure(&mut self, item: &T) -> usize {
self.get(item).unwrap_or_else(|| {
let idx = self.items.len();
self.index.insert(item.clone(), idx);
self.items.push(item.clone());
idx
})
}
fn get_by_index(&self, idx: usize) -> Option<&T> {
self.items.get(idx)
}
}
#[derive(Debug, Clone, Default)]
pub struct SplitStreamStats {
pub inline_bytes: u64,
pub external_objects: Vec<(u64, ObjectStoreMethod)>,
}
pub enum SplitStreamEntry<ObjectID: FsVerityHashValue> {
Inline(Vec<u8>),
External {
handle: JoinHandle<Result<(ObjectID, ObjectStoreMethod)>>,
size: u64,
},
}
impl<ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamEntry<ObjectID> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SplitStreamEntry::Inline(data) => {
f.debug_struct("Inline").field("len", &data.len()).finish()
}
SplitStreamEntry::External { size, .. } => f
.debug_struct("External")
.field("size", size)
.finish_non_exhaustive(),
}
}
}
pub struct SplitStreamBuilder<ObjectID: FsVerityHashValue> {
repo: Arc<Repository<ObjectID>>,
writable: WritableRepo,
entries: Vec<SplitStreamEntry<ObjectID>>,
total_external_size: u64,
total_inline_bytes: u64,
content_type: u64,
stream_refs: UniqueVec<ObjectID>,
named_refs: BTreeMap<Box<str>, usize>,
}
impl<ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamBuilder<ObjectID> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SplitStreamBuilder")
.field("repo", &self.repo)
.field("entries", &self.entries)
.field("total_external_size", &self.total_external_size)
.field("content_type", &self.content_type)
.finish_non_exhaustive()
}
}
impl<ObjectID: FsVerityHashValue> SplitStreamBuilder<ObjectID> {
pub fn new(repo: Arc<Repository<ObjectID>>, content_type: u64) -> Result<Self> {
let writable = repo.ensure_writable_token()?;
Ok(Self {
repo,
writable,
entries: Vec::new(),
total_external_size: 0,
total_inline_bytes: 0,
content_type,
stream_refs: UniqueVec::new(),
named_refs: Default::default(),
})
}
pub fn push_inline(&mut self, data: &[u8]) {
if data.is_empty() {
return;
}
self.total_inline_bytes += data.len() as u64;
if let Some(SplitStreamEntry::Inline(existing)) = self.entries.last_mut() {
existing.extend_from_slice(data);
} else {
self.entries.push(SplitStreamEntry::Inline(data.to_vec()));
}
}
pub fn push_external(
&mut self,
handle: JoinHandle<Result<(ObjectID, ObjectStoreMethod)>>,
size: u64,
) {
self.total_external_size += size;
self.entries
.push(SplitStreamEntry::External { handle, size });
}
pub fn add_named_stream_ref(&mut self, name: &str, verity: &ObjectID) {
let idx = self.stream_refs.ensure(verity);
self.named_refs.insert(Box::from(name), idx);
}
pub async fn finish(self) -> Result<(ObjectID, SplitStreamStats)> {
let mut stats = SplitStreamStats {
inline_bytes: self.total_inline_bytes,
external_objects: Vec::new(),
};
let mut resolved_entries: Vec<ResolvedEntry<ObjectID>> =
Vec::with_capacity(self.entries.len());
for entry in self.entries {
match entry {
SplitStreamEntry::Inline(data) => {
resolved_entries.push(ResolvedEntry::Inline(data));
}
SplitStreamEntry::External { handle, size } => {
let (id, method) = handle.await??;
stats.external_objects.push((size, method));
resolved_entries.push(ResolvedEntry::External { id, size });
}
}
}
let mut writer = SplitStreamWriter::new(&self.repo, self.content_type, self.writable);
for (name, idx) in &self.named_refs {
let verity = self
.stream_refs
.get_by_index(*idx)
.expect("named ref index out of bounds");
writer.add_named_stream_ref(name, verity);
}
for entry in resolved_entries {
match entry {
ResolvedEntry::Inline(data) => {
writer.write_inline(&data);
}
ResolvedEntry::External { id, size } => {
writer.add_external_size(size);
writer.write_reference(id)?;
}
}
}
let id = tokio::task::spawn_blocking(move || writer.done()).await??;
Ok((id, stats))
}
}
#[derive(Debug)]
enum ResolvedEntry<ObjectID: FsVerityHashValue> {
Inline(Vec<u8>),
External { id: ObjectID, size: u64 },
}
pub struct SplitStreamWriter<ObjectId: FsVerityHashValue> {
repo: Arc<Repository<ObjectId>>,
writable: WritableRepo,
stream_refs: UniqueVec<ObjectId>,
object_refs: UniqueVec<ObjectId>,
named_refs: BTreeMap<Box<str>, usize>, inline_buffer: Vec<u8>,
total_size: u64,
writer: Encoder<'static, Vec<u8>>,
content_type: u64,
#[cfg(any(test, feature = "test"))]
write_old_format: bool,
}
impl<ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamWriter<ObjectID> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SplitStreamWriter")
.field("repo", &self.repo)
.field("inline_content", &self.inline_buffer)
.finish()
}
}
impl<ObjectID: FsVerityHashValue> SplitStreamWriter<ObjectID> {
pub fn repo(&self) -> &Repository<ObjectID> {
&self.repo
}
pub(crate) fn writable(&self) -> &WritableRepo {
&self.writable
}
pub(crate) fn new(
repo: &Arc<Repository<ObjectID>>,
content_type: u64,
writable: WritableRepo,
) -> Self {
let writer = Encoder::new(vec![], 0).unwrap();
Self {
repo: Arc::clone(repo),
writable,
content_type,
inline_buffer: vec![],
stream_refs: UniqueVec::new(),
object_refs: UniqueVec::new(),
named_refs: Default::default(),
total_size: 0,
writer,
#[cfg(any(test, feature = "test"))]
write_old_format: repo.write_old_splitstream_format(),
}
}
pub fn add_object_ref(&mut self, verity: &ObjectID) -> usize {
self.object_refs.ensure(verity)
}
pub fn lookup_object_ref(&self, verity: &ObjectID) -> Option<usize> {
self.object_refs.get(verity)
}
pub fn add_named_stream_ref(&mut self, name: &str, verity: &ObjectID) {
let idx = self.stream_refs.ensure(verity);
self.named_refs.insert(Box::from(name), idx);
}
fn flush_inline(&mut self) -> Result<()> {
let size = self.inline_buffer.len();
if size > 0 {
let instruction = -i64::try_from(size).expect("implausibly large inline chunk");
self.writer.write_all(I64::new(instruction).as_bytes())?;
self.writer.write_all(&self.inline_buffer)?;
self.inline_buffer.clear();
}
Ok(())
}
pub fn write_inline(&mut self, data: &[u8]) {
self.total_size += data.len() as u64;
self.inline_buffer.extend(data);
}
pub fn add_external_size(&mut self, size: u64) {
self.total_size += size;
}
pub fn write_reference(&mut self, id: ObjectID) -> Result<()> {
self.flush_inline()?;
let index = self.add_object_ref(&id);
let instruction = i64::try_from(index).expect("implausibly large external index");
self.writer.write_all(I64::from(instruction).as_bytes())?;
Ok(())
}
pub fn write_external(&mut self, data: &[u8]) -> Result<()> {
self.total_size += data.len() as u64;
let id = self.repo.ensure_object_impl(data, &self.writable)?;
self.write_reference(id)
}
pub async fn write_external_async(&mut self, data: Vec<u8>) -> Result<()> {
self.total_size += data.len() as u64;
let self_ = Arc::clone(&self.repo);
let writable = self.writable;
let id = tokio::task::spawn_blocking(move || self_.ensure_object_impl(&data, &writable))
.await??;
self.write_reference(id)
}
fn write_named_refs(named_refs: BTreeMap<Box<str>, usize>) -> Result<Vec<u8>> {
let mut encoder = Encoder::new(vec![], 0)?;
for (name, idx) in &named_refs {
write!(&mut encoder, "{idx}:{name}\0")?;
}
Ok(encoder.finish()?)
}
pub fn done(mut self) -> Result<ObjectID> {
self.flush_inline()?;
let stream = self.writer.finish()?;
let header_start = 0u64;
let header_end = header_start + size_of::<SplitstreamHeader>() as u64;
let info_start = header_end;
let info_end = info_start + size_of::<SplitstreamInfo>() as u64;
assert_eq!(info_start % 8, 0);
let stream_refs_size = self.stream_refs.as_bytes().len();
let stream_refs_start = info_end;
let stream_refs_end = stream_refs_start + stream_refs_size as u64;
assert_eq!(stream_refs_start % 8, 0);
let object_refs_size = self.object_refs.as_bytes().len();
let object_refs_start = stream_refs_end;
let object_refs_end = object_refs_start + object_refs_size as u64;
assert_eq!(object_refs_start % 8, 0);
let named_refs =
Self::write_named_refs(self.named_refs).context("Formatting named references")?;
let named_refs_start = object_refs_end;
let named_refs_end = named_refs_start + named_refs.len() as u64;
assert_eq!(named_refs_start % 8, 0);
let stream_start = named_refs_end;
let stream_end = stream_start + stream.len() as u64;
let mut buf = vec![];
assert_eq!(buf.len() as u64, header_start);
buf.extend_from_slice(
SplitstreamHeader {
magic: SPLITSTREAM_MAGIC,
version: 0,
_flags: U16::ZERO,
algorithm: ObjectID::ALGORITHM.kernel_id(),
lg_blocksize: LG_BLOCKSIZE,
info: (info_start..info_end).into(),
}
.as_bytes(),
);
assert_eq!(buf.len() as u64, header_end);
assert_eq!(buf.len() as u64, info_start);
buf.extend_from_slice(
SplitstreamInfo {
stream_refs: (stream_refs_start..stream_refs_end).into(),
object_refs: (object_refs_start..object_refs_end).into(),
stream: (stream_start..stream_end).into(),
named_refs: (named_refs_start..named_refs_end).into(),
content_type: self.content_type.into(),
stream_size: self.total_size.into(),
}
.as_bytes(),
);
assert_eq!(buf.len() as u64, info_end);
assert_eq!(buf.len() as u64, stream_refs_start);
buf.extend_from_slice(self.stream_refs.as_bytes());
assert_eq!(buf.len() as u64, stream_refs_end);
assert_eq!(buf.len() as u64, object_refs_start);
buf.extend_from_slice(self.object_refs.as_bytes());
assert_eq!(buf.len() as u64, object_refs_end);
assert_eq!(buf.len() as u64, named_refs_start);
buf.extend_from_slice(&named_refs);
assert_eq!(buf.len() as u64, named_refs_end);
assert_eq!(buf.len() as u64, stream_start);
buf.extend_from_slice(&stream);
assert_eq!(buf.len() as u64, stream_end);
#[cfg(any(test, feature = "test"))]
let buf = if self.write_old_format {
new_to_old_format(&buf)
} else {
buf
};
self.repo.ensure_object_impl(&buf, &self.writable)
}
pub async fn done_async(self) -> Result<ObjectID> {
tokio::task::spawn_blocking(move || self.done()).await?
}
}
#[derive(Debug)]
pub enum SplitStreamData<ObjectID: FsVerityHashValue> {
Inline(Box<[u8]>),
External(ObjectID),
}
pub struct SplitStreamReader<ObjectID: FsVerityHashValue> {
decoder: Decoder<'static, BufReader<Take<File>>>,
inline_bytes: usize,
pub content_type: u64,
pub total_size: u64,
object_refs: Vec<ObjectID>,
named_refs: HashMap<Box<str>, ObjectID>,
}
impl<ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamReader<ObjectID> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SplitStreamReader")
.field("refs", &self.object_refs)
.field("inline_bytes", &self.inline_bytes)
.finish()
}
}
fn read_into_vec(reader: &mut impl Read, vec: &mut Vec<u8>, size: usize) -> Result<()> {
vec.resize(size, 0u8);
reader.read_exact(vec.as_mut_slice())?;
Ok(())
}
enum ChunkType<ObjectID: FsVerityHashValue> {
Eof,
Inline,
External(ObjectID),
}
impl<ObjectID: FsVerityHashValue> SplitStreamReader<ObjectID> {
#[context("Creating new splitstream reader")]
pub fn new(mut file: File, expected_content_type: Option<u64>) -> Result<Self> {
let header = SplitstreamHeader::read_from_io(&mut file)
.map_err(|e| Error::msg(format!("Error reading splitstream header: {e:?}")))?;
let header = if header.magic != SPLITSTREAM_MAGIC {
file.seek(SeekFrom::Start(0))
.context("Seeking back to start for old-format header")?;
let old_header = OldSplitstreamHeader::read_from_io(&mut file).map_err(|e| {
Error::msg(format!(
"Error reading old-format splitstream header: {e:?}"
))
})?;
if old_header.magic != SPLITSTREAM_MAGIC {
bail!("Invalid splitstream header magic value");
}
old_header.into()
} else {
header
};
if header.version != 0 {
bail!("Invalid splitstream version {}", header.version);
}
if header.algorithm != ObjectID::ALGORITHM.kernel_id() {
bail!("Invalid splitstream fs-verity algorithm type");
}
if header.lg_blocksize != LG_BLOCKSIZE {
bail!("Invalid splitstream fs-verity block size");
}
let info_bytes = read_range(&mut file, header.info)?;
let (info, _) = SplitstreamInfo::ref_from_prefix(&info_bytes)
.map_err(|e| Error::msg(format!("Error reading splitstream metadata: {e:?}")))?;
let content_type: u64 = info.content_type.into();
if let Some(expected) = expected_content_type {
ensure!(content_type == expected, "Invalid splitstream content type");
}
let total_size: u64 = info.stream_size.into();
let stream_refs_bytes = read_range(&mut file, info.stream_refs)?;
let stream_refs = <[ObjectID]>::ref_from_bytes(&stream_refs_bytes)
.map_err(|e| Error::msg(format!("Error reading splitstream references: {e:?}")))?;
let object_refs_bytes = read_range(&mut file, info.object_refs)?;
let object_refs = <[ObjectID]>::ref_from_bytes(&object_refs_bytes)
.map_err(|e| Error::msg(format!("Error reading object references: {e:?}")))?;
let named_refs_bytes = read_range(&mut file, info.named_refs)?;
let named_refs = Self::read_named_references(&named_refs_bytes, stream_refs)
.map_err(|e| Error::msg(format!("Error reading splitstream mappings: {e:?}")))?;
file.seek(SeekFrom::Start(info.stream.start.get()))
.context("Unable to seek to start of splitstream content")?;
let decoder = Decoder::new(file.take(info.stream.len()?))
.context("Unable to decode zstd-compressed content in splitstream")?;
Ok(Self {
decoder,
inline_bytes: 0,
content_type,
total_size,
object_refs: object_refs.to_vec(),
named_refs,
})
}
fn read_named_references<ObjectId: FsVerityHashValue>(
section: &[u8],
references: &[ObjectId],
) -> Result<HashMap<Box<str>, ObjectId>> {
let mut map = HashMap::new();
let mut buffer = vec![];
let mut reader = BufReader::new(
Decoder::new(section).context("Creating zstd decoder for named references section")?,
);
loop {
reader
.read_until(b'\0', &mut buffer)
.context("Reading named references section")?;
let Some(item) = buffer.strip_suffix(b"\0") else {
ensure!(
buffer.is_empty(),
"Trailing junk in named references section"
);
return Ok(map);
};
let (idx_str, name) = std::str::from_utf8(item)
.context("Reading named references section")?
.split_once(":")
.context("Named reference doesn't contain a colon")?;
let idx: usize = idx_str
.parse()
.context("Named reference contains a non-integer index")?;
let object_id = references
.get(idx)
.context("Named reference out of bounds")?;
map.insert(Box::from(name), object_id.clone());
buffer.clear();
}
}
pub fn iter_named_refs(&self) -> impl Iterator<Item = (&str, &ObjectID)> {
self.named_refs.iter().map(|(name, id)| (name.as_ref(), id))
}
pub fn into_named_refs(self) -> HashMap<Box<str>, ObjectID> {
self.named_refs
}
fn ensure_chunk(
&mut self,
eof_ok: bool,
ext_ok: bool,
expected_bytes: usize,
) -> Result<ChunkType<ObjectID>> {
if self.inline_bytes == 0 {
let mut value = I64::ZERO;
if !read_exactish(&mut self.decoder, value.as_mut_bytes())? {
ensure!(eof_ok, "Unexpected EOF in splitstream");
return Ok(ChunkType::Eof);
}
match value.get() {
n if n < 0i64 => {
self.inline_bytes = (n.unsigned_abs().try_into())
.context("Splitstream inline section is too large")?;
}
n => {
ensure!(ext_ok, "Unexpected external reference in splitstream");
let idx = usize::try_from(n)
.context("Splitstream external reference is too large")?;
let id: &ObjectID = (self.object_refs.get(idx))
.context("Splitstream external reference is out of range")?;
return Ok(ChunkType::External(id.clone()));
}
}
}
if self.inline_bytes < expected_bytes {
bail!("Unexpectedly small inline content when parsing splitstream");
}
Ok(ChunkType::Inline)
}
pub fn read_inline_exact(&mut self, buffer: &mut [u8]) -> Result<bool> {
if let ChunkType::Inline = self.ensure_chunk(true, false, buffer.len())? {
self.inline_bytes -= buffer.len();
self.decoder.read_exact(buffer)?;
Ok(true)
} else {
Ok(false)
}
}
fn discard_padding(&mut self, size: usize) -> Result<()> {
let mut buf = [0u8; 512];
assert!(size <= 512);
self.ensure_chunk(false, false, size)?;
self.decoder.read_exact(&mut buf[0..size])?;
self.inline_bytes -= size;
Ok(())
}
pub fn read_exact(
&mut self,
actual_size: usize,
stored_size: usize,
) -> Result<SplitStreamData<ObjectID>> {
if let ChunkType::External(id) = self.ensure_chunk(false, true, stored_size)? {
if actual_size < stored_size {
self.discard_padding(stored_size - actual_size)?;
}
Ok(SplitStreamData::External(id))
} else {
let mut content = vec![];
read_into_vec(&mut self.decoder, &mut content, stored_size)?;
content.truncate(actual_size);
self.inline_bytes -= stored_size;
Ok(SplitStreamData::Inline(content.into()))
}
}
#[context("Concatenating splitstream to output")]
pub fn cat(&mut self, repo: &Repository<ObjectID>, output: &mut impl Write) -> Result<()> {
let mut buffer = vec![];
loop {
match self.ensure_chunk(true, true, 0)? {
ChunkType::Eof => break Ok(()),
ChunkType::Inline => {
read_into_vec(&mut self.decoder, &mut buffer, self.inline_bytes)?;
self.inline_bytes = 0;
output.write_all(&buffer)?;
}
ChunkType::External(ref id) => {
let mut buffer = [MaybeUninit::<u8>::uninit(); 1024 * 1024];
let fd = repo.open_object(id)?;
loop {
let (result, _) = read(&fd, &mut buffer)?;
if result.is_empty() {
break;
}
output.write_all(result)?;
}
}
}
}
}
#[context("Getting object references from splitstream")]
pub fn get_object_refs(&mut self, mut callback: impl FnMut(&ObjectID)) -> Result<()> {
for entry in &self.object_refs {
callback(entry);
}
Ok(())
}
pub fn lookup_named_ref(&self, name: &str) -> Option<&ObjectID> {
self.named_refs.get(name)
}
}
impl<ObjectID: FsVerityHashValue> Read for SplitStreamReader<ObjectID> {
fn read(&mut self, data: &mut [u8]) -> std::io::Result<usize> {
match self.ensure_chunk(true, false, 1) {
Ok(ChunkType::Eof) => Ok(0),
Ok(ChunkType::Inline) => {
let n_bytes = std::cmp::min(data.len(), self.inline_bytes);
self.decoder.read_exact(&mut data[0..n_bytes])?;
self.inline_bytes -= n_bytes;
Ok(n_bytes)
}
Ok(ChunkType::External(..)) => unreachable!(),
Err(e) => Err(std::io::Error::other(e)),
}
}
}
#[cfg(any(test, feature = "test"))]
pub fn new_to_old_format(new_format: &[u8]) -> Vec<u8> {
assert!(new_format.len() >= size_of::<SplitstreamHeader>());
let (header, _) = SplitstreamHeader::ref_from_prefix(new_format).unwrap();
assert_eq!(header.magic, SPLITSTREAM_MAGIC, "input must be new-format");
let old_header = OldSplitstreamHeader {
info: header.info,
_flags: header._flags,
magic: header.magic,
version: header.version,
algorithm: header.algorithm,
lg_blocksize: header.lg_blocksize,
};
let mut result = Vec::with_capacity(new_format.len());
result.extend_from_slice(old_header.as_bytes());
result.extend_from_slice(&new_format[size_of::<SplitstreamHeader>()..]);
result
}
#[cfg(test)]
mod tests {
use super::*;
use crate::fsverity::{Sha256HashValue, compute_verity};
use crate::test::tempdir;
use rustix::fs::CWD;
use std::io::Cursor;
use std::path::Path;
#[test]
fn test_read_into_vec() -> Result<()> {
let mut reader = Cursor::new(vec![]);
let mut vec = Vec::new();
let result = read_into_vec(&mut reader, &mut vec, 0);
assert!(result.is_ok());
assert_eq!(vec.len(), 0);
let mut reader = Cursor::new(vec![1, 2, 3, 4, 5]);
let mut vec = Vec::new();
let result = read_into_vec(&mut reader, &mut vec, 3);
assert!(result.is_ok());
assert_eq!(vec.len(), 3);
assert_eq!(vec, vec![1, 2, 3]);
let mut reader = Cursor::new(vec![1, 2, 3]);
let mut vec = Vec::new();
let result = read_into_vec(&mut reader, &mut vec, 5);
assert!(result.is_err());
let mut reader = Cursor::new(vec![1, 2, 3]);
let mut vec = Vec::new();
let result = read_into_vec(&mut reader, &mut vec, 3);
assert!(result.is_ok());
assert_eq!(vec.len(), 3);
assert_eq!(vec, vec![1, 2, 3]);
let mut reader = Cursor::new(vec![1, 2, 3, 4, 5]);
let mut vec = Vec::with_capacity(10);
let result = read_into_vec(&mut reader, &mut vec, 4);
assert!(result.is_ok());
assert_eq!(vec.len(), 4);
assert_eq!(vec, vec![1, 2, 3, 4]);
assert_eq!(vec.capacity(), 10);
let mut reader = Cursor::new(vec![1, 2, 3]);
let mut vec = vec![9, 9, 9];
let result = read_into_vec(&mut reader, &mut vec, 2);
assert!(result.is_ok());
assert_eq!(vec.len(), 2);
assert_eq!(vec, vec![1, 2]);
Ok(())
}
fn create_test_repo(path: &Path) -> Result<Arc<Repository<Sha256HashValue>>> {
let (repo, _) = Repository::init_path(
CWD,
path,
crate::repository::RepositoryConfig::default().set_insecure(),
)?;
Ok(Arc::new(repo))
}
fn generate_test_data(size: usize, seed: u8) -> Vec<u8> {
(0..size)
.map(|i| ((i as u8).wrapping_add(seed)).wrapping_mul(17))
.collect()
}
#[test]
fn test_splitstream_inline_only() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let inline1 = generate_test_data(32, 0xAB);
let inline2 = generate_test_data(48, 0xCD);
let mut writer = repo.create_stream(0)?;
writer.write_inline(&inline1);
writer.write_inline(&inline2);
let stream_id = repo.write_stream(writer, "test-inline", None)?;
let mut reader = repo.open_stream("test-inline", Some(&stream_id), None)?;
let mut output = Vec::new();
reader.cat(&repo, &mut output)?;
let mut expected = inline1.clone();
expected.extend(&inline2);
assert_eq!(output, expected, "inline-only roundtrip must be exact");
Ok(())
}
#[test]
fn test_splitstream_large_external() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let large_content = generate_test_data(128 * 1024, 0x42);
let expected_digest: Sha256HashValue = compute_verity(&large_content);
let mut writer = repo.create_stream(0)?;
writer.write_external(&large_content)?;
let stream_id = repo.write_stream(writer, "test-external", None)?;
let mut reader = repo.open_stream("test-external", Some(&stream_id), None)?;
let mut refs = Vec::new();
reader.get_object_refs(|id| refs.push(id.clone()))?;
assert_eq!(refs.len(), 1);
assert_eq!(
refs[0], expected_digest,
"external object must have correct fs-verity digest"
);
let mut reader = repo.open_stream("test-external", Some(&stream_id), None)?;
let mut output = Vec::new();
reader.cat(&repo, &mut output)?;
assert_eq!(output.len(), large_content.len());
assert_eq!(
output, large_content,
"large external content must roundtrip exactly"
);
Ok(())
}
#[test]
fn test_splitstream_mixed_content() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let header = generate_test_data(512, 0x01);
let file_content = generate_test_data(64 * 1024, 0x02);
let trailer = generate_test_data(1024, 0x03);
let expected_digest: Sha256HashValue = compute_verity(&file_content);
let mut writer = repo.create_stream(0)?;
writer.write_inline(&header);
writer.write_external(&file_content)?;
writer.write_inline(&trailer);
let stream_id = repo.write_stream(writer, "test-mixed", None)?;
let mut reader = repo.open_stream("test-mixed", Some(&stream_id), None)?;
let mut refs = Vec::new();
reader.get_object_refs(|id| refs.push(id.clone()))?;
assert_eq!(refs.len(), 1);
assert_eq!(
refs[0], expected_digest,
"external object must have correct fs-verity digest"
);
let mut reader = repo.open_stream("test-mixed", Some(&stream_id), None)?;
let mut output = Vec::new();
reader.cat(&repo, &mut output)?;
let mut expected = header.clone();
expected.extend(&file_content);
expected.extend(&trailer);
assert_eq!(output.len(), expected.len());
assert_eq!(output, expected, "mixed content must roundtrip exactly");
Ok(())
}
#[test]
fn test_splitstream_multiple_externals() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let file1 = generate_test_data(32 * 1024, 0x10);
let file2 = generate_test_data(256 * 1024, 0x20);
let file3 = generate_test_data(8 * 1024, 0x30);
let separator = generate_test_data(64, 0xFF);
let expected_digest1: Sha256HashValue = compute_verity(&file1);
let expected_digest2: Sha256HashValue = compute_verity(&file2);
let expected_digest3: Sha256HashValue = compute_verity(&file3);
let mut writer = repo.create_stream(0)?;
writer.write_external(&file1)?;
writer.write_inline(&separator);
writer.write_external(&file2)?;
writer.write_inline(&separator);
writer.write_external(&file3)?;
let stream_id = repo.write_stream(writer, "test-multi", None)?;
let mut reader = repo.open_stream("test-multi", Some(&stream_id), None)?;
let mut refs = Vec::new();
reader.get_object_refs(|id| refs.push(id.clone()))?;
assert_eq!(refs.len(), 3);
assert_eq!(refs[0], expected_digest1, "file1 digest mismatch");
assert_eq!(refs[1], expected_digest2, "file2 digest mismatch");
assert_eq!(refs[2], expected_digest3, "file3 digest mismatch");
let mut reader = repo.open_stream("test-multi", Some(&stream_id), None)?;
let mut output = Vec::new();
reader.cat(&repo, &mut output)?;
let mut expected = file1.clone();
expected.extend(&separator);
expected.extend(&file2);
expected.extend(&separator);
expected.extend(&file3);
assert_eq!(output.len(), expected.len());
assert_eq!(
output, expected,
"multiple externals must roundtrip exactly"
);
Ok(())
}
#[test]
fn test_splitstream_deduplication() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let repeated_chunk = generate_test_data(64 * 1024, 0xDE);
let unique_chunk = generate_test_data(32 * 1024, 0xAD);
let repeated_digest: Sha256HashValue = compute_verity(&repeated_chunk);
let unique_digest: Sha256HashValue = compute_verity(&unique_chunk);
let mut writer = repo.create_stream(0)?;
writer.write_external(&repeated_chunk)?;
writer.write_external(&unique_chunk)?;
writer.write_external(&repeated_chunk)?; writer.write_external(&repeated_chunk)?; let stream_id = repo.write_stream(writer, "test-dedup", None)?;
let mut reader = repo.open_stream("test-dedup", Some(&stream_id), None)?;
let mut refs = Vec::new();
reader.get_object_refs(|id| refs.push(id.clone()))?;
assert_eq!(refs.len(), 2, "should only have 2 unique object refs");
assert_eq!(
refs[0], repeated_digest,
"first ref should be repeated chunk"
);
assert_eq!(refs[1], unique_digest, "second ref should be unique chunk");
let mut reader = repo.open_stream("test-dedup", Some(&stream_id), None)?;
let mut output = Vec::new();
reader.cat(&repo, &mut output)?;
let mut expected = repeated_chunk.clone();
expected.extend(&unique_chunk);
expected.extend(&repeated_chunk);
expected.extend(&repeated_chunk);
assert_eq!(output.len(), expected.len());
assert_eq!(
output, expected,
"deduplicated content must still roundtrip exactly"
);
Ok(())
}
#[test]
fn test_splitstream_get_object_refs() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let chunk1 = generate_test_data(16 * 1024, 0x11);
let chunk2 = generate_test_data(24 * 1024, 0x22);
let inline_data = generate_test_data(128, 0x33);
let expected_digest1: Sha256HashValue = compute_verity(&chunk1);
let expected_digest2: Sha256HashValue = compute_verity(&chunk2);
let mut writer = repo.create_stream(0)?;
writer.write_inline(&inline_data);
writer.write_external(&chunk1)?;
writer.write_external(&chunk2)?;
let stream_id = repo.write_stream(writer, "test-refs", None)?;
let mut reader = repo.open_stream("test-refs", Some(&stream_id), None)?;
let mut refs = Vec::new();
reader.get_object_refs(|id| refs.push(id.clone()))?;
assert_eq!(refs.len(), 2);
assert_eq!(refs[0], expected_digest1, "chunk1 digest mismatch");
assert_eq!(refs[1], expected_digest2, "chunk2 digest mismatch");
let obj1 = repo.read_object(&refs[0])?;
let obj2 = repo.read_object(&refs[1])?;
assert_eq!(obj1, chunk1, "first external reference must match");
assert_eq!(obj2, chunk2, "second external reference must match");
Ok(())
}
#[test]
fn test_splitstream_boundary_sizes() -> Result<()> {
let sizes = [4095, 4096, 4097, 65535, 65536, 65537];
for size in sizes {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let data = generate_test_data(size, size as u8);
let expected_digest: Sha256HashValue = compute_verity(&data);
let mut writer = repo.create_stream(0)?;
writer.write_external(&data)?;
let stream_id = repo.write_stream(writer, "test-boundary", None)?;
let mut reader = repo.open_stream("test-boundary", Some(&stream_id), None)?;
let mut refs = Vec::new();
reader.get_object_refs(|id| refs.push(id.clone()))?;
assert_eq!(refs.len(), 1);
assert_eq!(
refs[0], expected_digest,
"size {} must have correct digest",
size
);
let mut reader = repo.open_stream("test-boundary", Some(&stream_id), None)?;
let mut output = Vec::new();
reader.cat(&repo, &mut output)?;
assert_eq!(
output.len(),
data.len(),
"size {} must roundtrip with correct length",
size
);
assert_eq!(output, data, "size {} must roundtrip exactly", size);
}
Ok(())
}
#[test]
fn test_splitstream_content_type() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let content_type = 0xDEADBEEF_u64;
let mut writer = repo.create_stream(content_type)?;
writer.write_inline(b"test data");
let stream_id = repo.write_stream(writer, "test-ctype", None)?;
let reader = repo.open_stream("test-ctype", Some(&stream_id), Some(content_type))?;
assert_eq!(reader.content_type, content_type);
Ok(())
}
#[test]
fn test_splitstream_total_size_tracking() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let inline_data = generate_test_data(100, 0x01);
let external_data = generate_test_data(1000, 0x02);
let mut writer = repo.create_stream(0)?;
writer.write_inline(&inline_data);
writer.write_external(&external_data)?;
let stream_id = repo.write_stream(writer, "test-size", None)?;
let reader = repo.open_stream("test-size", Some(&stream_id), None)?;
assert_eq!(reader.total_size, 1100, "total size should be tracked");
Ok(())
}
#[test]
fn test_old_format_header_differs_from_new() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let mut writer = repo.create_stream(0)?;
writer.write_inline(b"hello");
let stream_id = repo.write_stream(writer, "test-old-hdr", None)?;
let new_bytes = repo.read_object(&stream_id)?;
let old_bytes = new_to_old_format(&new_bytes);
assert_eq!(&new_bytes[..11], b"SplitStream");
assert_ne!(
&old_bytes[..11],
b"SplitStream",
"old format should NOT start with magic"
);
Ok(())
}
#[test]
fn test_read_old_format_splitstream() -> Result<()> {
use std::io::Write as _;
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let inline_data = b"hello";
let mut writer = repo.create_stream(0)?;
writer.write_inline(inline_data);
let stream_id = repo.write_stream(writer, "test-old-read", None)?;
let new_bytes = repo.read_object(&stream_id)?;
let old_bytes = new_to_old_format(&new_bytes);
let mut tmpfile = tempfile::NamedTempFile::new()?;
tmpfile.write_all(&old_bytes)?;
tmpfile.flush()?;
let file = std::fs::File::open(tmpfile.path())?;
let mut old_reader = SplitStreamReader::<Sha256HashValue>::new(file, None)?;
assert_eq!(old_reader.total_size, inline_data.len() as u64);
assert_eq!(old_reader.content_type, 0);
let mut old_output = Vec::new();
std::io::Read::read_to_end(&mut old_reader, &mut old_output)?;
let mut new_reader = repo.open_stream("test-old-read", Some(&stream_id), None)?;
let mut new_output = Vec::new();
std::io::Read::read_to_end(&mut new_reader, &mut new_output)?;
assert_eq!(
old_output, new_output,
"old and new format must produce identical data"
);
assert_eq!(&old_output, inline_data);
Ok(())
}
#[test]
fn test_new_format_still_works() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let inline_data = b"world";
let mut writer = repo.create_stream(0)?;
writer.write_inline(inline_data);
let stream_id = repo.write_stream(writer, "test-new-fmt", None)?;
let raw_bytes = repo.read_object(&stream_id)?;
assert_eq!(&raw_bytes[..11], b"SplitStream");
let mut reader = repo.open_stream("test-new-fmt", Some(&stream_id), None)?;
assert_eq!(reader.total_size, inline_data.len() as u64);
let mut output = Vec::new();
std::io::Read::read_to_end(&mut reader, &mut output)?;
assert_eq!(&output, inline_data);
Ok(())
}
}