use crate::delta;
use crate::hash::{self, Hash};
use crate::object::{MkitError, Object};
use crate::store::{MAX_RAW_OBJECT_SIZE, ObjectStore};
use std::sync::Arc;
/// Four-byte ASCII magic (`MKIT`) that opens every packfile.
pub const MAGIC: &[u8; 4] = b"MKIT";
/// Pack format version written by `PackWriter::finish`; `PackReader::read`
/// rejects any other value.
pub const VERSION: u32 = 1;
/// Maximum number of entries a single pack may declare or hold.
pub const MAX_ENTRIES: u32 = 10_000_000;
/// Maximum sum of all entry payload lengths in one pack (4 GiB).
pub const MAX_TOTAL_PAYLOAD: u64 = 4 * 1024 * 1024 * 1024;
/// Length in bytes of the hash trailer that closes a pack. This file also
/// uses it as the length of the base-hash prefix of a delta entry payload.
pub const TRAILER_LEN: usize = 32;
/// Header length: 4-byte magic + 4-byte version + 4-byte entry count
/// (version and count are little-endian `u32`s).
pub const HEADER_LEN: usize = 4 + 4 + 4;
/// Per-entry framing: 1-byte entry type + 4-byte little-endian payload length.
pub const ENTRY_FRAME_LEN: usize = 1 + 4;
/// Every way building or reading a packfile can fail in this module.
#[derive(Debug, thiserror::Error)]
pub enum PackError {
/// The input is too small to contain both the header and the trailer.
#[error("packfile is shorter than the {HEADER_LEN}-byte header + {TRAILER_LEN}-byte trailer")]
PackfileTooShort,
/// The first four bytes differ from `MAGIC`.
#[error("first 4 bytes are not ASCII \"MKIT\"")]
InvalidMagic,
/// The header version field (carried in the variant) is not `VERSION`.
#[error("version {0} is not supported (v1 only)")]
UnsupportedVersion(u32),
/// An entry's type byte (carried in the variant) is neither `0x00` nor `0x02`.
#[error("entry_type {0:#04x} is not 0x00 (raw) or 0x02 (delta)")]
InvalidEntryType(u8),
/// Too many entries. The reader also returns this when the declared count
/// could not fit in the body even with empty payloads. The writer reports
/// `MAX_ENTRIES + 1` when a push would exceed the cap.
#[error("entry_count {0} exceeds the {MAX_ENTRIES} cap")]
TooManyObjects(u32),
/// The running payload total exceeds `MAX_TOTAL_PAYLOAD`, or (in
/// `PackWriter::finish`) a single payload length does not fit in a `u32`.
#[error("sum of payload_len exceeds {MAX_TOTAL_PAYLOAD} bytes")]
PackfileTooLarge,
/// An entry frame or payload would run past the start of the trailer.
#[error("entry payload extends past the trailer offset")]
UnexpectedEof,
/// The hash of the pack body does not equal the trailer bytes.
#[error("trailer BLAKE3 mismatch — packfile is corrupt or truncated")]
PackfileCorrupted,
/// A delta entry names a base (hex-encoded in the variant) that was found
/// neither earlier in the pack nor in the store.
#[error("delta entry references base hash {0} which is not in this pack or the store")]
DeltaBaseMissing(String),
/// A delta entry payload is too short to hold the base-hash prefix.
#[error("delta entry payload is shorter than the 32-byte base hash prefix")]
DeltaEntryTruncated,
/// Applying a delta failed. Because of `#[from]`, any `MkitError` propagated
/// with `?` becomes this variant.
#[error("delta reconstruction failed: {0}")]
DeltaApply(#[from] MkitError),
/// The entry bytes did not deserialize as an object; the deserializer's
/// error is carried in the variant.
#[error("pack entry is not a canonical storable object: {0}")]
InvalidObject(MkitError),
/// The entry deserialized to `Object::Delta`, which must not be stored.
#[error("pack entry resolves to pack-only delta object")]
NonStorableObject,
/// Bytes remain between the last declared entry and the trailer.
#[error("pack contains trailing bytes after declared entries")]
TrailingData,
/// A store operation failed. This variant is also constructed directly
/// with `StoreError::ObjectTooLarge` by the size checks in this file.
#[error("store I/O failure: {0}")]
Store(#[from] crate::store::StoreError),
}
/// Summary returned by `PackReader::read` after a pack has been unpacked.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct UnpackReport {
/// Number of raw (`0x00`) entries processed.
pub raw_count: u32,
/// Number of delta (`0x02`) entries processed.
pub delta_count: u32,
/// Hash of every object produced by the pack, in entry order. One hash is
/// pushed per entry, so duplicates are possible if the pack repeats an object.
pub stored: Vec<Hash>,
}
/// Accumulates pack entries in memory; `finish` serializes them into a packfile.
#[derive(Debug, Default)]
pub struct PackWriter {
// (entry type byte, payload) pairs in the order they were pushed.
entries: Vec<(u8, Vec<u8>)>,
// Running sum of payload lengths, checked against MAX_TOTAL_PAYLOAD on each push.
total_payload: u64,
}
impl PackWriter {
    /// Creates a writer holding no entries and zero accumulated payload.
    #[must_use]
    pub fn new() -> Self {
        Self {
            entries: Vec::new(),
            total_payload: 0,
        }
    }

