Skip to main content

nodedb_array/sync/
op_codec.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! MessagePack encode/decode for [`ArrayOp`] and batches thereof.
4//!
5//! Uses `zerompk` (the workspace's MessagePack codec) so that encoding is
6//! consistent with the rest of the internal transport layer.
7
8use crate::error::{ArrayError, ArrayResult};
9use crate::sync::op::ArrayOp;
10
11/// Encode a single [`ArrayOp`] to MessagePack bytes.
12pub fn encode_op(op: &ArrayOp) -> ArrayResult<Vec<u8>> {
13    zerompk::to_msgpack_vec(op).map_err(|e| ArrayError::SegmentCorruption {
14        detail: format!("encode_op: {e}"),
15    })
16}
17
18/// Decode a single [`ArrayOp`] from MessagePack bytes.
19pub fn decode_op(bytes: &[u8]) -> ArrayResult<ArrayOp> {
20    zerompk::from_msgpack(bytes).map_err(|e| ArrayError::SegmentCorruption {
21        detail: format!("decode_op: {e}"),
22    })
23}
24
25/// Encode a slice of [`ArrayOp`]s as a single MessagePack value (array).
26pub fn encode_op_batch(ops: &[ArrayOp]) -> ArrayResult<Vec<u8>> {
27    zerompk::to_msgpack_vec(&ops.to_vec()).map_err(|e| ArrayError::SegmentCorruption {
28        detail: format!("encode_op_batch: {e}"),
29    })
30}
31
32/// Decode a `Vec<ArrayOp>` from MessagePack bytes produced by
33/// [`encode_op_batch`].
34pub fn decode_op_batch(bytes: &[u8]) -> ArrayResult<Vec<ArrayOp>> {
35    zerompk::from_msgpack(bytes).map_err(|e| ArrayError::SegmentCorruption {
36        detail: format!("decode_op_batch: {e}"),
37    })
38}
39
40#[cfg(test)]
41mod tests {
42    use super::*;
43    use crate::sync::hlc::Hlc;
44    use crate::sync::op::{ArrayOpHeader, ArrayOpKind};
45    use crate::sync::replica_id::ReplicaId;
46    use crate::types::cell_value::value::CellValue;
47    use crate::types::coord::value::CoordValue;
48
49    fn hlc(ms: u64) -> Hlc {
50        Hlc::new(ms, 0, ReplicaId::new(1)).unwrap()
51    }
52
53    fn header(array: &str, ms: u64) -> ArrayOpHeader {
54        ArrayOpHeader {
55            array: array.into(),
56            hlc: hlc(ms),
57            schema_hlc: hlc(ms),
58            valid_from_ms: 0,
59            valid_until_ms: -1,
60            system_from_ms: ms as i64,
61        }
62    }
63
64    fn put_op(array: &str, ms: u64) -> ArrayOp {
65        ArrayOp {
66            header: header(array, ms),
67            kind: ArrayOpKind::Put,
68            coord: vec![CoordValue::Int64(1)],
69            attrs: Some(vec![CellValue::Null]),
70        }
71    }
72
73    fn delete_op(array: &str, ms: u64) -> ArrayOp {
74        ArrayOp {
75            header: header(array, ms),
76            kind: ArrayOpKind::Delete,
77            coord: vec![CoordValue::Int64(2)],
78            attrs: None,
79        }
80    }
81
82    fn erase_op(array: &str, ms: u64) -> ArrayOp {
83        ArrayOp {
84            header: header(array, ms),
85            kind: ArrayOpKind::Erase,
86            coord: vec![CoordValue::Int64(3)],
87            attrs: None,
88        }
89    }
90
91    #[test]
92    fn roundtrip_put() {
93        let op = put_op("arr", 100);
94        let bytes = encode_op(&op).unwrap();
95        let back = decode_op(&bytes).unwrap();
96        assert_eq!(op, back);
97    }
98
99    #[test]
100    fn roundtrip_delete() {
101        let op = delete_op("arr", 200);
102        let bytes = encode_op(&op).unwrap();
103        let back = decode_op(&bytes).unwrap();
104        assert_eq!(op, back);
105    }
106
107    #[test]
108    fn roundtrip_erase() {
109        let op = erase_op("arr", 300);
110        let bytes = encode_op(&op).unwrap();
111        let back = decode_op(&bytes).unwrap();
112        assert_eq!(op, back);
113    }
114
115    #[test]
116    fn roundtrip_batch_of_three() {
117        let ops = vec![put_op("a", 1), delete_op("b", 2), erase_op("c", 3)];
118        let bytes = encode_op_batch(&ops).unwrap();
119        let back = decode_op_batch(&bytes).unwrap();
120        assert_eq!(ops, back);
121    }
122
123    #[test]
124    fn decode_garbage_errors() {
125        let garbage = b"this is not valid msgpack \xff\xfe";
126        assert!(
127            decode_op(garbage).is_err(),
128            "decode_op should error on garbage input"
129        );
130        assert!(
131            decode_op_batch(garbage).is_err(),
132            "decode_op_batch should error on garbage input"
133        );
134    }
135}