Skip to main content

obj_core/codec/
mod.rs

1//! Document codec (L4) — per-document header + postcard payload.
2//!
3//! See `docs/format.md` § Document records for the authoritative
4//! byte-by-byte specification. This module is the reference
5//! implementation.
6//!
7//! The codec sits between the typed `Document` API (this module) and
8//! the B+tree (L3): every stored value in a collection's primary
9//! B-tree is the byte string produced by [`encode`], and every
10//! `Db::get` decodes through [`decode`]. The catalog (L5) supplies
11//! the `collection_id` that the per-document header pins; the
12//! catalog itself stores `CollectionDescriptor`s through this same
13//! codec (M5 issue #38).
14//!
15//! # Power-of-ten posture
16//!
17//! - **Rule 5.** Encode and decode are runtime boundaries: every
18//!   header field is validated explicitly before any `postcard` call
19//!   touches caller-controlled bytes. CRC32C, collection-id, and
20//!   version-range checks are the three layers of defense in
21//!   [`decode`].
22//! - **Rule 7.** No `unwrap` / `expect` on any error-bearing path;
23//!   postcard errors propagate via the `?` operator into
24//!   [`Error::Codec`].
25//! - **Rule 9.** Hot-path dispatch is static — all codec calls are
26//!   monomorphised over `T: Document`. No `dyn` anywhere in this
27//!   module.
28
29#![forbid(unsafe_code)]
30
31pub mod dynamic;
32pub mod header;
33pub mod migrate;
34pub mod schema;
35
36pub use crate::codec::dynamic::Dynamic;
37pub use crate::codec::header::{DocumentHeader, DOC_HEADER_SIZE, MAX_INLINE_DOC};
38pub use crate::codec::migrate::Migrate;
39pub use crate::codec::schema::{DynamicSchema, EnumVariantSchema, Schema, MAX_SCHEMA_DEPTH};
40
41use crate::error::{Error, Result};
42use crate::pager::checksum::crc32c;
43
44use serde::de::DeserializeOwned;
45use serde::Serialize;
46
47/// The trait every user document type implements.
48///
49/// `Document` types are `serde::Serialize + DeserializeOwned` so
50/// they round-trip through postcard. Each implementation provides
51/// two associated constants:
52///
53/// - [`COLLECTION`](Document::COLLECTION) — the collection name
54///   under which records of this type are stored. The catalog
55///   resolves it to a numeric `collection_id` at registration time;
56///   the codec takes that id as an argument to [`encode`]/[`decode`].
57/// - [`VERSION`](Document::VERSION) — the type's schema version.
58///   Stored in every record's header; the decoder routes a stored-
59///   version mismatch through [`Document::migrate`].
60///
61/// `Document` is `'static` so type-erased catalog rows can carry the
62/// collection name as `&'static str`. M5 ships hand-impls; M9
63/// introduces a `#[derive(obj::Document)]` proc macro.
64pub trait Document: Serialize + DeserializeOwned + 'static {
65    /// The collection name this document type stores into.
66    ///
67    /// Must be a stable, application-chosen identifier. The
68    /// catalog resolves it to a `collection_id` on first
69    /// registration; subsequent opens reuse the existing id.
70    const COLLECTION: &'static str;
71
72    /// The schema version of this `Document` implementation.
73    ///
74    /// Bump on any breaking change (added/removed/renamed fields,
75    /// changed semantics). The decoder enforces:
76    ///
77    /// - `header.type_version < VERSION` → dispatch through
78    ///   [`Document::migrate`].
79    /// - `header.type_version == VERSION` → decode directly.
80    /// - `header.type_version > VERSION` →
81    ///   [`Error::SchemaVersionFromFuture`].
82    const VERSION: u32;
83
84    /// Transform an older stored record into `Self`.
85    ///
86    /// The codec calls [`migrate`](Document::migrate) when a stored
87    /// record's `type_version` is strictly less than [`VERSION`](Document::VERSION).
88    /// `dynamic` is a structured [`Dynamic`] view of the older
89    /// record — the codec walks the on-disk payload through the
90    /// schema registered for `from_version` (see
91    /// [`historical_schemas`](Document::historical_schemas)) and
92    /// hands the resulting map-shaped `Dynamic` to this method.
93    /// Concrete overrides read the fields they care about with
94    /// [`Dynamic::get`](crate::codec::Dynamic::get) /
95    /// [`Dynamic::get_str`](crate::codec::Dynamic::get_str) /
96    /// [`Dynamic::deserialize`](crate::codec::Dynamic::deserialize)
97    /// and construct the target `Self`.
98    ///
99    /// `from_version` is the on-disk `type_version` — always `<
100    /// Self::VERSION` when this is invoked by the codec.
101    ///
102    /// # Default body
103    ///
104    /// Returns [`Error::SchemaMigrationNotImplemented`]. Real types
105    /// override this method to handle older versions.
106    ///
107    /// # Errors
108    ///
109    /// User overrides MAY return any [`Error`] variant. The default
110    /// returns [`Error::SchemaMigrationNotImplemented`].
111    fn migrate(_dynamic: crate::codec::Dynamic, from_version: u32) -> Result<Self> {
112        Err(Error::SchemaMigrationNotImplemented {
113            collection: Self::COLLECTION,
114            from_version,
115            to_version: Self::VERSION,
116        })
117    }
118
119    /// Schemas for stored records of OLDER `type_version`s than
120    /// [`VERSION`](Document::VERSION).
121    ///
122    /// Returns a list of `(version, schema)` pairs sorted strictly
123    /// ascending by version.  The codec consults this list whenever
124    /// it observes `header.type_version < Self::VERSION`:
125    ///
126    /// 1. Look up the matching `version`.
127    /// 2. Walk the on-disk payload bytes through
128    ///    [`Dynamic::from_postcard_bytes`](crate::codec::Dynamic::from_postcard_bytes)
129    ///    using the registered `schema`.
130    /// 3. Hand the resulting structured `Dynamic` to
131    ///    [`migrate`](Document::migrate) along with the stored
132    ///    version.
133    ///
134    /// A miss (no entry for the stored version) surfaces as
135    /// [`Error::SchemaNotRegistered`].
136    /// The default body returns an empty list — a `Document` with
137    /// no `historical_schemas()` cannot migrate any older payload.
138    ///
139    /// # Ordering
140    ///
141    /// The returned slice MUST be sorted strictly ascending by
142    /// version. The codec debug-asserts on read; out-of-order
143    /// entries are a programming bug.
144    ///
145    /// # Power-of-ten posture
146    ///
147    /// - **Rule 9.** Static dispatch — concrete `Document`
148    ///   implementations return their own `Vec<(u32, DynamicSchema)>`;
149    ///   the codec never reaches for a `dyn Trait`.
150    /// - **Rule 7.** Empty default is intentional — a
151    ///   `Document` that has never been versioned cannot have
152    ///   historical schemas.
153    #[must_use]
154    fn historical_schemas() -> Vec<(u32, crate::codec::schema::DynamicSchema)> {
155        Vec::new()
156    }
157
158    /// Declared secondary indexes for this `Document` type.
159    ///
160    /// Default body returns the empty vector — no indexes. Override
161    /// to declare per-collection indexes; the catalog reconciler
162    /// (M7 #57) compares this list against the catalog's stored
163    /// descriptors on the FIRST `WriteTxn::collection::<T>()` call
164    /// per process per collection and:
165    ///
166    /// - declares specs absent from the catalog,
167    /// - flips active descriptors absent from this list to
168    ///   `DroppedPending`,
169    /// - leaves unchanged matches alone (idempotent).
170    ///
171    /// The reconciler runs inside the user's WAL transaction so a
172    /// rolled-back txn leaves the catalog clean.
173    ///
174    /// `&self` is intentionally **not** taken — indexes are a
175    /// type-level property of the `Document`, not a per-instance
176    /// one.
177    #[must_use]
178    fn indexes() -> Vec<crate::index::IndexSpec> {
179        Vec::new()
180    }
181}
182
183/// Encode `doc` into the on-disk record format.
184///
185/// Layout: `DocumentHeader` (16 bytes) followed by
186/// `postcard::to_allocvec(doc)`. Returns the assembled bytes in a
187/// fresh `Vec<u8>`; allocation is unavoidable here because the
188/// payload length is not known until postcard runs.
189///
190/// # Errors
191///
192/// - [`Error::Codec`] if postcard encoding fails.
193/// - [`Error::DocumentTooLarge`] if the resulting record exceeds
194///   [`MAX_INLINE_DOC`] — overflow chains for oversize records are
195///   deferred to a later format-minor (see `docs/format.md`
196///   § Document records).
197pub fn encode<T: Document>(doc: &T, collection_id: u32) -> Result<Vec<u8>> {
198    let payload = postcard::to_allocvec(doc)?;
199    let payload_len = u32::try_from(payload.len()).map_err(|_| Error::DocumentTooLarge {
200        len: payload.len(),
201        max: MAX_INLINE_DOC,
202    })?;
203    let payload_crc32c = crc32c(&payload);
204    let header = DocumentHeader {
205        collection_id,
206        type_version: T::VERSION,
207        payload_len,
208        payload_crc32c,
209    };
210    let total = DOC_HEADER_SIZE
211        .checked_add(payload.len())
212        .ok_or(Error::DocumentTooLarge {
213            len: usize::MAX,
214            max: MAX_INLINE_DOC,
215        })?;
216    if total > MAX_INLINE_DOC {
217        return Err(Error::DocumentTooLarge {
218            len: total,
219            max: MAX_INLINE_DOC,
220        });
221    }
222    let mut out = Vec::with_capacity(total);
223    header.write_to(&mut out);
224    out.extend_from_slice(&payload);
225    debug_assert_eq!(out.len(), total, "encode: assembled size mismatch");
226    Ok(out)
227}
228
229/// Decode an on-disk record into a `T: Document` instance.
230///
231/// Validates the per-document header at the runtime boundary
232/// (power-of-ten Rule 5):
233///
234/// 1. `bytes.len() >= DOC_HEADER_SIZE`.
235/// 2. `header.collection_id == expected_collection_id`.
236/// 3. `bytes.len() == DOC_HEADER_SIZE + header.payload_len`.
237/// 4. CRC32C of the payload matches `header.payload_crc32c`.
238/// 5. `header.type_version <= T::VERSION` (no future versions).
239/// 6. If `header.type_version < T::VERSION`, the codec consults
240///    `T::historical_schemas()` for the matching version, walks
241///    the payload through that schema via
242///    [`Dynamic::from_postcard_bytes`](crate::codec::Dynamic::from_postcard_bytes),
243///    and dispatches into [`Migrate::migrate`]
244///    with the structured `Dynamic` (M10 #83). If no schema is
245///    registered for `header.type_version`, the codec returns
246///    [`Error::SchemaNotRegistered`] without invoking `migrate` —
247///    silent fallback hides schema-evolution bugs.
248///    Otherwise (versions match) the payload is decoded directly
249///    via `postcard::from_bytes::<T>(payload)`.
250///
251/// # Errors
252///
253/// - [`Error::Corruption`] with `page_id = 0` on a malformed header
254///   or CRC mismatch (the codec does not know the page id; callers
255///   that need a specific id should wrap or re-emit).
256/// - [`Error::CollectionIdMismatch`] on a collection-id mismatch.
257/// - [`Error::SchemaVersionFromFuture`] on a stored record newer
258///   than `T::VERSION`.
259/// - [`Error::SchemaNotRegistered`] when the stored
260///   `type_version` is older than `T::VERSION` and
261///   `T::historical_schemas()` has no entry for it.
262/// - [`Error::SchemaMigrationNotImplemented`] when the registered
263///   `Migrate::migrate` body returns the default error.
264/// - [`Error::SchemaTypeMismatch`] / [`Error::SchemaDepthExceeded`]
265///   on a schema / payload disagreement.
266/// - [`Error::Codec`] on postcard decode failures.
267pub fn decode<T: Document>(bytes: &[u8], expected_collection_id: u32) -> Result<T> {
268    let header = DocumentHeader::read_from(bytes)?;
269    if header.collection_id != expected_collection_id {
270        return Err(Error::CollectionIdMismatch {
271            expected: expected_collection_id,
272            found: header.collection_id,
273        });
274    }
275    let payload_len =
276        usize::try_from(header.payload_len).map_err(|_| Error::Corruption { page_id: 0 })?;
277    let total = DOC_HEADER_SIZE
278        .checked_add(payload_len)
279        .ok_or(Error::Corruption { page_id: 0 })?;
280    if bytes.len() != total {
281        return Err(Error::Corruption { page_id: 0 });
282    }
283    let payload = &bytes[DOC_HEADER_SIZE..total];
284    if crc32c(payload) != header.payload_crc32c {
285        return Err(Error::Corruption { page_id: 0 });
286    }
287    if header.type_version > T::VERSION {
288        return Err(Error::SchemaVersionFromFuture {
289            collection: T::COLLECTION,
290            from: header.type_version,
291            to: T::VERSION,
292        });
293    }
294    if header.type_version < T::VERSION {
295        // Migration dispatch (#36 → #83). The codec walks the
296        // payload bytes through the schema registered for
297        // `header.type_version` and hands the resulting structured
298        // `Dynamic` to `T::migrate`. Concrete types override
299        // `Document::migrate` to perform the actual conversion;
300        // the default body errors with
301        // `Error::SchemaMigrationNotImplemented`.
302        //
303        // Power-of-ten Rule 5: every entry in
304        // `historical_schemas()` is sorted ascending — assert here
305        // so a malformed registry surfaces during development.
306        let history = <T as Document>::historical_schemas();
307        debug_assert!(
308            history.windows(2).all(|w| w[0].0 < w[1].0),
309            "historical_schemas() must be strictly ascending by version",
310        );
311        let schema = history
312            .iter()
313            .find(|(v, _)| *v == header.type_version)
314            .map(|(_, s)| s)
315            .ok_or(Error::SchemaNotRegistered {
316                collection: T::COLLECTION,
317                version: header.type_version,
318            })?;
319        let dynamic = Dynamic::from_postcard_bytes(payload, schema)?;
320        return <T as Migrate>::migrate(dynamic, header.type_version);
321    }
322    postcard::from_bytes::<T>(payload).map_err(Error::from)
323}
324
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    /// Assemble a record by hand: a header with the given ids, then
    /// `payload`.
    ///
    /// The header's payload length and CRC32C are computed from
    /// `payload`. Bypasses [`encode`], which always stamps `T::VERSION`,
    /// so tests can store payloads under an arbitrary `type_version`.
    fn raw_record(collection_id: u32, type_version: u32, payload: &[u8]) -> Vec<u8> {
        let header = DocumentHeader {
            collection_id,
            type_version,
            payload_len: u32::try_from(payload.len()).expect("fits u32"),
            payload_crc32c: crc32c(payload),
        };
        let mut record = Vec::with_capacity(DOC_HEADER_SIZE + payload.len());
        header.write_to(&mut record);
        record.extend_from_slice(payload);
        record
    }

    #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
    struct TinyDoc {
        a: u32,
        b: String,
    }

    impl Document for TinyDoc {
        const COLLECTION: &'static str = "tiny";
        const VERSION: u32 = 1;
    }

    /// Small `TinyDoc` fixture.
    fn tiny(a: u32, b: &str) -> TinyDoc {
        TinyDoc { a, b: b.to_owned() }
    }

    #[test]
    fn round_trip_small_document() {
        let d = tiny(42, "hello");
        let bytes = encode(&d, 7).expect("encode");
        let back: TinyDoc = decode(&bytes, 7).expect("decode");
        assert_eq!(back, d);
    }

    #[test]
    fn collection_id_mismatch_errors() {
        let bytes = encode(&tiny(1, "x"), 7).expect("encode");
        let err = decode::<TinyDoc>(&bytes, 9).expect_err("mismatched id");
        assert!(matches!(
            err,
            Error::CollectionIdMismatch {
                expected: 9,
                found: 7
            }
        ));
    }

    #[test]
    fn crc_mismatch_errors() {
        let mut bytes = encode(&tiny(1, "x"), 7).expect("encode");
        // Flip the first payload byte; the header stays intact.
        bytes[DOC_HEADER_SIZE] ^= 0xFF;
        let err = decode::<TinyDoc>(&bytes, 7).expect_err("crc mismatch");
        assert!(matches!(err, Error::Corruption { page_id: 0 }));
    }

    #[test]
    fn truncated_payload_errors() {
        let bytes = encode(&tiny(1, "x"), 7).expect("encode");
        let truncated = &bytes[..bytes.len() - 1];
        let err = decode::<TinyDoc>(truncated, 7).expect_err("truncated");
        assert!(matches!(err, Error::Corruption { page_id: 0 }));
    }

    #[test]
    fn trailing_byte_after_record_errors() {
        // A record longer than header + payload_len is malformed, even
        // though the declared payload itself is intact.
        let mut bytes = encode(&tiny(1, "x"), 7).expect("encode");
        bytes.push(0);
        let err = decode::<TinyDoc>(&bytes, 7).expect_err("trailing byte");
        assert!(matches!(err, Error::Corruption { page_id: 0 }));
    }

    #[test]
    fn header_too_short_errors() {
        let bytes = [0u8; DOC_HEADER_SIZE - 1];
        let err = decode::<TinyDoc>(&bytes, 0).expect_err("short header");
        assert!(matches!(err, Error::Corruption { page_id: 0 }));
    }

    #[test]
    fn oversize_document_errors() {
        #[derive(Serialize, Deserialize)]
        struct Big {
            blob: Vec<u8>,
        }
        impl Document for Big {
            const COLLECTION: &'static str = "big";
            const VERSION: u32 = 1;
        }
        let huge: Vec<u8> = vec![0xAB; MAX_INLINE_DOC + 64];
        let big = Big { blob: huge };
        let err = encode(&big, 1).expect_err("oversize");
        match err {
            Error::DocumentTooLarge { len, max } => {
                assert!(len > max, "len {len} should exceed max {max}");
                assert_eq!(max, MAX_INLINE_DOC);
            }
            other => panic!("expected DocumentTooLarge, got {other:?}"),
        }
    }

    #[test]
    fn future_version_errors() {
        let mut bytes = encode(&tiny(1, "x"), 7).expect("encode");
        // Overwrite `type_version` in place (header bytes 4..8,
        // little-endian), which also pins the header layout.
        bytes[4..8].copy_from_slice(&99u32.to_le_bytes());
        let err = decode::<TinyDoc>(&bytes, 7).expect_err("future");
        assert!(matches!(
            err,
            Error::SchemaVersionFromFuture {
                collection: "tiny",
                from: 99,
                to: 1
            }
        ));
    }

    #[test]
    fn missing_schema_errors_schema_not_registered() {
        // A v0 record read by TinyDoc (VERSION = 1) takes the migration
        // path. TinyDoc has no `historical_schemas()` override, so the
        // codec surfaces SchemaNotRegistered before reaching `migrate`.
        let payload = postcard::to_allocvec(&tiny(1, "x")).expect("postcard");
        let bytes = raw_record(7, 0, &payload);
        let err = decode::<TinyDoc>(&bytes, 7).expect_err("v0 stored, v1 reader");
        assert!(matches!(
            err,
            Error::SchemaNotRegistered {
                collection: "tiny",
                version: 0,
            }
        ));
    }

    // Migration override: a v2 document adds a field and lifts v1
    // records through an overridden `Document::migrate`.
    //
    // M10 #83: the codec consults `historical_schemas()` for the v1
    // schema and passes a structured `Dynamic::Map` to the override
    // (M5 #36 passed raw `Dynamic::Bytes`).
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct EvolvingV1 {
        a: u32,
    }

    impl Document for EvolvingV1 {
        const COLLECTION: &'static str = "evolving";
        const VERSION: u32 = 1;
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct EvolvingV2 {
        a: u32,
        b: String,
    }

    impl Document for EvolvingV2 {
        const COLLECTION: &'static str = "evolving";
        const VERSION: u32 = 2;

        fn historical_schemas() -> Vec<(u32, crate::codec::schema::DynamicSchema)> {
            vec![(
                1,
                crate::codec::schema::DynamicSchema::map([(
                    "a",
                    crate::codec::schema::DynamicSchema::U64,
                )]),
            )]
        }

        fn migrate(dynamic: crate::codec::Dynamic, from_version: u32) -> Result<Self> {
            let not_implemented = || Error::SchemaMigrationNotImplemented {
                collection: Self::COLLECTION,
                from_version,
                to_version: Self::VERSION,
            };
            if from_version != 1 {
                return Err(not_implemented());
            }
            let a = match dynamic.get("a") {
                Some(crate::codec::Dynamic::U64(n)) => {
                    u32::try_from(*n).map_err(|_| not_implemented())?
                }
                _ => return Err(not_implemented()),
            };
            Ok(EvolvingV2 {
                a,
                b: "<default>".to_owned(),
            })
        }
    }

    #[test]
    fn migrate_override_lifts_v1_to_v2() {
        // `encode::<EvolvingV1>` would stamp VERSION = 1 too, but build
        // the v1 record by hand to keep the header explicit.
        let payload = postcard::to_allocvec(&EvolvingV1 { a: 99 }).expect("postcard");
        let record = raw_record(13, 1, &payload);

        // Decoding as v2 sees type_version < VERSION, walks the v1
        // schema, and dispatches through EvolvingV2::migrate.
        let decoded: EvolvingV2 = decode(&record, 13).expect("migrate succeeds");
        assert_eq!(
            decoded,
            EvolvingV2 {
                a: 99,
                b: "<default>".to_owned(),
            }
        );
    }

    #[test]
    fn current_version_does_not_route_through_migrate() {
        // A v2 record (matching EvolvingV2::VERSION) is decoded directly
        // by postcard, NOT through `migrate`.
        let v2 = EvolvingV2 {
            a: 7,
            b: "in-band".to_owned(),
        };
        let bytes = encode(&v2, 13).expect("encode");
        let back: EvolvingV2 = decode(&bytes, 13).expect("decode");
        assert_eq!(back, v2);
    }

    // A historical schema is registered, but the default `migrate` body
    // is kept.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct NoMigrateV2 {
        a: u32,
    }

    impl Document for NoMigrateV2 {
        const COLLECTION: &'static str = "no_migrate";
        const VERSION: u32 = 2;

        fn historical_schemas() -> Vec<(u32, crate::codec::schema::DynamicSchema)> {
            vec![(
                1,
                crate::codec::schema::DynamicSchema::map([(
                    "a",
                    crate::codec::schema::DynamicSchema::U64,
                )]),
            )]
        }
    }

    #[test]
    fn registered_schema_with_default_migrate_errors_not_implemented() {
        let payload = postcard::to_allocvec(&NoMigrateV2 { a: 5 }).expect("postcard");
        let record = raw_record(19, 1, &payload);
        let err = decode::<NoMigrateV2>(&record, 19).expect_err("default migrate");
        assert!(matches!(
            err,
            Error::SchemaMigrationNotImplemented {
                collection: "no_migrate",
                from_version: 1,
                to_version: 2,
            }
        ));
    }

    // M10 #83: an unregistered historical version yields SchemaNotRegistered.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct UnregisteredV2 {
        a: u32,
    }

    impl Document for UnregisteredV2 {
        // Distinct collection name so the catalog dispatch does not
        // collide with EvolvingV2.
        const COLLECTION: &'static str = "unregistered";
        const VERSION: u32 = 2;

        // No historical_schemas() override: the default empty Vec
        // applies, so decoding a v1 record must surface
        // SchemaNotRegistered.
        fn migrate(_dynamic: crate::codec::Dynamic, _from_version: u32) -> Result<Self> {
            // Never reached: the codec errors before dispatch.
            unimplemented!("not reached when no schema is registered")
        }
    }

    #[test]
    fn missing_history_entry_errors_schema_not_registered() {
        let payload = postcard::to_allocvec(&UnregisteredV2 { a: 1 }).expect("postcard");
        let record = raw_record(17, 1, &payload);
        let err = decode::<UnregisteredV2>(&record, 17).expect_err("unregistered");
        assert!(matches!(
            err,
            Error::SchemaNotRegistered {
                collection: "unregistered",
                version: 1,
            }
        ));
    }
}