Skip to main content

nodedb_array/sync/
op.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Array CRDT operation types.
4//!
5//! Each mutation to the array engine produces one [`ArrayOp`]. Operations
6//! are the atomic unit of replication: they flow from the originating replica
7//! to Origin and fan out to all subscribed peers.
8
9use serde::{Deserialize, Serialize};
10
11use crate::error::{ArrayError, ArrayResult};
12use crate::sync::hlc::Hlc;
13use crate::types::cell_value::value::CellValue;
14use crate::types::coord::value::CoordValue;
15
16/// The kind of mutation an [`ArrayOp`] represents.
17#[derive(
18    Copy,
19    Clone,
20    Debug,
21    PartialEq,
22    Eq,
23    Serialize,
24    Deserialize,
25    zerompk::ToMessagePack,
26    zerompk::FromMessagePack,
27)]
28pub enum ArrayOpKind {
29    /// Insert or update a cell. `ArrayOp::attrs` must be `Some`.
30    Put,
31    /// Logical delete of a cell (soft tombstone). `ArrayOp::attrs` must be `None`.
32    Delete,
33    /// GDPR-grade erase of a cell (hard tombstone). `ArrayOp::attrs` must be `None`.
34    Erase,
35}
36
37/// Metadata carried by every array operation.
38///
39/// `system_from_ms` is redundant with `hlc.physical_ms` and is included
40/// separately for fast bitemporal index lookups that do not need to unpack
41/// the full HLC.
42#[derive(
43    Clone,
44    Debug,
45    PartialEq,
46    Serialize,
47    Deserialize,
48    zerompk::ToMessagePack,
49    zerompk::FromMessagePack,
50)]
51pub struct ArrayOpHeader {
52    /// Name of the target array collection.
53    pub array: String,
54    /// HLC timestamp of this operation at the originating replica.
55    pub hlc: Hlc,
56    /// HLC of the array schema that was in effect when this op was generated.
57    ///
58    /// Receivers gate application on `schema_hlc <= local_schema_hlc`.
59    pub schema_hlc: Hlc,
60    /// Valid-time start (milliseconds since Unix epoch). `-1` means "open".
61    pub valid_from_ms: i64,
62    /// Valid-time end (milliseconds since Unix epoch). `-1` means "open".
63    pub valid_until_ms: i64,
64    /// System-time start; mirrors `hlc.physical_ms` for fast bitemporal indexing.
65    pub system_from_ms: i64,
66}
67
68/// A single array CRDT operation.
69///
70/// Shape contract:
71/// - [`ArrayOpKind::Put`] — `attrs` must be `Some(_)`.
72/// - [`ArrayOpKind::Delete`] / [`ArrayOpKind::Erase`] — `attrs` must be `None`.
73///
74/// Use [`ArrayOp::validate_shape`] to enforce this invariant after construction.
75#[derive(
76    Clone,
77    Debug,
78    PartialEq,
79    Serialize,
80    Deserialize,
81    zerompk::ToMessagePack,
82    zerompk::FromMessagePack,
83)]
84pub struct ArrayOp {
85    /// Operation metadata (HLC, schema version, bitemporal bounds).
86    pub header: ArrayOpHeader,
87    /// Kind of mutation.
88    pub kind: ArrayOpKind,
89    /// N-dimensional coordinate identifying the target cell.
90    pub coord: Vec<CoordValue>,
91    /// Cell attribute values for a `Put`; `None` for `Delete` and `Erase`.
92    pub attrs: Option<Vec<CellValue>>,
93}
94
95impl ArrayOp {
96    /// Validate that `attrs` presence matches the operation kind.
97    ///
98    /// Returns [`ArrayError::InvalidOp`] if the shape contract is violated.
99    pub fn validate_shape(&self) -> ArrayResult<()> {
100        match self.kind {
101            ArrayOpKind::Put => {
102                if self.attrs.is_none() {
103                    return Err(ArrayError::InvalidOp {
104                        detail: "Put op must carry attrs".into(),
105                    });
106                }
107            }
108            ArrayOpKind::Delete | ArrayOpKind::Erase => {
109                if self.attrs.is_some() {
110                    return Err(ArrayError::InvalidOp {
111                        detail: format!("{:?} op must not carry attrs", self.kind),
112                    });
113                }
114            }
115        }
116        Ok(())
117    }
118}
119
#[cfg(test)]
mod tests {
    //! Unit tests for the [`ArrayOp`] shape contract and its MessagePack
    //! round-trip.

