Skip to main content

nodedb_array/sync/
schema_crdt.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Loro-backed CRDT schema document for arrays.
4//!
5//! [`SchemaDoc`] wraps a [`LoroDoc`] to give each array a CRDT-replicated
6//! schema. The schema is stored as a MessagePack blob under the key
7//! `"content"` in a root Loro map named `"root"`. HLC tracking ensures that
8//! schema changes can be causally ordered with cell ops.
9//!
10//! This is the minimum surface needed for the initial Array sync. ALTER
11//! ARRAY support (incremental dimension/attribute add, domain expansion)
12//! will build on top of the `replace_schema` path exposed here.
13
14use loro::{LoroDoc, LoroMap, LoroValue};
15
16use crate::error::{ArrayError, ArrayResult};
17use crate::schema::array_schema::ArraySchema;
18use crate::sync::hlc::{Hlc, HlcGenerator};
19use crate::sync::replica_id::ReplicaId;
20
/// Version tag prepended to every Loro snapshot this module exports.
///
/// Wire layout: `[LORO_FORMAT_VERSION: u8][loro snapshot bytes…]`
///
/// Bump this value (and provide a migration path) whenever the snapshot
/// encoding changes in a backward-incompatible way. Both import paths compare
/// the leading byte against this constant and reject any buffer whose version
/// differs, so a snapshot written under another format version fails with an
/// explicit error instead of being handed to Loro.
pub const LORO_FORMAT_VERSION: u8 = 1;
31
32/// Loro-backed CRDT document tracking a single array's schema.
33///
34/// The schema is stored as a MessagePack blob at root map key `"content"`.
35/// `schema_hlc` is the HLC of the most-recent schema write on this replica;
36/// it is compared against the `schema_hlc` embedded in each [`ArrayOp`]
37/// header to gate op application.
38///
39/// `LoroDoc` is not `Clone`, so [`SchemaDoc`] is not `Clone` either.
40pub struct SchemaDoc {
41    doc: LoroDoc,
42    schema_hlc: Hlc,
43    replica_id: ReplicaId,
44}
45
46impl SchemaDoc {
47    /// Create an empty schema doc for `replica_id`.
48    ///
49    /// `schema_hlc` starts at `Hlc::ZERO`. Call [`SchemaDoc::from_schema`]
50    /// or [`SchemaDoc::import_snapshot`] to populate it.
51    pub fn new(replica_id: ReplicaId) -> Self {
52        Self {
53            doc: LoroDoc::new(),
54            schema_hlc: Hlc::ZERO,
55            replica_id,
56        }
57    }
58
59    /// Construct a schema doc pre-populated with `schema`.
60    ///
61    /// The schema is encoded as MessagePack and stored under
62    /// `root["content"]`. `generator.next()` is called to assign the initial
63    /// `schema_hlc`.
64    pub fn from_schema(
65        replica_id: ReplicaId,
66        schema: &ArraySchema,
67        generator: &HlcGenerator,
68    ) -> ArrayResult<Self> {
69        let mut doc_self = Self::new(replica_id);
70        doc_self.write_schema_to_doc(schema)?;
71        doc_self.schema_hlc = generator.next()?;
72        Ok(doc_self)
73    }
74
75    /// Return the current schema HLC.
76    pub fn schema_hlc(&self) -> Hlc {
77        self.schema_hlc
78    }
79
80    /// Return the replica ID of this doc.
81    pub fn replica_id(&self) -> ReplicaId {
82        self.replica_id
83    }
84
85    /// Decode the stored schema from the Loro doc.
86    ///
87    /// Reads the MessagePack blob at `root["content"]` and decodes it via
88    /// zerompk. Errors map to [`ArrayError::SegmentCorruption`].
89    pub fn to_schema(&self) -> ArrayResult<ArraySchema> {
90        let root = self.doc.get_map("root");
91        let bytes = self.read_content_bytes(&root)?;
92        zerompk::from_msgpack(&bytes).map_err(|e| ArrayError::SegmentCorruption {
93            detail: format!("schema decode failed: {e}"),
94        })
95    }
96
97    /// Export the full Loro snapshot as an enveloped byte buffer.
98    ///
99    /// The returned bytes have the format:
100    /// `[LORO_FORMAT_VERSION: u8][loro snapshot bytes…]`
101    ///
102    /// Pass the result to [`SchemaDoc::import_snapshot`] (or
103    /// [`SchemaDoc::import_snapshot_replicated`]) on another replica to
104    /// converge schema state.
105    pub fn export_snapshot(&self) -> ArrayResult<Vec<u8>> {
106        let loro_bytes = self.doc.export(loro::ExportMode::Snapshot).map_err(|e| {
107            ArrayError::SegmentCorruption {
108                detail: format!("loro snapshot export failed: {e}"),
109            }
110        })?;
111        let mut envelope = Vec::with_capacity(1 + loro_bytes.len());
112        envelope.push(LORO_FORMAT_VERSION);
113        envelope.extend_from_slice(&loro_bytes);
114        Ok(envelope)
115    }
116
117    /// Import a Loro snapshot from a remote replica.
118    ///
119    /// After merging the snapshot, `generator.observe(remote_hlc)` is called
120    /// so the local generator incorporates the remote clock. A fresh
121    /// `schema_hlc` is then generated via `generator.next()` so that any
122    /// subsequent local writes have an HLC strictly greater than
123    /// `remote_hlc`.
124    pub fn import_snapshot(
125        &mut self,
126        bytes: &[u8],
127        remote_hlc: Hlc,
128        generator: &HlcGenerator,
129    ) -> ArrayResult<()> {
130        let loro_bytes = strip_envelope(bytes)?;
131        self.doc
132            .import(loro_bytes)
133            .map_err(|e| ArrayError::LoroError {
134                detail: format!("loro import failed: {e}"),
135            })?;
136        generator.observe(remote_hlc)?;
137        self.schema_hlc = generator.next()?;
138        Ok(())
139    }
140
141    /// Import a Loro snapshot from a Raft-committed entry.
142    ///
143    /// Unlike [`import_snapshot`], this method sets `schema_hlc` to exactly
144    /// `remote_hlc` rather than bumping past it. This is the correct
145    /// behaviour for Raft replication: every replica must converge to the
146    /// *same* `schema_hlc` after applying the same log entry so that
147    /// schema-gating checks on ops are consistent across the cluster.
148    ///
149    /// Call this only from the distributed applier (Raft commit path), not
150    /// from the CRDT sync path where bumping is required.
151    pub fn import_snapshot_replicated(
152        &mut self,
153        bytes: &[u8],
154        committed_hlc: Hlc,
155    ) -> ArrayResult<()> {
156        let loro_bytes = strip_envelope(bytes)?;
157        self.doc
158            .import(loro_bytes)
159            .map_err(|e| ArrayError::LoroError {
160                detail: format!("loro import (replicated) failed: {e}"),
161            })?;
162        self.schema_hlc = committed_hlc;
163        Ok(())
164    }
165
166    /// Replace the stored schema with `schema`.
167    ///
168    /// Re-encodes the schema as MessagePack and overwrites `root["content"]`.
169    /// Bumps `schema_hlc` via `generator.next()`.
170    ///
171    /// This is the stub entry point for Phase F ALTER ARRAY support.
172    /// Incremental dim/attr add will build on this path.
173    pub fn replace_schema(
174        &mut self,
175        schema: &ArraySchema,
176        generator: &HlcGenerator,
177    ) -> ArrayResult<()> {
178        self.write_schema_to_doc(schema)?;
179        self.schema_hlc = generator.next()?;
180        Ok(())
181    }
182
183    // ─── Internal helpers ────────────────────────────────────────────────────
184
185    fn write_schema_to_doc(&self, schema: &ArraySchema) -> ArrayResult<()> {
186        let schema_bytes =
187            zerompk::to_msgpack_vec(schema).map_err(|e| ArrayError::SegmentCorruption {
188                detail: format!("schema encode failed: {e}"),
189            })?;
190        let root: LoroMap = self.doc.get_map("root");
191        root.insert("content", LoroValue::Binary(schema_bytes.into()))
192            .map_err(|e| ArrayError::LoroError {
193                detail: format!("loro map insert failed: {e}"),
194            })?;
195        Ok(())
196    }
197
198    fn read_content_bytes(&self, root: &LoroMap) -> ArrayResult<Vec<u8>> {
199        match root.get("content") {
200            Some(loro::ValueOrContainer::Value(LoroValue::Binary(b))) => Ok(b.to_vec()),
201            Some(other) => Err(ArrayError::SegmentCorruption {
202                detail: format!("expected Binary at root[\"content\"], got {:?}", other),
203            }),
204            None => Err(ArrayError::SegmentCorruption {
205                detail: "root[\"content\"] not found".into(),
206            }),
207        }
208    }
209}
210
211// ─── Envelope helpers ─────────────────────────────────────────────────────────
212
213/// Strip the one-byte version prefix from an enveloped snapshot buffer.
214///
215/// Returns a slice into `bytes` starting after the version byte, or an error
216/// if the buffer is empty or the version does not match [`LORO_FORMAT_VERSION`].
217fn strip_envelope(bytes: &[u8]) -> ArrayResult<&[u8]> {
218    match bytes.first() {
219        None => Err(ArrayError::SegmentCorruption {
220            detail: "loro snapshot envelope is empty".into(),
221        }),
222        Some(&v) if v != LORO_FORMAT_VERSION => Err(ArrayError::LoroSnapshotVersionMismatch {
223            expected: LORO_FORMAT_VERSION,
224            got: v,
225        }),
226        Some(_) => Ok(&bytes[1..]),
227    }
228}
229
230// ─── Tests ───────────────────────────────────────────────────────────────────
231
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::array_schema::ArraySchema;
    use crate::schema::attr_spec::{AttrSpec, AttrType};
    use crate::schema::cell_order::{CellOrder, TileOrder};
    use crate::schema::dim_spec::{DimSpec, DimType};
    use crate::sync::hlc::HlcGenerator;
    use crate::sync::replica_id::ReplicaId;
    use crate::types::domain::{Domain, DomainBound};

    /// Build the replica identity for a numeric id.
    fn replica(id: u64) -> ReplicaId {
        ReplicaId::new(id)
    }

    /// Build an HLC generator bound to replica `id`.
    fn generator(id: u64) -> HlcGenerator {
        HlcGenerator::new(replica(id))
    }

    /// Minimal one-dimensional schema (`x` in `0..=99`, one `Float64`
    /// attribute `v`, tile extent 10) used as the fixture in every test.
    fn simple_schema(name: &str) -> ArraySchema {
        let x_domain = Domain::new(DomainBound::Int64(0), DomainBound::Int64(99));
        let x_dim = DimSpec::new("x", DimType::Int64, x_domain);
        let v_attr = AttrSpec::new("v", AttrType::Float64, true);
        ArraySchema {
            name: name.into(),
            dims: vec![x_dim],
            attrs: vec![v_attr],
            tile_extents: vec![10],
            cell_order: CellOrder::RowMajor,
            tile_order: TileOrder::RowMajor,
        }
    }

    /// A schema written through `from_schema` decodes back unchanged and
    /// carries a non-zero HLC.
    #[test]
    fn from_schema_then_to_schema_roundtrips() {
        let clock = generator(1);
        let original = simple_schema("arr");
        let doc = SchemaDoc::from_schema(replica(1), &original, &clock).unwrap();

        let decoded = doc.to_schema().unwrap();
        assert_eq!(original, decoded);
        assert!(doc.schema_hlc() > Hlc::ZERO);
    }

    /// `replace_schema` stores the new schema and advances `schema_hlc`.
    #[test]
    fn replace_schema_bumps_hlc() {
        let clock = generator(1);
        let mut doc = SchemaDoc::from_schema(replica(1), &simple_schema("arr"), &clock).unwrap();
        let hlc_before = doc.schema_hlc();

        let replacement = simple_schema("arr2");
        doc.replace_schema(&replacement, &clock).unwrap();

        assert!(doc.schema_hlc() > hlc_before);
        assert_eq!(doc.to_schema().unwrap(), replacement);
    }

    /// A snapshot exported by one replica yields the same schema once
    /// imported by another.
    #[test]
    fn export_then_import_converges() {
        let source_clock = generator(1);
        let source =
            SchemaDoc::from_schema(replica(1), &simple_schema("shared"), &source_clock).unwrap();
        let snapshot = source.export_snapshot().unwrap();

        let target_clock = generator(2);
        let mut target = SchemaDoc::new(replica(2));
        target
            .import_snapshot(&snapshot, source.schema_hlc(), &target_clock)
            .unwrap();

        assert_eq!(source.to_schema().unwrap(), target.to_schema().unwrap());
    }

    /// Local writes made after an import are stamped later than the remote
    /// HLC that accompanied the snapshot.
    #[test]
    fn import_observes_remote_hlc() {
        let source_clock = generator(1);
        let source =
            SchemaDoc::from_schema(replica(1), &simple_schema("x"), &source_clock).unwrap();
        let remote_hlc = source.schema_hlc();
        let snapshot = source.export_snapshot().unwrap();

        let target_clock = generator(2);
        let mut target = SchemaDoc::new(replica(2));
        target
            .import_snapshot(&snapshot, remote_hlc, &target_clock)
            .unwrap();

        // After import, any new local write must produce hlc > remote_hlc.
        target
            .replace_schema(&simple_schema("x2"), &target_clock)
            .unwrap();
        assert!(target.schema_hlc() > remote_hlc);
    }

    /// A buffer with a valid envelope but a bogus payload is rejected.
    #[test]
    fn import_garbage_errors() {
        let clock = generator(1);
        let mut doc = SchemaDoc::new(replica(1));

        // A valid version byte gets us past the envelope check and into
        // Loro's own parser, which should reject the payload.
        let bad: Vec<u8> = std::iter::once(LORO_FORMAT_VERSION)
            .chain(b"not valid loro data".iter().copied())
            .collect();

        let outcome = doc.import_snapshot(&bad, Hlc::ZERO, &clock);
        assert!(outcome.is_err());
    }

    /// Exported snapshots start with the envelope version byte.
    #[test]
    fn export_snapshot_has_version_prefix() {
        let clock = generator(1);
        let doc = SchemaDoc::from_schema(replica(1), &simple_schema("arr"), &clock).unwrap();

        let snapshot = doc.export_snapshot().unwrap();
        assert!(!snapshot.is_empty());
        assert_eq!(snapshot[0], LORO_FORMAT_VERSION);
    }

    /// The CRDT import path refuses a snapshot whose version byte differs.
    #[test]
    fn import_snapshot_rejects_wrong_version() {
        let source_clock = generator(1);
        let source =
            SchemaDoc::from_schema(replica(1), &simple_schema("v"), &source_clock).unwrap();
        let mut snapshot = source.export_snapshot().unwrap();

        // Corrupt the version byte.
        let wrong_version = LORO_FORMAT_VERSION.wrapping_add(1);
        snapshot[0] = wrong_version;

        let target_clock = generator(2);
        let mut target = SchemaDoc::new(replica(2));
        let err = target
            .import_snapshot(&snapshot, source.schema_hlc(), &target_clock)
            .unwrap_err();

        let is_expected_mismatch = matches!(
            err,
            ArrayError::LoroSnapshotVersionMismatch { expected, got }
                if expected == LORO_FORMAT_VERSION && got == wrong_version
        );
        assert!(is_expected_mismatch, "unexpected error: {err}");
    }

    /// The Raft import path refuses a snapshot whose version byte differs.
    #[test]
    fn import_snapshot_replicated_rejects_wrong_version() {
        let source_clock = generator(1);
        let source =
            SchemaDoc::from_schema(replica(1), &simple_schema("v2"), &source_clock).unwrap();
        let mut snapshot = source.export_snapshot().unwrap();

        snapshot[0] = 0; // version 0 has never existed

        let mut target = SchemaDoc::new(replica(2));
        let err = target
            .import_snapshot_replicated(&snapshot, source.schema_hlc())
            .unwrap_err();

        let is_expected_mismatch = matches!(
            err,
            ArrayError::LoroSnapshotVersionMismatch { expected, got }
                if expected == LORO_FORMAT_VERSION && got == 0
        );
        assert!(is_expected_mismatch, "unexpected error: {err}");
    }
}