// nodedb_array/sync/op_codec.rs
use crate::error::{ArrayError, ArrayResult};
use crate::sync::op::ArrayOp;
10
11pub fn encode_op(op: &ArrayOp) -> ArrayResult<Vec<u8>> {
13 zerompk::to_msgpack_vec(op).map_err(|e| ArrayError::SegmentCorruption {
14 detail: format!("encode_op: {e}"),
15 })
16}
17
18pub fn decode_op(bytes: &[u8]) -> ArrayResult<ArrayOp> {
20 zerompk::from_msgpack(bytes).map_err(|e| ArrayError::SegmentCorruption {
21 detail: format!("decode_op: {e}"),
22 })
23}
24
25pub fn encode_op_batch(ops: &[ArrayOp]) -> ArrayResult<Vec<u8>> {
27 zerompk::to_msgpack_vec(&ops.to_vec()).map_err(|e| ArrayError::SegmentCorruption {
28 detail: format!("encode_op_batch: {e}"),
29 })
30}
31
32pub fn decode_op_batch(bytes: &[u8]) -> ArrayResult<Vec<ArrayOp>> {
35 zerompk::from_msgpack(bytes).map_err(|e| ArrayError::SegmentCorruption {
36 detail: format!("decode_op_batch: {e}"),
37 })
38}
39
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::hlc::Hlc;
    use crate::sync::op::{ArrayOpHeader, ArrayOpKind};
    use crate::sync::replica_id::ReplicaId;
    use crate::types::cell_value::value::CellValue;
    use crate::types::coord::value::CoordValue;

    /// Builds an [`Hlc`] from `ms`, a second argument of `0`, and replica id 1.
    fn hlc(ms: u64) -> Hlc {
        let replica = ReplicaId::new(1);
        Hlc::new(ms, 0, replica).unwrap()
    }

    /// Builds a header for `array` whose `hlc` and `schema_hlc` both come from
    /// `ms`, with fixed `valid_from_ms` / `valid_until_ms` values.
    fn header(array: &str, ms: u64) -> ArrayOpHeader {
        ArrayOpHeader {
            array: array.into(),
            hlc: hlc(ms),
            schema_hlc: hlc(ms),
            valid_from_ms: 0,
            valid_until_ms: -1,
            system_from_ms: ms as i64,
        }
    }

    /// Shared fixture builder: one-coordinate op of the given `kind`.
    fn build_op(
        array: &str,
        ms: u64,
        kind: ArrayOpKind,
        coord: CoordValue,
        attrs: Option<Vec<CellValue>>,
    ) -> ArrayOp {
        ArrayOp {
            header: header(array, ms),
            kind,
            coord: vec![coord],
            attrs,
        }
    }

    /// `Put` fixture at coordinate 1 carrying a single null attribute.
    fn put_op(array: &str, ms: u64) -> ArrayOp {
        build_op(
            array,
            ms,
            ArrayOpKind::Put,
            CoordValue::Int64(1),
            Some(vec![CellValue::Null]),
        )
    }

    /// `Delete` fixture at coordinate 2 with no attributes.
    fn delete_op(array: &str, ms: u64) -> ArrayOp {
        build_op(array, ms, ArrayOpKind::Delete, CoordValue::Int64(2), None)
    }

    /// `Erase` fixture at coordinate 3 with no attributes.
    fn erase_op(array: &str, ms: u64) -> ArrayOp {
        build_op(array, ms, ArrayOpKind::Erase, CoordValue::Int64(3), None)
    }

    /// Encodes `op`, decodes the bytes again, and asserts the result equals
    /// the input. Panics are attributed to the calling test.
    #[track_caller]
    fn assert_roundtrip(op: ArrayOp) {
        let encoded = encode_op(&op).unwrap();
        let decoded = decode_op(&encoded).unwrap();
        assert_eq!(op, decoded);
    }

    #[test]
    fn roundtrip_put() {
        assert_roundtrip(put_op("arr", 100));
    }

    #[test]
    fn roundtrip_delete() {
        assert_roundtrip(delete_op("arr", 200));
    }

    #[test]
    fn roundtrip_erase() {
        assert_roundtrip(erase_op("arr", 300));
    }

    #[test]
    fn roundtrip_batch_of_three() {
        // One op of each kind, each targeting a different array name.
        let ops = vec![put_op("a", 1), delete_op("b", 2), erase_op("c", 3)];
        let encoded = encode_op_batch(&ops).unwrap();
        let decoded = decode_op_batch(&encoded).unwrap();
        assert_eq!(ops, decoded);
    }

    #[test]
    fn decode_garbage_errors() {
        let garbage: &[u8] = b"this is not valid msgpack \xff\xfe";

        let single = decode_op(garbage);
        assert!(single.is_err(), "decode_op should error on garbage input");

        let batch = decode_op_batch(garbage);
        assert!(
            batch.is_err(),
            "decode_op_batch should error on garbage input"
        );
    }
}