    use super::*;
    use crate::sync::hlc::Hlc;
    use crate::sync::replica_id::ReplicaId;
    use crate::types::cell_value::value::CellValue;
    use crate::types::coord::value::CoordValue;

    /// Fixed HLC shared by all fixtures.
    fn dummy_hlc() -> Hlc {
        Hlc::new(1_000, 0, ReplicaId::new(1)).unwrap()
    }

    /// Header targeting `array`, with an open valid-time end and a
    /// `system_from_ms` that mirrors the physical component of `dummy_hlc`.
    fn dummy_header(array: &str) -> ArrayOpHeader {
        ArrayOpHeader {
            array: array.into(),
            hlc: dummy_hlc(),
            schema_hlc: dummy_hlc(),
            valid_from_ms: 0,
            valid_until_ms: -1,
            system_from_ms: 1_000,
        }
    }

    /// One-dimensional coordinate at the origin.
    fn dummy_coord() -> Vec<CoordValue> {
        vec![CoordValue::Int64(0)]
    }

    /// A single-attribute payload, as carried by a well-formed `Put`.
    fn dummy_attrs() -> Option<Vec<CellValue>> {
        Some(vec![CellValue::Null])
    }

    /// Build an op against array `"t"` with the given kind and attrs, so
    /// each test states only the two fields the shape contract depends on.
    fn dummy_op(kind: ArrayOpKind, attrs: Option<Vec<CellValue>>) -> ArrayOp {
        ArrayOp {
            header: dummy_header("t"),
            kind,
            coord: dummy_coord(),
            attrs,
        }
    }

    #[test]
    fn put_requires_attrs() {
        let op = dummy_op(ArrayOpKind::Put, None);
        assert!(matches!(
            op.validate_shape(),
            Err(ArrayError::InvalidOp { .. })
        ));
    }

    #[test]
    fn delete_rejects_attrs() {
        let op = dummy_op(ArrayOpKind::Delete, dummy_attrs());
        assert!(matches!(
            op.validate_shape(),
            Err(ArrayError::InvalidOp { .. })
        ));
    }

    #[test]
    fn erase_rejects_attrs() {
        let op = dummy_op(ArrayOpKind::Erase, dummy_attrs());
        assert!(matches!(
            op.validate_shape(),
            Err(ArrayError::InvalidOp { .. })
        ));
    }

    #[test]
    fn valid_put_passes() {
        let op = dummy_op(ArrayOpKind::Put, dummy_attrs());
        assert!(op.validate_shape().is_ok());
    }

    #[test]
    fn valid_delete_passes() {
        let op = dummy_op(ArrayOpKind::Delete, None);
        assert!(op.validate_shape().is_ok());
    }

    // Happy path for the third kind; previously only Put and Delete were
    // covered, so a regression in the Erase arm would have gone unnoticed.
    #[test]
    fn valid_erase_passes() {
        let op = dummy_op(ArrayOpKind::Erase, None);
        assert!(op.validate_shape().is_ok());
    }

    #[test]
    fn serialize_roundtrip() {
        let op = ArrayOp {
            header: dummy_header("test_array"),
            kind: ArrayOpKind::Put,
            coord: dummy_coord(),
            attrs: dummy_attrs(),
        };
        let bytes = zerompk::to_msgpack_vec(&op).expect("serialize");
        let back: ArrayOp = zerompk::from_msgpack(&bytes).expect("deserialize");
        assert_eq!(op, back);
    }
}
227}