// nodedb_array/wal.rs

// SPDX-License-Identifier: Apache-2.0

//! WAL record types for array CRDT sync operations.
//!
//! [`ArrayWalRecord`] is the durable on-disk representation of every
//! array sync event that must survive process restart. Origin appends
//! one record per inbound op before applying it to engine state;
//! recovery replays the log in order.
//!
//! # HLC byte invariant
//!
//! `hlc_bytes` fields carry the 18-byte layout produced by
//! [`crate::sync::Hlc::to_bytes()`]. Because zerompk does not derive
//! impls for fixed-size `[u8; N]` arrays, they are stored as `Vec<u8>`.
//! The receiver must assert `hlc_bytes.len() == 18` and convert via
//! `Hlc::from_bytes(&arr)`, where `arr: [u8; 18]`.

use serde::{Deserialize, Serialize};

20/// A single record written to the WAL for array CRDT sync events.
21#[derive(
22    Debug,
23    Clone,
24    PartialEq,
25    Serialize,
26    Deserialize,
27    zerompk::ToMessagePack,
28    zerompk::FromMessagePack,
29)]
30pub enum ArrayWalRecord {
31    /// A single cell op (Put, Delete, or Erase) received from a Lite peer.
32    ///
33    /// `op_payload` is a zerompk-encoded [`crate::sync::ArrayOp`] produced
34    /// by `nodedb_array::sync::op_codec::encode_op`. The WAL stores it
35    /// opaque to decouple the WAL crate from schema evolution in `ArrayOp`.
36    ///
37    /// `hlc_bytes` is the 18-byte HLC of the op (invariant: `len == 18`).
38    ApplyOp {
39        array: String,
40        op_payload: Vec<u8>,
41        hlc_bytes: Vec<u8>,
42    },
43
44    /// An array schema was updated (Loro CRDT snapshot imported from a peer).
45    ///
46    /// `loro_snapshot` is the raw bytes from `SchemaDoc::export_snapshot()`.
47    /// `schema_hlc_bytes` is the 18-byte HLC of the new schema version
48    /// (invariant: `len == 18`).
49    SchemaUpdate {
50        array: String,
51        loro_snapshot: Vec<u8>,
52        schema_hlc_bytes: Vec<u8>,
53    },
54
55    /// A tile snapshot was applied as the result of a catch-up sync stream.
56    ///
57    /// `snapshot_hlc_bytes` is the 18-byte HLC covering this snapshot
58    /// (invariant: `len == 18`). `chunks` holds the raw tile blobs —
59    /// each element is a zerompk-encoded `Vec<ArrayOp>` matching the
60    /// payload produced by `nodedb_array::sync::snapshot::encode_snapshot`.
61    Snapshot {
62        array: String,
63        snapshot_hlc_bytes: Vec<u8>,
64        coord_range_payload: Vec<u8>,
65        chunks: Vec<Vec<u8>>,
66    },
67
68    /// GC collapsed all ops below `collapsed_through_hlc_bytes` into a
69    /// snapshot. Ops with HLC < this value may be pruned from the op-log.
70    ///
71    /// `collapsed_through_hlc_bytes` is 18-byte HLC (invariant: `len == 18`).
72    GcCollapse {
73        array: String,
74        collapsed_through_hlc_bytes: Vec<u8>,
75    },
76}
77
#[cfg(test)]
mod tests {
    use super::*;

    /// Length in bytes of a serialized HLC, per the module-level invariant.
    const HLC_LEN: usize = 18;

    /// Returns an all-zero placeholder HLC of the required length.
    fn zero_hlc() -> Vec<u8> {
        vec![0u8; HLC_LEN]
    }