    /// Queues `bytes` as a raw (`0x00`) entry.
    ///
    /// `hash_of_bytes` is handed back unchanged on success; this method does
    /// not recompute or verify it.
    ///
    /// # Errors
    /// [`PackError::TooManyObjects`] or [`PackError::PackfileTooLarge`] when
    /// adding the entry would exceed the pack caps. Nothing is queued then.
    pub fn push_raw(&mut self, hash_of_bytes: Hash, bytes: Vec<u8>) -> Result<Hash, PackError> {
        let added = bytes.len();
        self.check_caps_for(added)?;
        self.entries.push((0x00, bytes));
        self.total_payload += added as u64;
        Ok(hash_of_bytes)
    }

    /// Queues a delta (`0x02`) entry whose payload is `base_hash` followed by
    /// `delta_stream`.
    ///
    /// # Errors
    /// [`PackError::TooManyObjects`] or [`PackError::PackfileTooLarge`] when
    /// adding the entry would exceed the pack caps. Nothing is queued then.
    pub fn push_delta(&mut self, base_hash: &Hash, delta_stream: &[u8]) -> Result<(), PackError> {
        // The base-hash prefix has the same length as the pack trailer.
        let framed_len = TRAILER_LEN + delta_stream.len();
        self.check_caps_for(framed_len)?;

        let mut framed = Vec::with_capacity(framed_len);
        framed.extend_from_slice(base_hash);
        framed.extend_from_slice(delta_stream);

        self.total_payload += framed.len() as u64;
        self.entries.push((0x02, framed));
        Ok(())
    }

    /// Checks that one more entry of `add_len` payload bytes stays within
    /// both the entry-count cap and the total-payload cap. The count cap is
    /// tested first, so it wins when both would be exceeded.
    fn check_caps_for(&self, add_len: usize) -> Result<(), PackError> {
        let entries_after = self.entries.len() as u64 + 1;
        let payload_after = self.total_payload.saturating_add(add_len as u64);

        if entries_after > u64::from(MAX_ENTRIES) {
            Err(PackError::TooManyObjects(MAX_ENTRIES + 1))
        } else if payload_after > MAX_TOTAL_PAYLOAD {
            Err(PackError::PackfileTooLarge)
        } else {
            Ok(())
        }
    }

    /// Number of entries queued so far.
    #[must_use]
    pub fn entry_count(&self) -> usize {
        self.entries.len()
    }

