obj_core/codec/mod.rs
1//! Document codec (L4) — per-document header + postcard payload.
2//!
3//! See `docs/format.md` § Document records for the authoritative
4//! byte-by-byte specification. This module is the reference
5//! implementation.
6//!
7//! The codec sits between the typed `Document` API (this module) and
8//! the B+tree (L3): every stored value in a collection's primary
9//! B-tree is the byte string produced by [`encode`], and every
10//! `Db::get` decodes through [`decode`]. The catalog (L5) supplies
11//! the `collection_id` that the per-document header pins; the
12//! catalog itself stores `CollectionDescriptor`s through this same
13//! codec (M5 issue #38).
14//!
15//! # Power-of-ten posture
16//!
17//! - **Rule 5.** Encode and decode are runtime boundaries: every
18//! header field is validated explicitly before any `postcard` call
19//! touches caller-controlled bytes. CRC32C, collection-id, and
20//! version-range checks are the three layers of defense in
21//! [`decode`].
22//! - **Rule 7.** No `unwrap` / `expect` on any error-bearing path;
23//! postcard errors propagate via the `?` operator into
24//! [`Error::Codec`].
25//! - **Rule 9.** Hot-path dispatch is static — all codec calls are
26//! monomorphised over `T: Document`. No `dyn` anywhere in this
27//! module.
28
29#![forbid(unsafe_code)]
30
31pub mod dynamic;
32pub mod header;
33pub mod migrate;
34pub mod schema;
35
36pub use crate::codec::dynamic::Dynamic;
37pub use crate::codec::header::{DocumentHeader, DOC_HEADER_SIZE, MAX_INLINE_DOC};
38pub use crate::codec::migrate::Migrate;
39pub use crate::codec::schema::{DynamicSchema, EnumVariantSchema, Schema, MAX_SCHEMA_DEPTH};
40
41use crate::error::{Error, Result};
42use crate::pager::checksum::crc32c;
43
44use serde::de::DeserializeOwned;
45use serde::Serialize;
46
47/// The trait every user document type implements.
48///
49/// `Document` types are `serde::Serialize + DeserializeOwned` so
50/// they round-trip through postcard. Each implementation provides
51/// two associated constants:
52///
53/// - [`COLLECTION`](Document::COLLECTION) — the collection name
54/// under which records of this type are stored. The catalog
55/// resolves it to a numeric `collection_id` at registration time;
56/// the codec takes that id as an argument to [`encode`]/[`decode`].
57/// - [`VERSION`](Document::VERSION) — the type's schema version.
58/// Stored in every record's header; the decoder routes a stored-
59/// version mismatch through [`Document::migrate`].
60///
61/// `Document` is `'static` so type-erased catalog rows can carry the
62/// collection name as `&'static str`. M5 ships hand-impls; M9
63/// introduces a `#[derive(obj::Document)]` proc macro.
64pub trait Document: Serialize + DeserializeOwned + 'static {
65 /// The collection name this document type stores into.
66 ///
67 /// Must be a stable, application-chosen identifier. The
68 /// catalog resolves it to a `collection_id` on first
69 /// registration; subsequent opens reuse the existing id.
70 const COLLECTION: &'static str;
71
72 /// The schema version of this `Document` implementation.
73 ///
74 /// Bump on any breaking change (added/removed/renamed fields,
75 /// changed semantics). The decoder enforces:
76 ///
77 /// - `header.type_version < VERSION` → dispatch through
78 /// [`Document::migrate`].
79 /// - `header.type_version == VERSION` → decode directly.
80 /// - `header.type_version > VERSION` →
81 /// [`Error::SchemaVersionFromFuture`].
82 const VERSION: u32;
83
84 /// Transform an older stored record into `Self`.
85 ///
86 /// The codec calls [`migrate`](Document::migrate) when a stored
87 /// record's `type_version` is strictly less than [`VERSION`](Document::VERSION).
88 /// `dynamic` is a structured [`Dynamic`] view of the older
89 /// record — the codec walks the on-disk payload through the
90 /// schema registered for `from_version` (see
91 /// [`historical_schemas`](Document::historical_schemas)) and
92 /// hands the resulting map-shaped `Dynamic` to this method.
93 /// Concrete overrides read the fields they care about with
94 /// [`Dynamic::get`](crate::codec::Dynamic::get) /
95 /// [`Dynamic::get_str`](crate::codec::Dynamic::get_str) /
96 /// [`Dynamic::deserialize`](crate::codec::Dynamic::deserialize)
97 /// and construct the target `Self`.
98 ///
99 /// `from_version` is the on-disk `type_version` — always `<
100 /// Self::VERSION` when this is invoked by the codec.
101 ///
102 /// # Default body
103 ///
104 /// Returns [`Error::SchemaMigrationNotImplemented`]. Real types
105 /// override this method to handle older versions.
106 ///
107 /// # Errors
108 ///
109 /// User overrides MAY return any [`Error`] variant. The default
110 /// returns [`Error::SchemaMigrationNotImplemented`].
111 fn migrate(_dynamic: crate::codec::Dynamic, from_version: u32) -> Result<Self> {
112 Err(Error::SchemaMigrationNotImplemented {
113 collection: Self::COLLECTION,
114 from_version,
115 to_version: Self::VERSION,
116 })
117 }
118
119 /// Schemas for stored records of OLDER `type_version`s than
120 /// [`VERSION`](Document::VERSION).
121 ///
122 /// Returns a list of `(version, schema)` pairs sorted strictly
123 /// ascending by version. The codec consults this list whenever
124 /// it observes `header.type_version < Self::VERSION`:
125 ///
126 /// 1. Look up the matching `version`.
127 /// 2. Walk the on-disk payload bytes through
128 /// [`Dynamic::from_postcard_bytes`](crate::codec::Dynamic::from_postcard_bytes)
129 /// using the registered `schema`.
130 /// 3. Hand the resulting structured `Dynamic` to
131 /// [`migrate`](Document::migrate) along with the stored
132 /// version.
133 ///
134 /// A miss (no entry for the stored version) surfaces as
135 /// [`Error::SchemaNotRegistered`].
136 /// The default body returns an empty list — a `Document` with
137 /// no `historical_schemas()` cannot migrate any older payload.
138 ///
139 /// # Ordering
140 ///
141 /// The returned slice MUST be sorted strictly ascending by
142 /// version. The codec debug-asserts on read; out-of-order
143 /// entries are a programming bug.
144 ///
145 /// # Power-of-ten posture
146 ///
147 /// - **Rule 9.** Static dispatch — concrete `Document`
148 /// implementations return their own `Vec<(u32, DynamicSchema)>`;
149 /// the codec never reaches for a `dyn Trait`.
150 /// - **Rule 7.** Empty default is intentional — a
151 /// `Document` that has never been versioned cannot have
152 /// historical schemas.
153 #[must_use]
154 fn historical_schemas() -> Vec<(u32, crate::codec::schema::DynamicSchema)> {
155 Vec::new()
156 }
157
158 /// Declared secondary indexes for this `Document` type.
159 ///
160 /// Default body returns the empty vector — no indexes. Override
161 /// to declare per-collection indexes; the catalog reconciler
162 /// (M7 #57) compares this list against the catalog's stored
163 /// descriptors on the FIRST `WriteTxn::collection::<T>()` call
164 /// per process per collection and:
165 ///
166 /// - declares specs absent from the catalog,
167 /// - flips active descriptors absent from this list to
168 /// `DroppedPending`,
169 /// - leaves unchanged matches alone (idempotent).
170 ///
171 /// The reconciler runs inside the user's WAL transaction so a
172 /// rolled-back txn leaves the catalog clean.
173 ///
174 /// `&self` is intentionally **not** taken — indexes are a
175 /// type-level property of the `Document`, not a per-instance
176 /// one.
177 #[must_use]
178 fn indexes() -> Vec<crate::index::IndexSpec> {
179 Vec::new()
180 }
181}
182
183/// Encode `doc` into the on-disk record format.
184///
185/// Layout: `DocumentHeader` (16 bytes) followed by
186/// `postcard::to_allocvec(doc)`. Returns the assembled bytes in a
187/// fresh `Vec<u8>`; allocation is unavoidable here because the
188/// payload length is not known until postcard runs.
189///
190/// # Errors
191///
192/// - [`Error::Codec`] if postcard encoding fails.
193/// - [`Error::DocumentTooLarge`] if the resulting record exceeds
194/// [`MAX_INLINE_DOC`] — overflow chains for oversize records are
195/// deferred to a later format-minor (see `docs/format.md`
196/// § Document records).
197pub fn encode<T: Document>(doc: &T, collection_id: u32) -> Result<Vec<u8>> {
198 let payload = postcard::to_allocvec(doc)?;
199 let payload_len = u32::try_from(payload.len()).map_err(|_| Error::DocumentTooLarge {
200 len: payload.len(),
201 max: MAX_INLINE_DOC,
202 })?;
203 let payload_crc32c = crc32c(&payload);
204 let header = DocumentHeader {
205 collection_id,
206 type_version: T::VERSION,
207 payload_len,
208 payload_crc32c,
209 };
210 let total = DOC_HEADER_SIZE
211 .checked_add(payload.len())
212 .ok_or(Error::DocumentTooLarge {
213 len: usize::MAX,
214 max: MAX_INLINE_DOC,
215 })?;
216 if total > MAX_INLINE_DOC {
217 return Err(Error::DocumentTooLarge {
218 len: total,
219 max: MAX_INLINE_DOC,
220 });
221 }
222 let mut out = Vec::with_capacity(total);
223 header.write_to(&mut out);
224 out.extend_from_slice(&payload);
225 debug_assert_eq!(out.len(), total, "encode: assembled size mismatch");
226 Ok(out)
227}
228
229/// Decode an on-disk record into a `T: Document` instance.
230///
231/// Validates the per-document header at the runtime boundary
232/// (power-of-ten Rule 5):
233///
234/// 1. `bytes.len() >= DOC_HEADER_SIZE`.
235/// 2. `header.collection_id == expected_collection_id`.
236/// 3. `bytes.len() == DOC_HEADER_SIZE + header.payload_len`.
237/// 4. CRC32C of the payload matches `header.payload_crc32c`.
238/// 5. `header.type_version <= T::VERSION` (no future versions).
239/// 6. If `header.type_version < T::VERSION`, the codec consults
240/// `T::historical_schemas()` for the matching version, walks
241/// the payload through that schema via
242/// [`Dynamic::from_postcard_bytes`](crate::codec::Dynamic::from_postcard_bytes),
243/// and dispatches into [`Migrate::migrate`]
244/// with the structured `Dynamic` (M10 #83). If no schema is
245/// registered for `header.type_version`, the codec returns
246/// [`Error::SchemaNotRegistered`] without invoking `migrate` —
247/// silent fallback hides schema-evolution bugs.
248/// Otherwise (versions match) the payload is decoded directly
249/// via `postcard::from_bytes::<T>(payload)`.
250///
251/// # Errors
252///
253/// - [`Error::Corruption`] with `page_id = 0` on a malformed header
254/// or CRC mismatch (the codec does not know the page id; callers
255/// that need a specific id should wrap or re-emit).
256/// - [`Error::CollectionIdMismatch`] on a collection-id mismatch.
257/// - [`Error::SchemaVersionFromFuture`] on a stored record newer
258/// than `T::VERSION`.
259/// - [`Error::SchemaNotRegistered`] when the stored
260/// `type_version` is older than `T::VERSION` and
261/// `T::historical_schemas()` has no entry for it.
262/// - [`Error::SchemaMigrationNotImplemented`] when the registered
263/// `Migrate::migrate` body returns the default error.
264/// - [`Error::SchemaTypeMismatch`] / [`Error::SchemaDepthExceeded`]
265/// on a schema / payload disagreement.
266/// - [`Error::Codec`] on postcard decode failures.
267pub fn decode<T: Document>(bytes: &[u8], expected_collection_id: u32) -> Result<T> {
268 let header = DocumentHeader::read_from(bytes)?;
269 if header.collection_id != expected_collection_id {
270 return Err(Error::CollectionIdMismatch {
271 expected: expected_collection_id,
272 found: header.collection_id,
273 });
274 }
275 let payload_len =
276 usize::try_from(header.payload_len).map_err(|_| Error::Corruption { page_id: 0 })?;
277 let total = DOC_HEADER_SIZE
278 .checked_add(payload_len)
279 .ok_or(Error::Corruption { page_id: 0 })?;
280 if bytes.len() != total {
281 return Err(Error::Corruption { page_id: 0 });
282 }
283 let payload = &bytes[DOC_HEADER_SIZE..total];
284 if crc32c(payload) != header.payload_crc32c {
285 return Err(Error::Corruption { page_id: 0 });
286 }
287 if header.type_version > T::VERSION {
288 return Err(Error::SchemaVersionFromFuture {
289 collection: T::COLLECTION,
290 from: header.type_version,
291 to: T::VERSION,
292 });
293 }
294 if header.type_version < T::VERSION {
295 // Migration dispatch (#36 → #83). The codec walks the
296 // payload bytes through the schema registered for
297 // `header.type_version` and hands the resulting structured
298 // `Dynamic` to `T::migrate`. Concrete types override
299 // `Document::migrate` to perform the actual conversion;
300 // the default body errors with
301 // `Error::SchemaMigrationNotImplemented`.
302 //
303 // Power-of-ten Rule 5: every entry in
304 // `historical_schemas()` is sorted ascending — assert here
305 // so a malformed registry surfaces during development.
306 let history = <T as Document>::historical_schemas();
307 debug_assert!(
308 history.windows(2).all(|w| w[0].0 < w[1].0),
309 "historical_schemas() must be strictly ascending by version",
310 );
311 let schema = history
312 .iter()
313 .find(|(v, _)| *v == header.type_version)
314 .map(|(_, s)| s)
315 .ok_or(Error::SchemaNotRegistered {
316 collection: T::COLLECTION,
317 version: header.type_version,
318 })?;
319 let dynamic = Dynamic::from_postcard_bytes(payload, schema)?;
320 return <T as Migrate>::migrate(dynamic, header.type_version);
321 }
322 postcard::from_bytes::<T>(payload).map_err(Error::from)
323}
324
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    /// Hand-assembles a record: a `DocumentHeader` stamped with the
    /// given collection id and type version (length and CRC32C taken
    /// from `payload`), followed by the payload bytes. Lets tests
    /// forge records at a version other than the type's `VERSION`.
    fn raw_record(collection_id: u32, type_version: u32, payload: &[u8]) -> Vec<u8> {
        let header = DocumentHeader {
            collection_id,
            type_version,
            payload_len: u32::try_from(payload.len()).expect("fits u32"),
            payload_crc32c: crc32c(payload),
        };
        let mut record = Vec::with_capacity(DOC_HEADER_SIZE + payload.len());
        header.write_to(&mut record);
        record.extend_from_slice(payload);
        record
    }

    /// Minimal version-1 document used by most tests.
    #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
    struct TinyDoc {
        a: u32,
        b: String,
    }

    impl Document for TinyDoc {
        const COLLECTION: &'static str = "tiny";
        const VERSION: u32 = 1;
    }

    /// Shorthand constructor for a `TinyDoc`.
    fn tiny(a: u32, b: &str) -> TinyDoc {
        TinyDoc { a, b: b.to_owned() }
    }

    #[test]
    fn round_trip_small_document() {
        let original = tiny(42, "hello");
        let encoded = encode(&original, 7).expect("encode");
        let decoded: TinyDoc = decode(&encoded, 7).expect("decode");
        assert_eq!(decoded, original);
    }

    #[test]
    fn collection_id_mismatch_errors() {
        let encoded = encode(&tiny(1, "x"), 7).expect("encode");
        let failure = decode::<TinyDoc>(&encoded, 9).expect_err("mismatched id");
        assert!(matches!(
            failure,
            Error::CollectionIdMismatch {
                expected: 9,
                found: 7
            }
        ));
    }

    #[test]
    fn crc_mismatch_errors() {
        let mut encoded = encode(&tiny(1, "x"), 7).expect("encode");
        // Corrupt the first payload byte so the stored CRC no longer matches.
        encoded[DOC_HEADER_SIZE] ^= 0xFF;
        let failure = decode::<TinyDoc>(&encoded, 7).expect_err("crc mismatch");
        assert!(matches!(failure, Error::Corruption { page_id: 0 }));
    }

    #[test]
    fn truncated_payload_errors() {
        let encoded = encode(&tiny(1, "x"), 7).expect("encode");
        // Drop the last byte: the length no longer matches the header.
        let short = &encoded[..encoded.len() - 1];
        let failure = decode::<TinyDoc>(short, 7).expect_err("truncated");
        assert!(matches!(failure, Error::Corruption { page_id: 0 }));
    }

    #[test]
    fn header_too_short_errors() {
        let too_short = [0u8; DOC_HEADER_SIZE - 1];
        let failure = decode::<TinyDoc>(&too_short, 0).expect_err("short header");
        assert!(matches!(failure, Error::Corruption { page_id: 0 }));
    }

    #[test]
    fn oversize_document_errors() {
        #[derive(Serialize, Deserialize)]
        struct Big {
            blob: Vec<u8>,
        }
        impl Document for Big {
            const COLLECTION: &'static str = "big";
            const VERSION: u32 = 1;
        }
        let oversized = Big {
            blob: vec![0xAB; MAX_INLINE_DOC + 64],
        };
        let failure = encode(&oversized, 1).expect_err("oversize");
        if let Error::DocumentTooLarge { len, max } = &failure {
            assert!(*len > *max, "len {len} should exceed max {max}");
            assert_eq!(*max, MAX_INLINE_DOC);
        } else {
            let other = failure;
            panic!("expected DocumentTooLarge, got {other:?}");
        }
    }

    #[test]
    fn future_version_errors() {
        let mut encoded = encode(&tiny(1, "x"), 7).expect("encode");
        // Overwrite header bytes 4..8 with version 99; the test expects
        // the decoder to report that value as the stored version.
        encoded[4..8].copy_from_slice(&99u32.to_le_bytes());
        let failure = decode::<TinyDoc>(&encoded, 7).expect_err("future");
        assert!(matches!(
            failure,
            Error::SchemaVersionFromFuture {
                collection: "tiny",
                from: 99,
                to: 1
            }
        ));
    }

    #[test]
    fn missing_schema_errors_schema_not_registered() {
        // Forge a v0 record (TinyDoc::VERSION = 1) so the decoder
        // takes the migration path. TinyDoc does not override
        // `historical_schemas()`, so the codec reports
        // SchemaNotRegistered before it ever reaches `migrate`.
        let payload = postcard::to_allocvec(&tiny(1, "x")).expect("postcard");
        let record = raw_record(7, 0, &payload);
        let failure = decode::<TinyDoc>(&record, 7).expect_err("v0 stored, v1 reader");
        assert!(matches!(
            failure,
            Error::SchemaNotRegistered {
                collection: "tiny",
                version: 0,
            }
        ));
    }

    // Migration override test — a v2 doc that adds a new field and
    // overrides `Document::migrate` by hand.
    //
    // M5 #36 fed `Dynamic::Bytes(payload)` to migrate. M10 #83
    // replaced that with the schema-driven structured Dynamic: the
    // codec consults `historical_schemas()` for the v1 schema and
    // passes a structured `Dynamic::Map` to the override.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct EvolvingV1 {
        a: u32,
    }

    impl Document for EvolvingV1 {
        const COLLECTION: &'static str = "evolving";
        const VERSION: u32 = 1;
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct EvolvingV2 {
        a: u32,
        b: String,
    }

    impl Document for EvolvingV2 {
        const COLLECTION: &'static str = "evolving";
        const VERSION: u32 = 2;

        fn historical_schemas() -> Vec<(u32, crate::codec::schema::DynamicSchema)> {
            let v1_schema = crate::codec::schema::DynamicSchema::map([(
                "a",
                crate::codec::schema::DynamicSchema::U64,
            )]);
            vec![(1, v1_schema)]
        }

        fn migrate(dynamic: crate::codec::Dynamic, from_version: u32) -> Result<Self> {
            // Every failure in this override is reported the same way.
            let unsupported = || Error::SchemaMigrationNotImplemented {
                collection: Self::COLLECTION,
                from_version,
                to_version: Self::VERSION,
            };
            if from_version != 1 {
                return Err(unsupported());
            }
            match dynamic.get("a") {
                Some(crate::codec::Dynamic::U64(n)) => {
                    let a = u32::try_from(*n).map_err(|_| unsupported())?;
                    Ok(EvolvingV2 {
                        a,
                        b: "<default>".to_owned(),
                    })
                }
                _ => Err(unsupported()),
            }
        }
    }

    #[test]
    fn migrate_override_lifts_v1_to_v2() {
        // 1. Hand-build a record stamped with type_version = 1 whose
        //    payload is a postcard-encoded EvolvingV1.
        let payload = postcard::to_allocvec(&EvolvingV1 { a: 99 }).expect("postcard");
        let record = raw_record(13, 1, &payload);

        // 2. Decode as v2 — the codec sees type_version < VERSION,
        //    walks the v1 schema, and dispatches through
        //    EvolvingV2::migrate.
        let lifted: EvolvingV2 = decode(&record, 13).expect("migrate succeeds");
        let expected = EvolvingV2 {
            a: 99,
            b: "<default>".to_owned(),
        };
        assert_eq!(lifted, expected);
    }

    #[test]
    fn current_version_does_not_route_through_migrate() {
        // Sanity check: a v2 record (matching EvolvingV2::VERSION)
        // is decoded directly by postcard, NOT through `migrate`.
        let original = EvolvingV2 {
            a: 7,
            b: "in-band".to_owned(),
        };
        let encoded = encode(&original, 13).expect("encode");
        let decoded: EvolvingV2 = decode(&encoded, 13).expect("decode");
        assert_eq!(decoded, original);
    }

    // M10 #83: unregistered historical version → SchemaNotRegistered.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct UnregisteredV2 {
        a: u32,
    }

    impl Document for UnregisteredV2 {
        // Distinct collection name so the catalog dispatch does not
        // collide with EvolvingV2.
        const COLLECTION: &'static str = "unregistered";
        const VERSION: u32 = 2;

        // historical_schemas() is deliberately not overridden — the
        // default empty Vec applies, so decoding a v1 record must
        // surface SchemaNotRegistered.
        fn migrate(_dynamic: crate::codec::Dynamic, _from_version: u32) -> Result<Self> {
            // Never reached — the codec errors before dispatch.
            unimplemented!("not reached when no schema is registered")
        }
    }

    #[test]
    fn missing_history_entry_errors_schema_not_registered() {
        let payload = postcard::to_allocvec(&UnregisteredV2 { a: 1 }).expect("postcard");
        let record = raw_record(17, 1, &payload);
        let failure = decode::<UnregisteredV2>(&record, 17).expect_err("unregistered");
        assert!(matches!(
            failure,
            Error::SchemaNotRegistered {
                collection: "unregistered",
                version: 1,
            }
        ));
    }
}