    /// Borrows the HLC field of whichever variant `record` is.
    ///
    /// The match is deliberately exhaustive (no `_` arm) so that adding a
    /// variant forces this helper, and the invariant test using it, to be
    /// revisited.
    fn hlc_field(record: &ArrayWalRecord) -> &[u8] {
        match record {
            ArrayWalRecord::ApplyOp { hlc_bytes, .. } => hlc_bytes.as_slice(),
            ArrayWalRecord::SchemaUpdate {
                schema_hlc_bytes, ..
            } => schema_hlc_bytes.as_slice(),
            ArrayWalRecord::Snapshot {
                snapshot_hlc_bytes, ..
            } => snapshot_hlc_bytes.as_slice(),
            ArrayWalRecord::GcCollapse {
                collapsed_through_hlc_bytes,
                ..
            } => collapsed_through_hlc_bytes.as_slice(),
        }
    }

    /// Encodes `record` with zerompk, decodes the bytes again, asserts the
    /// decoded value equals the input, and returns the decoded value.
    fn roundtrip(record: &ArrayWalRecord) -> ArrayWalRecord {
        let encoded = zerompk::to_msgpack_vec(record).expect("encode");
        let decoded: ArrayWalRecord = zerompk::from_msgpack(&encoded).expect("decode");
        assert_eq!(record, &decoded);
        decoded
    }

    /// One sample record per variant, each carrying `zero_hlc()`.
    fn sample_records() -> Vec<ArrayWalRecord> {
        vec![
            ArrayWalRecord::ApplyOp {
                array: "temperatures".into(),
                op_payload: vec![1, 2, 3, 4],
                hlc_bytes: zero_hlc(),
            },
            ArrayWalRecord::SchemaUpdate {
                array: "metrics".into(),
                loro_snapshot: vec![10, 20, 30],
                schema_hlc_bytes: zero_hlc(),
            },
            ArrayWalRecord::Snapshot {
                array: "coords".into(),
                snapshot_hlc_bytes: zero_hlc(),
                coord_range_payload: vec![5, 6, 7],
                chunks: vec![vec![1, 2], vec![3, 4]],
            },
            ArrayWalRecord::GcCollapse {
                array: "events".into(),
                collapsed_through_hlc_bytes: zero_hlc(),
            },
        ]
    }

    #[test]
    fn apply_op_roundtrip() {
        let record = ArrayWalRecord::ApplyOp {
            array: "temperatures".into(),
            op_payload: vec![1, 2, 3, 4],
            hlc_bytes: zero_hlc(),
        };
        roundtrip(&record);
    }

    #[test]
    fn schema_update_roundtrip() {
        let record = ArrayWalRecord::SchemaUpdate {
            array: "metrics".into(),
            loro_snapshot: vec![10, 20, 30],
            schema_hlc_bytes: zero_hlc(),
        };
        roundtrip(&record);
    }

    #[test]
    fn snapshot_roundtrip() {
        let record = ArrayWalRecord::Snapshot {
            array: "coords".into(),
            snapshot_hlc_bytes: zero_hlc(),
            coord_range_payload: vec![5, 6, 7],
            chunks: vec![vec![1, 2], vec![3, 4]],
        };
        roundtrip(&record);
    }

    #[test]
    fn gc_collapse_roundtrip() {
        let record = ArrayWalRecord::GcCollapse {
            array: "events".into(),
            collapsed_through_hlc_bytes: zero_hlc(),
        };
        roundtrip(&record);
    }

    #[test]
    fn hlc_bytes_len_invariant() {
        // The sentinel itself must be exactly 18 bytes.
        let hlc = zero_hlc();
        assert_eq!(hlc.len(), HLC_LEN);

        // It must also fit the fixed-size array form exactly;
        // `copy_from_slice` panics if the lengths differ.
        let mut fixed = [0xFFu8; HLC_LEN];
        fixed.copy_from_slice(&hlc);
        assert_eq!(fixed, [0u8; HLC_LEN]);

        // Encoding and decoding must hand back an HLC of the same length
        // for every variant, so a reader's `len == 18` check still holds.
        for record in sample_records() {
            let decoded = roundtrip(&record);
            assert_eq!(hlc_field(&decoded).len(), HLC_LEN);
        }
    }
}