nodedb_array/wal.rs
1// SPDX-License-Identifier: Apache-2.0
2
3//! WAL record types for array CRDT sync operations.
4//!
5//! [`ArrayWalRecord`] is the durable on-disk representation of every
6//! array sync event that must survive process restart. Origin appends
7//! one record per inbound op before applying it to engine state;
8//! recovery replays the log in order.
9//!
10//! # HLC byte invariant
11//!
12//! `hlc_bytes` fields carry the 18-byte layout produced by
13//! [`crate::sync::Hlc::to_bytes()`]. Because zerompk does not derive
14//! impls for fixed-size `[u8; N]` arrays they are stored as `Vec<u8>`.
15//! The receiver must assert `hlc_bytes.len() == 18` and convert via
16//! `Hlc::from_bytes(&arr)` where `arr: [u8; 18]`.
17
18use serde::{Deserialize, Serialize};
19
20/// A single record written to the WAL for array CRDT sync events.
21#[derive(
22 Debug,
23 Clone,
24 PartialEq,
25 Serialize,
26 Deserialize,
27 zerompk::ToMessagePack,
28 zerompk::FromMessagePack,
29)]
30pub enum ArrayWalRecord {
31 /// A single cell op (Put, Delete, or Erase) received from a Lite peer.
32 ///
33 /// `op_payload` is a zerompk-encoded [`crate::sync::ArrayOp`] produced
34 /// by `nodedb_array::sync::op_codec::encode_op`. The WAL stores it
35 /// opaque to decouple the WAL crate from schema evolution in `ArrayOp`.
36 ///
37 /// `hlc_bytes` is the 18-byte HLC of the op (invariant: `len == 18`).
38 ApplyOp {
39 array: String,
40 op_payload: Vec<u8>,
41 hlc_bytes: Vec<u8>,
42 },
43
44 /// An array schema was updated (Loro CRDT snapshot imported from a peer).
45 ///
46 /// `loro_snapshot` is the raw bytes from `SchemaDoc::export_snapshot()`.
47 /// `schema_hlc_bytes` is the 18-byte HLC of the new schema version
48 /// (invariant: `len == 18`).
49 SchemaUpdate {
50 array: String,
51 loro_snapshot: Vec<u8>,
52 schema_hlc_bytes: Vec<u8>,
53 },
54
55 /// A tile snapshot was applied as the result of a catch-up sync stream.
56 ///
57 /// `snapshot_hlc_bytes` is the 18-byte HLC covering this snapshot
58 /// (invariant: `len == 18`). `chunks` holds the raw tile blobs —
59 /// each element is a zerompk-encoded `Vec<ArrayOp>` matching the
60 /// payload produced by `nodedb_array::sync::snapshot::encode_snapshot`.
61 Snapshot {
62 array: String,
63 snapshot_hlc_bytes: Vec<u8>,
64 coord_range_payload: Vec<u8>,
65 chunks: Vec<Vec<u8>>,
66 },
67
68 /// GC collapsed all ops below `collapsed_through_hlc_bytes` into a
69 /// snapshot. Ops with HLC < this value may be pruned from the op-log.
70 ///
71 /// `collapsed_through_hlc_bytes` is 18-byte HLC (invariant: `len == 18`).
72 GcCollapse {
73 array: String,
74 collapsed_through_hlc_bytes: Vec<u8>,
75 },
76}
77
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an all-zero placeholder HLC of the documented 18-byte length.
    fn zero_hlc() -> Vec<u8> {
        [0u8; 18].to_vec()
    }

    /// Encodes `original` with zerompk, decodes the bytes again, and
    /// asserts that the decoded record equals the input.
    fn assert_roundtrip(original: &ArrayWalRecord) {
        let bytes = zerompk::to_msgpack_vec(original).expect("encode");
        let restored: ArrayWalRecord = zerompk::from_msgpack(&bytes).expect("decode");
        assert_eq!(original, &restored);
    }

    #[test]
    fn apply_op_roundtrip() {
        assert_roundtrip(&ArrayWalRecord::ApplyOp {
            array: "temperatures".into(),
            op_payload: vec![1, 2, 3, 4],
            hlc_bytes: zero_hlc(),
        });
    }

    #[test]
    fn schema_update_roundtrip() {
        assert_roundtrip(&ArrayWalRecord::SchemaUpdate {
            array: "metrics".into(),
            loro_snapshot: vec![10, 20, 30],
            schema_hlc_bytes: zero_hlc(),
        });
    }

    #[test]
    fn snapshot_roundtrip() {
        assert_roundtrip(&ArrayWalRecord::Snapshot {
            array: "coords".into(),
            snapshot_hlc_bytes: zero_hlc(),
            coord_range_payload: vec![5, 6, 7],
            chunks: vec![vec![1, 2], vec![3, 4]],
        });
    }

    #[test]
    fn gc_collapse_roundtrip() {
        assert_roundtrip(&ArrayWalRecord::GcCollapse {
            array: "events".into(),
            collapsed_through_hlc_bytes: zero_hlc(),
        });
    }

    #[test]
    fn hlc_bytes_len_invariant() {
        // The placeholder HLC used by the round-trip tests must itself
        // respect the 18-byte length invariant.
        assert_eq!(zero_hlc().len(), 18);
    }
}