#![forbid(unsafe_code)]
pub mod dynamic;
pub mod header;
pub mod migrate;
pub mod schema;
pub use crate::codec::dynamic::Dynamic;
pub use crate::codec::header::{DocumentHeader, DOC_HEADER_SIZE, MAX_INLINE_DOC};
pub use crate::codec::migrate::Migrate;
pub use crate::codec::schema::{DynamicSchema, EnumVariantSchema, Schema, MAX_SCHEMA_DEPTH};
use crate::error::{Error, Result};
use crate::pager::checksum::crc32c;
use serde::de::DeserializeOwned;
use serde::Serialize;
/// A typed record that can be framed by [`encode`] and recovered by [`decode`].
///
/// Implementors must be serde-serializable, deserializable into owned data,
/// and `'static`.
pub trait Document: Serialize + DeserializeOwned + 'static {
    /// Collection name reported in schema and migration errors for this type.
    const COLLECTION: &'static str;

    /// Version of this type's layout.
    ///
    /// [`encode`] stamps it into the record header and [`decode`] compares
    /// the stored version against it.
    const VERSION: u32;

    /// Rebuilds `Self` from a dynamically decoded record that was written
    /// under `from_version`.
    ///
    /// The default implementation supports no migration at all and always
    /// fails.
    ///
    /// NOTE(review): `decode` dispatches older records through the `Migrate`
    /// trait; the tests in this file expect that call to end up here —
    /// confirm against the `migrate` module.
    ///
    /// # Errors
    ///
    /// The default returns [`Error::SchemaMigrationNotImplemented`] carrying
    /// this type's collection name, `from_version`, and [`Self::VERSION`].
    fn migrate(_dynamic: crate::codec::Dynamic, from_version: u32) -> Result<Self> {
        let collection = Self::COLLECTION;
        let to_version = Self::VERSION;
        Err(Error::SchemaMigrationNotImplemented {
            collection,
            from_version,
            to_version,
        })
    }

    /// Schemas of earlier versions of this document, as `(version, schema)`
    /// pairs.
    ///
    /// [`decode`] searches this list for the stored version of an older
    /// record and, in debug builds, asserts that the versions are strictly
    /// ascending. The default is an empty list.
    #[must_use]
    fn historical_schemas() -> Vec<(u32, crate::codec::schema::DynamicSchema)> {
        vec![]
    }

    /// Index specifications declared by this document type.
    ///
    /// The default declares none. How the specs are consumed is not visible
    /// from this module.
    #[must_use]
    fn indexes() -> Vec<crate::index::IndexSpec> {
        vec![]
    }
}
pub fn encode<T: Document>(doc: &T, collection_id: u32) -> Result<Vec<u8>> {
let payload = postcard::to_allocvec(doc)?;
let payload_len = u32::try_from(payload.len()).map_err(|_| Error::DocumentTooLarge {
len: payload.len(),
max: MAX_INLINE_DOC,
})?;
let payload_crc32c = crc32c(&payload);
let header = DocumentHeader {
collection_id,
type_version: T::VERSION,
payload_len,
payload_crc32c,
};
let total = DOC_HEADER_SIZE
.checked_add(payload.len())
.ok_or(Error::DocumentTooLarge {
len: usize::MAX,
max: MAX_INLINE_DOC,
})?;
if total > MAX_INLINE_DOC {
return Err(Error::DocumentTooLarge {
len: total,
max: MAX_INLINE_DOC,
});
}
let mut out = Vec::with_capacity(total);
header.write_to(&mut out);
out.extend_from_slice(&payload);
debug_assert_eq!(out.len(), total, "encode: assembled size mismatch");
Ok(out)
}
pub fn decode<T: Document>(bytes: &[u8], expected_collection_id: u32) -> Result<T> {
let header = DocumentHeader::read_from(bytes)?;
if header.collection_id != expected_collection_id {
return Err(Error::CollectionIdMismatch {
expected: expected_collection_id,
found: header.collection_id,
});
}
let payload_len =
usize::try_from(header.payload_len).map_err(|_| Error::Corruption { page_id: 0 })?;
let total = DOC_HEADER_SIZE
.checked_add(payload_len)
.ok_or(Error::Corruption { page_id: 0 })?;
if bytes.len() != total {
return Err(Error::Corruption { page_id: 0 });
}
let payload = &bytes[DOC_HEADER_SIZE..total];
if crc32c(payload) != header.payload_crc32c {
return Err(Error::Corruption { page_id: 0 });
}
if header.type_version > T::VERSION {
return Err(Error::SchemaVersionFromFuture {
collection: T::COLLECTION,
from: header.type_version,
to: T::VERSION,
});
}
if header.type_version < T::VERSION {
let history = <T as Document>::historical_schemas();
debug_assert!(
history.windows(2).all(|w| w[0].0 < w[1].0),
"historical_schemas() must be strictly ascending by version",
);
let schema = history
.iter()
.find(|(v, _)| *v == header.type_version)
.map(|(_, s)| s)
.ok_or(Error::SchemaNotRegistered {
collection: T::COLLECTION,
version: header.type_version,
})?;
let dynamic = Dynamic::from_postcard_bytes(payload, schema)?;
return <T as Migrate>::migrate(dynamic, header.type_version);
}
postcard::from_bytes::<T>(payload).map_err(Error::from)
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    /// Minimal two-field document used by most tests.
    #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
    struct TinyDoc {
        a: u32,
        b: String,
    }

    impl Document for TinyDoc {
        const COLLECTION: &'static str = "tiny";
        const VERSION: u32 = 1;
    }

    /// Shorthand constructor for [`TinyDoc`].
    fn tiny(a: u32, b: &str) -> TinyDoc {
        TinyDoc {
            a,
            b: b.to_owned(),
        }
    }

    /// Frames `value` behind a hand-built header, bypassing `encode` so the
    /// stored `type_version` can differ from the reader's `VERSION`.
    fn frame<S: Serialize>(value: &S, collection_id: u32, type_version: u32) -> Vec<u8> {
        let payload = postcard::to_allocvec(value).expect("postcard");
        let header = DocumentHeader {
            collection_id,
            type_version,
            payload_len: u32::try_from(payload.len()).expect("fits u32"),
            payload_crc32c: crc32c(&payload),
        };
        let mut record = Vec::with_capacity(DOC_HEADER_SIZE + payload.len());
        header.write_to(&mut record);
        record.extend_from_slice(&payload);
        record
    }

    /// `encode` followed by `decode` yields the original value.
    #[test]
    fn round_trip_small_document() {
        let original = tiny(42, "hello");
        let bytes = encode(&original, 7).expect("encode");
        let decoded = decode::<TinyDoc>(&bytes, 7).expect("decode");
        assert_eq!(decoded, original);
    }

    /// Decoding with a different collection id reports both ids.
    #[test]
    fn collection_id_mismatch_errors() {
        let bytes = encode(&tiny(1, "x"), 7).expect("encode");
        let err = decode::<TinyDoc>(&bytes, 9).expect_err("mismatched id");
        assert!(matches!(
            err,
            Error::CollectionIdMismatch {
                expected: 9,
                found: 7
            }
        ));
    }

    /// Flipping a payload byte is caught by the checksum.
    #[test]
    fn crc_mismatch_errors() {
        let mut bytes = encode(&tiny(1, "x"), 7).expect("encode");
        // First byte after the header is the first payload byte.
        bytes[DOC_HEADER_SIZE] ^= 0xFF;
        let err = decode::<TinyDoc>(&bytes, 7).expect_err("crc mismatch");
        assert!(matches!(err, Error::Corruption { page_id: 0 }));
    }

    /// A record missing its final byte fails the exact-length check.
    #[test]
    fn truncated_payload_errors() {
        let mut bytes = encode(&tiny(1, "x"), 7).expect("encode");
        bytes.truncate(bytes.len() - 1);
        let err = decode::<TinyDoc>(&bytes, 7).expect_err("truncated");
        assert!(matches!(err, Error::Corruption { page_id: 0 }));
    }

    /// A buffer one byte shorter than a header is rejected.
    #[test]
    fn header_too_short_errors() {
        let bytes = [0u8; DOC_HEADER_SIZE - 1];
        let err = decode::<TinyDoc>(&bytes, 0).expect_err("short header");
        assert!(matches!(err, Error::Corruption { page_id: 0 }));
    }

    /// A payload larger than the inline limit is refused by `encode`.
    #[test]
    fn oversize_document_errors() {
        #[derive(Serialize, Deserialize)]
        struct Big {
            blob: Vec<u8>,
        }

        impl Document for Big {
            const COLLECTION: &'static str = "big";
            const VERSION: u32 = 1;
        }

        let big = Big {
            blob: vec![0xAB; MAX_INLINE_DOC + 64],
        };
        match encode(&big, 1).expect_err("oversize") {
            Error::DocumentTooLarge { len, max } => {
                assert!(len > max, "len {len} should exceed max {max}");
                assert_eq!(max, MAX_INLINE_DOC);
            }
            other => panic!("expected DocumentTooLarge, got {other:?}"),
        }
    }

    /// A stored version newer than the reader's is rejected.
    #[test]
    fn future_version_errors() {
        let mut bytes = encode(&tiny(1, "x"), 7).expect("encode");
        // Overwrite bytes 4..8 of the header with version 99 (little-endian).
        bytes[4..8].copy_from_slice(&99u32.to_le_bytes());
        let err = decode::<TinyDoc>(&bytes, 7).expect_err("future");
        assert!(matches!(
            err,
            Error::SchemaVersionFromFuture {
                collection: "tiny",
                from: 99,
                to: 1
            }
        ));
    }

    /// An older stored version with no registered schema cannot be decoded.
    #[test]
    fn missing_schema_errors_schema_not_registered() {
        let bytes = frame(&tiny(1, "x"), 7, 0);
        let err = decode::<TinyDoc>(&bytes, 7).expect_err("v0 stored, v1 reader");
        assert!(matches!(
            err,
            Error::SchemaNotRegistered {
                collection: "tiny",
                version: 0,
            }
        ));
    }

    /// Version 1 of the "evolving" collection: a single integer field.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct EvolvingV1 {
        a: u32,
    }

    impl Document for EvolvingV1 {
        const COLLECTION: &'static str = "evolving";
        const VERSION: u32 = 1;
    }

    /// Version 2 of the "evolving" collection: adds a string field.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct EvolvingV2 {
        a: u32,
        b: String,
    }

    impl Document for EvolvingV2 {
        const COLLECTION: &'static str = "evolving";
        const VERSION: u32 = 2;

        /// Registers the version-1 layout: a map with one `U64` entry `a`.
        fn historical_schemas() -> Vec<(u32, crate::codec::schema::DynamicSchema)> {
            let v1 = DynamicSchema::map([("a", DynamicSchema::U64)]);
            vec![(1, v1)]
        }

        /// Lifts a version-1 record by copying `a` and defaulting `b`.
        fn migrate(dynamic: crate::codec::Dynamic, from_version: u32) -> Result<Self> {
            // Every failure path in this test migration reports the same error.
            let unsupported = || Error::SchemaMigrationNotImplemented {
                collection: Self::COLLECTION,
                from_version,
                to_version: Self::VERSION,
            };

            if from_version != 1 {
                return Err(unsupported());
            }
            match dynamic.get("a") {
                Some(Dynamic::U64(n)) => {
                    let a = u32::try_from(*n).map_err(|_| unsupported())?;
                    Ok(EvolvingV2 {
                        a,
                        b: "<default>".to_owned(),
                    })
                }
                _ => Err(unsupported()),
            }
        }
    }

    /// A v1 record read as `EvolvingV2` goes through the overridden `migrate`.
    #[test]
    fn migrate_override_lifts_v1_to_v2() {
        let record = frame(&EvolvingV1 { a: 99 }, 13, 1);
        let decoded: EvolvingV2 = decode(&record, 13).expect("migrate succeeds");
        let expected = EvolvingV2 {
            a: 99,
            b: "<default>".to_owned(),
        };
        assert_eq!(decoded, expected);
    }

    /// A record at the current version round-trips with its own field values.
    #[test]
    fn current_version_does_not_route_through_migrate() {
        let original = EvolvingV2 {
            a: 7,
            b: "in-band".to_owned(),
        };
        let bytes = encode(&original, 13).expect("encode");
        let decoded = decode::<EvolvingV2>(&bytes, 13).expect("decode");
        assert_eq!(decoded, original);
    }

    /// Version-2 document that registers no historical schemas.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct UnregisteredV2 {
        a: u32,
    }

    impl Document for UnregisteredV2 {
        const COLLECTION: &'static str = "unregistered";
        const VERSION: u32 = 2;

        fn migrate(_dynamic: crate::codec::Dynamic, _from_version: u32) -> Result<Self> {
            unimplemented!("not reached when no schema is registered")
        }
    }

    /// With no history entry for the stored version, decode fails before
    /// `migrate` is ever called.
    #[test]
    fn missing_history_entry_errors_schema_not_registered() {
        let record = frame(&UnregisteredV2 { a: 1 }, 17, 1);
        let err = decode::<UnregisteredV2>(&record, 17).expect_err("unregistered");
        assert!(matches!(
            err,
            Error::SchemaNotRegistered {
                collection: "unregistered",
                version: 1,
            }
        ));
    }
}