    /// Serializes the queued entries into a complete packfile.
    ///
    /// Layout: magic, little-endian version, little-endian entry count, then
    /// for each entry its type byte, little-endian `u32` payload length and
    /// payload, and finally the hash of every preceding byte as the trailer.
    ///
    /// # Errors
    /// [`PackError::TooManyObjects`] if the entry count exceeds the cap;
    /// [`PackError::PackfileTooLarge`] if a single payload length does not
    /// fit in a `u32`.
    pub fn finish(self) -> Result<Vec<u8>, PackError> {
        let count = u32::try_from(self.entries.len())
            .map_err(|_| PackError::TooManyObjects(MAX_ENTRIES + 1))?;
        if count > MAX_ENTRIES {
            return Err(PackError::TooManyObjects(count));
        }

        // Pre-size the output so serialization never reallocates.
        let framed_total: usize = self
            .entries
            .iter()
            .map(|(_, payload)| ENTRY_FRAME_LEN + payload.len())
            .sum();
        let mut out = Vec::with_capacity(HEADER_LEN + TRAILER_LEN + framed_total);

        out.extend_from_slice(MAGIC);
        out.extend_from_slice(&VERSION.to_le_bytes());
        out.extend_from_slice(&count.to_le_bytes());

        for (kind, payload) in self.entries {
            let len = u32::try_from(payload.len()).map_err(|_| PackError::PackfileTooLarge)?;
            out.push(kind);
            out.extend_from_slice(&len.to_le_bytes());
            out.extend_from_slice(&payload);
        }

        let trailer = hash::hash(&out);
        out.extend_from_slice(&trailer);
        Ok(out)
    }
}
/// Returns the key for a pack: `hash::hash` over exactly the bytes given.
/// When `pack_bytes` is a finished pack, the trailer is part of the hashed input.
#[must_use]
pub fn pack_key(pack_bytes: &[u8]) -> Hash {
hash::hash(pack_bytes)
}
/// Stateless namespace for unpacking; all functionality is in the associated
/// function `PackReader::read`.
#[derive(Debug)]
pub struct PackReader;
impl PackReader {
    /// Validates `pack_bytes` and writes every object it contains into `store`.
    ///
    /// Checks run in this order: minimum length, magic, version, trailer hash
    /// over everything before the last [`TRAILER_LEN`] bytes, entry-count
    /// caps, then each entry in sequence:
    ///
    /// * `0x00` (raw): the payload must pass `validate_storable_object`.
    /// * `0x02` (delta): the payload is a 32-byte base hash followed by a
    ///   delta stream. The base is looked up first among objects produced
    ///   earlier in this pack, then in `store`. The reconstructed bytes must
    ///   also pass `validate_storable_object`.
    ///
    /// Nothing is written to `store` until every entry has been validated and
    /// the entries end exactly at the trailer. The objects are then written
    /// through one `store.batch()` and committed once.
    ///
    /// # Errors
    /// Returns the [`PackError`] variant matching the first failed check.
    /// Errors from `store` are wrapped in [`PackError::Store`], and errors
    /// from `delta::decode` in [`PackError::DeltaApply`].
    pub fn read(pack_bytes: &[u8], store: &ObjectStore) -> Result<UnpackReport, PackError> {
        if pack_bytes.len() < HEADER_LEN + TRAILER_LEN {
            return Err(PackError::PackfileTooShort);
        }
        if &pack_bytes[..4] != MAGIC.as_slice() {
            return Err(PackError::InvalidMagic);
        }
        let version = u32::from_le_bytes(pack_bytes[4..8].try_into().expect("4 bytes"));
        if version != VERSION {
            return Err(PackError::UnsupportedVersion(version));
        }

        // Everything before the last TRAILER_LEN bytes is the hashed body.
        let split = pack_bytes.len() - TRAILER_LEN;
        let body = &pack_bytes[..split];
        let trailer = &pack_bytes[split..];
        let computed = hash::hash(body);
        if computed.as_slice() != trailer {
            return Err(PackError::PackfileCorrupted);
        }

        let count = u32::from_le_bytes(pack_bytes[8..12].try_into().expect("4 bytes"));
        if count > MAX_ENTRIES {
            return Err(PackError::TooManyObjects(count));
        }
        // Each entry needs at least its frame, so a count that cannot fit in
        // the body even with empty payloads is rejected before the loop.
        let body_after_header = body.len() - HEADER_LEN;
        if u64::from(count) * ENTRY_FRAME_LEN as u64 > body_after_header as u64 {
            return Err(PackError::TooManyObjects(count));
        }

        let mut report = UnpackReport::default();
        // Objects are staged here and only written once the whole pack is valid.
        let mut pending_writes: Vec<(Hash, Arc<[u8]>)> = Vec::new();
        // Objects produced so far, so later deltas can use them as bases.
        let mut in_pack: std::collections::HashMap<Hash, Arc<[u8]>> =
            std::collections::HashMap::new();
        let mut total_payload: u64 = 0;
        let mut pos = HEADER_LEN;

        for _ in 0..count {
            // `pos <= split` holds at the top of every iteration and after the
            // frame is consumed, so `split - pos` cannot underflow. Comparing
            // against the remaining length, instead of computing `pos + len`,
            // keeps the check correct where `usize` is 32 bits and a hostile
            // `payload_len` could overflow the addition.
            if split - pos < ENTRY_FRAME_LEN {
                return Err(PackError::UnexpectedEof);
            }
            let etype = pack_bytes[pos];
            pos += 1;
            let payload_len =
                u32::from_le_bytes(pack_bytes[pos..pos + 4].try_into().expect("4 bytes")) as usize;
            pos += 4;

            total_payload = total_payload.saturating_add(payload_len as u64);
            if total_payload > MAX_TOTAL_PAYLOAD {
                return Err(PackError::PackfileTooLarge);
            }
            if payload_len > split - pos {
                return Err(PackError::UnexpectedEof);
            }
            let payload = &pack_bytes[pos..pos + payload_len];
            pos += payload_len;

            let object: Arc<[u8]> = match etype {
                0x00 => {
                    validate_storable_object(payload)?;
                    report.raw_count += 1;
                    Arc::from(payload)
                }
                0x02 => {
                    if payload.len() < TRAILER_LEN {
                        return Err(PackError::DeltaEntryTruncated);
                    }
                    let (base_prefix, stream) = payload.split_at(TRAILER_LEN);
                    let mut base_hash = [0u8; 32];
                    base_hash.copy_from_slice(base_prefix);

                    // Prefer a base produced earlier in this pack, then fall
                    // back to the store.
                    let base_bytes: std::borrow::Cow<'_, [u8]> =
                        if let Some(b) = in_pack.get(&base_hash) {
                            std::borrow::Cow::Borrowed(b.as_ref())
                        } else if store.contains(&base_hash) {
                            let bytes = store.read(&base_hash)?;
                            validate_storable_object(&bytes)?;
                            std::borrow::Cow::Owned(bytes)
                        } else {
                            return Err(PackError::DeltaBaseMissing(hash::to_hex(&base_hash)));
                        };

                    // Reject an oversized declared result before decoding it.
                    validate_delta_result_size(stream)?;
                    let resolved = delta::decode(base_bytes.as_ref(), stream)?;
                    validate_storable_object(&resolved)?;
                    report.delta_count += 1;
                    Arc::from(resolved)
                }
                // Every other type byte, including 0x01, is rejected.
                other => return Err(PackError::InvalidEntryType(other)),
            };

            let stored_hash = hash::hash(&object[..]);
            in_pack.insert(stored_hash, Arc::clone(&object));
            pending_writes.push((stored_hash, object));
            report.stored.push(stored_hash);
        }

        if pos != split {
            return Err(PackError::TrailingData);
        }

        // The whole pack is valid: write everything through one batch.
        let batch = store.batch();
        for (h, bytes) in pending_writes {
            batch.write_prehashed(h, &[&bytes])?;
        }
        batch.commit()?;
        Ok(report)
    }
}
/// Accepts `bytes` only if they may be written to the object store.
///
/// The bytes must be at most `MAX_RAW_OBJECT_SIZE` long and must deserialize
/// to any object kind except `Object::Delta`.
///
/// # Errors
/// * `PackError::Store(StoreError::ObjectTooLarge)` when the size cap is exceeded.
/// * `PackError::InvalidObject` when deserialization fails.
/// * `PackError::NonStorableObject` when the bytes decode to a delta object.
fn validate_storable_object(bytes: &[u8]) -> Result<(), PackError> {
    let within_cap = bytes.len() <= MAX_RAW_OBJECT_SIZE;
    if !within_cap {
        return Err(PackError::Store(crate::store::StoreError::ObjectTooLarge));
    }

    let object = crate::serialize::deserialize(bytes).map_err(PackError::InvalidObject)?;

    // Kept exhaustive on purpose: a new `Object` variant must be classified
    // here explicitly rather than slipping through a wildcard.
    match object {
        Object::Blob(_)
        | Object::Tree(_)
        | Object::Commit(_)
        | Object::Remix(_)
        | Object::ChunkedBlob(_)
        | Object::Tag(_) => Ok(()),
        Object::Delta(_) => Err(PackError::NonStorableObject),
    }
}
/// Rejects a delta stream whose declared result length exceeds
/// `MAX_RAW_OBJECT_SIZE`, before any reconstruction is attempted.
///
/// The result length is read as a little-endian `u32` from bytes 5..9 of the
/// stream.
///
/// NOTE(review): the slice below assumes `delta::HEADER_LEN` is at least 9;
/// that constant is defined outside this file, so confirm against `delta`.
///
/// # Errors
/// * `PackError::DeltaApply(MkitError::UnexpectedEof)` when the stream is
///   shorter than `delta::HEADER_LEN`.
/// * `PackError::Store(StoreError::ObjectTooLarge)` when the declared result
///   length is over the cap.
fn validate_delta_result_size(stream: &[u8]) -> Result<(), PackError> {
    if stream.len() < delta::HEADER_LEN {
        return Err(PackError::DeltaApply(MkitError::UnexpectedEof));
    }

    let mut raw_len = [0u8; 4];
    raw_len.copy_from_slice(&stream[5..9]);
    let declared = u32::from_le_bytes(raw_len) as usize;

    if declared <= MAX_RAW_OBJECT_SIZE {
        Ok(())
    } else {
        Err(PackError::Store(crate::store::StoreError::ObjectTooLarge))
    }
}
// Unit tests for the pack writer and reader. Several tests hand-build pack
// bytes so they can produce inputs that `PackWriter` itself never emits.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
// Creates a temporary directory and initializes an `ObjectStore` in it.
// The `TempDir` is returned alongside the store so the caller keeps it alive.
fn fresh_store() -> (TempDir, ObjectStore) {
let dir = TempDir::new().unwrap();
let store = ObjectStore::init(dir.path()).unwrap();
(dir, store)
}
// Wraps `payload` in an `Object::Blob` and returns its serialized bytes.
fn write_blob_via_serialize(payload: &[u8]) -> Vec<u8> {
let blob = crate::object::Object::Blob(crate::object::Blob {
data: payload.to_vec(),
});
crate::serialize::serialize(&blob).expect("serialize blob")
}
// Appends the hash of `body` as the trailer, turning a hand-built body into
// a pack that passes the trailer check.
fn finish_pack_body(mut body: Vec<u8>) -> Vec<u8> {
let trailer = hash::hash(&body);
body.extend_from_slice(&trailer);
body
}
// An empty pack is exactly header + trailer (12 + 32 bytes) and unpacks to
// an empty report.
#[test]
fn empty_pack_is_44_bytes() {
let pack = PackWriter::new().finish().unwrap();
assert_eq!(pack.len(), HEADER_LEN + TRAILER_LEN);
assert_eq!(&pack[..4], MAGIC);
assert_eq!(u32::from_le_bytes(pack[4..8].try_into().unwrap()), VERSION);
assert_eq!(u32::from_le_bytes(pack[8..12].try_into().unwrap()), 0);
let (_dir, store) = fresh_store();
let report = PackReader::read(&pack, &store).unwrap();
assert_eq!(report.raw_count, 0);
assert_eq!(report.delta_count, 0);
assert!(report.stored.is_empty());
}
// Unpacking 30 objects must record exactly two `Ev::Full` events on the
// recording syncer, i.e. the count does not grow with the number of objects.
#[test]
fn unpack_writes_objects_via_single_batch_flush() {
use crate::batch::testing::{Ev, RecordingSyncer};
use std::sync::Arc;
let mut w = PackWriter::new();
let mut blobs = Vec::new();
for i in 0u32..30 {
let blob = write_blob_via_serialize(format!("pack object {i}").as_bytes());
w.push_raw(hash::hash(&blob), blob.clone()).unwrap();
blobs.push(blob);
}
let pack = w.finish().unwrap();
let (_dir, mut store) = fresh_store();
let rec = Arc::new(RecordingSyncer::default());
store.set_syncer(rec.clone());
let report = PackReader::read(&pack, &store).unwrap();
assert_eq!(report.raw_count, 30);
let fulls = rec
.events()
.iter()
.filter(|e| matches!(e, Ev::Full(_)))
.count();
assert_eq!(
fulls, 2,
"unpack flush cost must be constant, not O(objects)"
);
// Every object must still be readable from the store afterwards.
for blob in &blobs {
assert_eq!(store.read(&hash::hash(blob)).unwrap(), *blob);
}
}
// One raw entry survives a write -> read round trip and lands in the store.
#[test]
fn single_raw_roundtrip() {
let blob = write_blob_via_serialize(b"hello packfile");
let h = hash::hash(&blob);
let mut w = PackWriter::new();
w.push_raw(h, blob.clone()).unwrap();
let pack = w.finish().unwrap();
let (_dir, store) = fresh_store();
let report = PackReader::read(&pack, &store).unwrap();
assert_eq!(report.raw_count, 1);
assert_eq!(report.delta_count, 0);
assert_eq!(report.stored, vec![h]);
assert_eq!(store.read(&h).unwrap(), blob);
}
// A delta whose base is an earlier raw entry of the same pack resolves
// without the base being in the store beforehand.
#[test]
fn raw_then_delta_resolves_in_pack() {
let mut content_base = vec![0u8; 1024];
for (i, b) in content_base.iter_mut().enumerate() {
*b = u8::try_from(i % 251).expect("modulo < 256");
}
let mut content_target = content_base.clone();
content_target[500] = 0xFF;
content_target[501] = 0xFE;
let base_obj = write_blob_via_serialize(&content_base);
let target_obj = write_blob_via_serialize(&content_target);
let base_hash = hash::hash(&base_obj);
let target_hash = hash::hash(&target_obj);
let stream = delta::encode(&base_obj, &target_obj).unwrap();
let mut w = PackWriter::new();
w.push_raw(base_hash, base_obj.clone()).unwrap();
w.push_delta(&base_hash, &stream).unwrap();
let pack = w.finish().unwrap();
let (_dir, store) = fresh_store();
let report = PackReader::read(&pack, &store).unwrap();
assert_eq!(report.raw_count, 1);
assert_eq!(report.delta_count, 1);
assert_eq!(report.stored, vec![base_hash, target_hash]);
assert_eq!(store.read(&target_hash).unwrap(), target_obj);
}
// A raw payload that is not a serialized object is rejected, and nothing
// is written to the store.
#[test]
fn rejects_raw_payload_that_is_not_canonical_object_without_store_write() {
let payload = b"not a serialized mkit object".to_vec();
let payload_hash = hash::hash(&payload);
// Hand-built body: magic, version, count = 1, then one raw entry.
let mut body = Vec::new();
body.extend_from_slice(MAGIC);
body.extend_from_slice(&VERSION.to_le_bytes());
body.extend_from_slice(&1u32.to_le_bytes());
body.push(0x00);
let payload_len = u32::try_from(payload.len()).unwrap();
body.extend_from_slice(&payload_len.to_le_bytes());
body.extend_from_slice(&payload);
let pack = finish_pack_body(body);
let (_dir, store) = fresh_store();
let err = PackReader::read(&pack, &store).unwrap_err();
assert!(matches!(err, PackError::InvalidObject(_)), "got {err:?}");
assert!(!store.contains(&payload_hash));
}
// A raw entry that is itself a serialized `Object::Delta` is rejected as
// non-storable, and nothing is written to the store.
#[test]
fn rejects_raw_delta_object_without_store_write() {
let delta = crate::object::Object::Delta(crate::object::Delta {
base_hash: [0xAB; 32],
result_size: 0,
instructions: Vec::new(),
});
let payload = crate::serialize::serialize(&delta).unwrap();
let payload_hash = hash::hash(&payload);
let mut w = PackWriter::new();
w.push_raw(payload_hash, payload).unwrap();
let pack = w.finish().unwrap();
let (_dir, store) = fresh_store();
let err = PackReader::read(&pack, &store).unwrap_err();
assert!(matches!(err, PackError::NonStorableObject), "got {err:?}");
assert!(!store.contains(&payload_hash));
}
// A delta that reconstructs bytes which are not a valid object fails the
// whole unpack: even the valid raw base from the same pack is not stored.
#[test]
fn rejects_delta_resolving_to_non_object_without_partial_store_write() {
let base_obj = write_blob_via_serialize(b"base bytes");
let base_hash = hash::hash(&base_obj);
let invalid_target = b"not a serialized object".to_vec();
let invalid_hash = hash::hash(&invalid_target);
let stream = delta::encode(&base_obj, &invalid_target).unwrap();
let mut w = PackWriter::new();
w.push_raw(base_hash, base_obj).unwrap();
w.push_delta(&base_hash, &stream).unwrap();
let pack = w.finish().unwrap();
let (_dir, store) = fresh_store();
let err = PackReader::read(&pack, &store).unwrap_err();
assert!(matches!(err, PackError::InvalidObject(_)), "got {err:?}");
assert!(!store.contains(&base_hash));
assert!(!store.contains(&invalid_hash));
}
// A delta stream declaring a result larger than MAX_RAW_OBJECT_SIZE is
// rejected, and the raw base from the same pack is not stored either.
#[test]
fn rejects_delta_result_over_object_cap_without_partial_store_write() {
let base_obj = write_blob_via_serialize(b"base bytes");
let base_hash = hash::hash(&base_obj);
// Hand-built stream header: version byte, base length, declared result length.
let mut stream = Vec::new();
stream.push(delta::STREAM_VERSION);
stream.extend_from_slice(&u32::try_from(base_obj.len()).unwrap().to_le_bytes());
stream.extend_from_slice(
&u32::try_from(MAX_RAW_OBJECT_SIZE + 1)
.unwrap()
.to_le_bytes(),
);
let mut w = PackWriter::new();
w.push_raw(base_hash, base_obj).unwrap();
w.push_delta(&base_hash, &stream).unwrap();
let pack = w.finish().unwrap();
let (_dir, store) = fresh_store();
let err = PackReader::read(&pack, &store).unwrap_err();
assert!(
matches!(
err,
PackError::Store(crate::store::StoreError::ObjectTooLarge)
),
"got {err:?}"
);
assert!(!store.contains(&base_hash));
}
// Extra bytes between the last declared entry and the trailer are rejected
// even though the trailer hash covers them; nothing is stored.
#[test]
fn rejects_trailing_bytes_after_declared_entries_without_store_write() {
let blob = write_blob_via_serialize(b"trailing bytes test");
let blob_hash = hash::hash(&blob);
let mut body = Vec::new();
body.extend_from_slice(MAGIC);
body.extend_from_slice(&VERSION.to_le_bytes());
body.extend_from_slice(&1u32.to_le_bytes());
body.push(0x00);
let blob_len = u32::try_from(blob.len()).unwrap();
body.extend_from_slice(&blob_len.to_le_bytes());
body.extend_from_slice(&blob);
body.extend_from_slice(b"junk");
let pack = finish_pack_body(body);
let (_dir, store) = fresh_store();
let err = PackReader::read(&pack, &store).unwrap_err();
assert!(matches!(err, PackError::TrailingData), "got {err:?}");
assert!(!store.contains(&blob_hash));
}
// Overwriting the four magic bytes yields InvalidMagic.
#[test]
fn rejects_invalid_magic() {
let mut pack = PackWriter::new().finish().unwrap();
pack[0] = b'X';
pack[1] = b'X';
pack[2] = b'X';
pack[3] = b'X';
let (_dir, store) = fresh_store();
let err = PackReader::read(&pack, &store).unwrap_err();
assert!(matches!(err, PackError::InvalidMagic));
}
// Changing the low byte of the version field to 99 yields
// UnsupportedVersion(99). The trailer is left stale here, so this also
// shows the version error is reported rather than PackfileCorrupted.
#[test]
fn rejects_unknown_version() {
let mut pack = PackWriter::new().finish().unwrap();
pack[4] = 99;
let (_dir, store) = fresh_store();
let err = PackReader::read(&pack, &store).unwrap_err();
assert!(matches!(err, PackError::UnsupportedVersion(99)));
}
// Input shorter than header + trailer yields PackfileTooShort.
#[test]
fn rejects_truncated_pack() {
let pack = vec![b'M', b'K']; let (_dir, store) = fresh_store();
let err = PackReader::read(&pack, &store).unwrap_err();
assert!(matches!(err, PackError::PackfileTooShort));
}
// Flipping one bit in the last trailer byte yields PackfileCorrupted.
#[test]
fn rejects_bit_flipped_trailer() {
let blob = write_blob_via_serialize(b"trailer test");
let h = hash::hash(&blob);
let mut w = PackWriter::new();
w.push_raw(h, blob).unwrap();
let mut pack = w.finish().unwrap();
let last = pack.len() - 1;
pack[last] ^= 0x01; let (_dir, store) = fresh_store();
let err = PackReader::read(&pack, &store).unwrap_err();
assert!(matches!(err, PackError::PackfileCorrupted));
}
// Entry type 0x01 with an empty payload yields InvalidEntryType(0x01).
#[test]
fn rejects_reserved_entry_type_0x01() {
let mut buf = Vec::new();
buf.extend_from_slice(MAGIC);
buf.extend_from_slice(&VERSION.to_le_bytes());
buf.extend_from_slice(&1u32.to_le_bytes());
buf.push(0x01); buf.extend_from_slice(&0u32.to_le_bytes()); let trailer = hash::hash(&buf);
buf.extend_from_slice(&trailer);
let (_dir, store) = fresh_store();
let err = PackReader::read(&buf, &store).unwrap_err();
assert!(matches!(err, PackError::InvalidEntryType(0x01)));
}
// An arbitrary unknown entry type (0x77) yields InvalidEntryType(0x77).
#[test]
fn rejects_unknown_entry_type() {
let mut buf = Vec::new();
buf.extend_from_slice(MAGIC);
buf.extend_from_slice(&VERSION.to_le_bytes());
buf.extend_from_slice(&1u32.to_le_bytes());
buf.push(0x77); buf.extend_from_slice(&0u32.to_le_bytes());
let trailer = hash::hash(&buf);
buf.extend_from_slice(&trailer);
let (_dir, store) = fresh_store();
let err = PackReader::read(&buf, &store).unwrap_err();
assert!(matches!(err, PackError::InvalidEntryType(0x77)));
}
// A delta whose base is neither in the pack nor in the (empty) store
// yields DeltaBaseMissing.
#[test]
fn delta_base_missing_is_loud() {
let mut fake_base = [0u8; 32];
fake_base[0] = 0xAB;
// Nine-byte stream: one byte 0x01 followed by two zero u32 fields.
let mut stream = Vec::new();
stream.push(0x01); stream.extend_from_slice(&0u32.to_le_bytes()); stream.extend_from_slice(&0u32.to_le_bytes()); let mut w = PackWriter::new();
w.push_delta(&fake_base, &stream).unwrap();
let pack = w.finish().unwrap();
let (_dir, store) = fresh_store();
let err = PackReader::read(&pack, &store).unwrap_err();
assert!(matches!(err, PackError::DeltaBaseMissing(_)), "got {err:?}");
}
// An entry declaring 1,000,000 payload bytes with none present yields
// UnexpectedEof.
#[test]
fn entry_payload_past_trailer_rejected() {
let mut buf = Vec::new();
buf.extend_from_slice(MAGIC);
buf.extend_from_slice(&VERSION.to_le_bytes());
buf.extend_from_slice(&1u32.to_le_bytes());
buf.push(0x00);
buf.extend_from_slice(&1_000_000u32.to_le_bytes());
let trailer = hash::hash(&buf);
buf.extend_from_slice(&trailer);
let (_dir, store) = fresh_store();
let err = PackReader::read(&buf, &store).unwrap_err();
assert!(matches!(err, PackError::UnexpectedEof));
}
// A header declaring u32::MAX entries yields TooManyObjects.
#[test]
fn entry_count_over_cap_rejected() {
let mut buf = Vec::new();
buf.extend_from_slice(MAGIC);
buf.extend_from_slice(&VERSION.to_le_bytes());
buf.extend_from_slice(&u32::MAX.to_le_bytes());
let trailer = hash::hash(&buf);
buf.extend_from_slice(&trailer);
let (_dir, store) = fresh_store();
let err = PackReader::read(&buf, &store).unwrap_err();
assert!(
matches!(err, PackError::TooManyObjects(_)),
"expected TooManyObjects, got {err:?}"
);
}
// `pack_key` equals `hash::hash` of the complete pack bytes.
#[test]
fn pack_key_is_blake3_of_pack_bytes() {
let blob = write_blob_via_serialize(b"key test");
let h = hash::hash(&blob);
let mut w = PackWriter::new();
w.push_raw(h, blob).unwrap();
let pack = w.finish().unwrap();
assert_eq!(pack_key(&pack), hash::hash(&pack));
}
// A delta-only pack resolves when its base was written to the store
// before the pack is read.
#[test]
fn delta_resolves_against_pre_existing_store_object() {
let (_dir, store) = fresh_store();
let mut content_base = vec![0u8; 256];
for (i, b) in content_base.iter_mut().enumerate() {
*b = u8::try_from(i % 251).expect("modulo < 256");
}
let base_obj = write_blob_via_serialize(&content_base);
let base_hash = store.write(&base_obj).unwrap();
let mut content_target = content_base.clone();
content_target[100] = 0xAA;
let target_obj = write_blob_via_serialize(&content_target);
let target_hash = hash::hash(&target_obj);
let stream = delta::encode(&base_obj, &target_obj).unwrap();
let mut w = PackWriter::new();
w.push_delta(&base_hash, &stream).unwrap();
let pack = w.finish().unwrap();
let report = PackReader::read(&pack, &store).unwrap();
assert_eq!(report.delta_count, 1);
assert_eq!(report.raw_count, 0);
assert_eq!(store.read(&target_hash).unwrap(), target_obj);
}
}