1use serde_json::Value as JsonValue;
2
3use super::util::{hex_decode, hex_encode};
4
5pub const DEFAULT_REPLICATION_TERM: u64 = 1;
6pub type ChangeRecordJsonValue = JsonValue;
7
8pub fn parse_change_record_json_value(text: &str) -> Result<ChangeRecordJsonValue, String> {
9 serde_json::from_str(text).map_err(|err| err.to_string())
10}
11
12pub fn change_record_json_value_to_string(value: &ChangeRecordJsonValue) -> String {
13 serde_json::to_string(value).unwrap_or_else(|_| "null".to_string())
14}
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub enum ChangeOperation {
18 Insert,
19 Update,
20 Delete,
21 Refresh,
22}
23
24impl ChangeOperation {
25 #[allow(clippy::should_implement_trait)]
26 pub fn from_str(value: &str) -> Option<Self> {
27 Self::from_wire_str(value)
28 }
29
30 pub fn from_wire_str(value: &str) -> Option<Self> {
31 match value {
32 "insert" => Some(Self::Insert),
33 "update" => Some(Self::Update),
34 "delete" => Some(Self::Delete),
35 "refresh" => Some(Self::Refresh),
36 _ => None,
37 }
38 }
39
40 pub fn as_wire_str(&self) -> &'static str {
41 match self {
42 Self::Insert => "insert",
43 Self::Update => "update",
44 Self::Delete => "delete",
45 Self::Refresh => "refresh",
46 }
47 }
48
49 pub fn as_str(&self) -> &'static str {
50 self.as_wire_str()
51 }
52}
53
54#[derive(Debug, Clone)]
55pub struct ChangeRecord {
56 pub term: u64,
57 pub lsn: u64,
58 pub timestamp: u64,
59 pub operation: ChangeOperation,
60 pub collection: String,
61 pub entity_id: u64,
62 pub entity_kind: String,
63 pub entity_bytes: Option<Vec<u8>>,
64 pub metadata: Option<JsonValue>,
65 pub refresh_records: Option<Vec<Vec<u8>>>,
66}
67
68impl ChangeRecord {
69 pub fn for_refresh(
70 lsn: u64,
71 timestamp: u64,
72 collection: impl Into<String>,
73 records: Vec<Vec<u8>>,
74 ) -> Self {
75 Self {
76 term: DEFAULT_REPLICATION_TERM,
77 lsn,
78 timestamp,
79 operation: ChangeOperation::Refresh,
80 collection: collection.into(),
81 entity_id: 0,
82 entity_kind: "refresh".to_string(),
83 entity_bytes: None,
84 metadata: None,
85 refresh_records: Some(records),
86 }
87 }
88
89 pub fn to_json_value(&self) -> JsonValue {
90 let mut object = serde_json::Map::new();
91 object.insert("term".to_string(), JsonValue::Number(self.term.into()));
92 object.insert("lsn".to_string(), JsonValue::Number(self.lsn.into()));
93 object.insert(
94 "timestamp".to_string(),
95 JsonValue::Number(self.timestamp.into()),
96 );
97 object.insert(
98 "operation".to_string(),
99 JsonValue::String(self.operation.as_wire_str().to_string()),
100 );
101 object.insert(
102 "collection".to_string(),
103 JsonValue::String(self.collection.clone()),
104 );
105 object.insert("rid".to_string(), JsonValue::Number(self.entity_id.into()));
106 object.insert(
107 "kind".to_string(),
108 JsonValue::String(public_item_kind(&self.entity_kind).to_string()),
109 );
110 if let Some(bytes) = &self.entity_bytes {
111 object.insert(
112 "entity_bytes_hex".to_string(),
113 JsonValue::String(hex_encode(bytes)),
114 );
115 }
116 if let Some(metadata) = &self.metadata {
117 object.insert("metadata".to_string(), metadata.clone());
118 }
119 if let Some(records) = &self.refresh_records {
120 let arr = records
121 .iter()
122 .map(|bytes| JsonValue::String(hex_encode(bytes)))
123 .collect();
124 object.insert("refresh_records_hex".to_string(), JsonValue::Array(arr));
125 }
126 JsonValue::Object(object)
127 }
128
129 pub fn encode(&self) -> Vec<u8> {
130 serde_json::to_string(&self.to_json_value())
131 .unwrap_or_else(|_| "{}".to_string())
132 .into_bytes()
133 }
134
135 pub fn with_term(mut self, term: u64) -> Self {
136 self.term = term;
137 self
138 }
139
140 pub fn decode(bytes: &[u8]) -> Result<Self, String> {
141 let text = std::str::from_utf8(bytes).map_err(|err| err.to_string())?;
142 let value = serde_json::from_str::<JsonValue>(text).map_err(|err| err.to_string())?;
143 let operation = value
144 .get("operation")
145 .and_then(JsonValue::as_str)
146 .and_then(ChangeOperation::from_wire_str)
147 .ok_or_else(|| "invalid replication operation".to_string())?;
148 let entity_bytes = value
149 .get("entity_bytes_hex")
150 .and_then(JsonValue::as_str)
151 .map(|value| hex_decode_string("entity_bytes_hex", value))
152 .transpose()?;
153
154 Ok(Self {
155 term: value
156 .get("term")
157 .and_then(JsonValue::as_u64)
158 .unwrap_or(DEFAULT_REPLICATION_TERM),
159 lsn: value.get("lsn").and_then(JsonValue::as_u64).unwrap_or(0),
160 timestamp: value
161 .get("timestamp")
162 .and_then(JsonValue::as_u64)
163 .unwrap_or(0),
164 operation,
165 collection: value
166 .get("collection")
167 .and_then(JsonValue::as_str)
168 .unwrap_or_default()
169 .to_string(),
170 entity_id: value
171 .get("rid")
172 .or_else(|| value.get("entity_id"))
173 .and_then(JsonValue::as_u64)
174 .unwrap_or(0),
175 entity_kind: value
176 .get("kind")
177 .or_else(|| value.get("entity_kind"))
178 .and_then(JsonValue::as_str)
179 .unwrap_or("entity")
180 .to_string(),
181 entity_bytes,
182 metadata: value.get("metadata").cloned(),
183 refresh_records: match value.get("refresh_records_hex") {
184 Some(JsonValue::Array(items)) => {
185 let mut out = Vec::with_capacity(items.len());
186 for item in items {
187 let hex_str = item
188 .as_str()
189 .ok_or_else(|| "refresh_records_hex entry not a string".to_string())?;
190 out.push(hex_decode_string("refresh_records_hex", hex_str)?);
191 }
192 Some(out)
193 }
194 None | Some(JsonValue::Null) => None,
195 _ => return Err("refresh_records_hex is not an array".to_string()),
196 },
197 })
198 }
199}
200
201pub fn public_item_kind(entity_kind: &str) -> &'static str {
202 match entity_kind {
203 "table" | "entity" | "row" => "row",
204 "graph_node" | "node" => "node",
205 "graph_edge" | "edge" => "edge",
206 "kv" => "kv",
207 "document" => "document",
208 "vector" => "vector",
209 other if other.contains("kv") => "kv",
210 other if other.contains("document") => "document",
211 other if other.contains("vector") => "vector",
212 _ => "item",
213 }
214}
215
216fn hex_decode_string(field: &'static str, value: &str) -> Result<Vec<u8>, String> {
217 hex_decode(field, value).map_err(|err| err.to_string())
218}
219
220#[cfg(test)]
221mod tests {
222 use super::*;
223
224 #[test]
225 fn change_record_round_trips_json_wire_payload() {
226 let record = ChangeRecord {
227 term: 3,
228 lsn: 7,
229 timestamp: 1234,
230 operation: ChangeOperation::Update,
231 collection: "users".to_string(),
232 entity_id: 42,
233 entity_kind: "row".to_string(),
234 entity_bytes: Some(vec![1, 2, 3]),
235 metadata: Some(serde_json::json!({"role": "admin"})),
236 refresh_records: None,
237 };
238
239 let decoded = ChangeRecord::decode(&record.encode()).expect("decode");
240
241 assert_eq!(decoded.term, record.term);
242 assert_eq!(decoded.lsn, record.lsn);
243 assert_eq!(decoded.collection, record.collection);
244 assert_eq!(decoded.entity_id, record.entity_id);
245 assert_eq!(decoded.entity_bytes, record.entity_bytes);
246 assert_eq!(decoded.metadata, record.metadata);
247 }
248
249 #[test]
250 fn refresh_records_round_trip_without_reordering() {
251 let records = vec![vec![0x10, 0x20, 0x30], vec![0xAA, 0xBB], Vec::new()];
252 let record =
253 ChangeRecord::for_refresh(11, 99, "mv_orders_summary", records.clone()).with_term(4);
254
255 let decoded = ChangeRecord::decode(&record.encode()).expect("decode");
256
257 assert_eq!(decoded.term, 4);
258 assert_eq!(decoded.operation, ChangeOperation::Refresh);
259 assert_eq!(decoded.collection, "mv_orders_summary");
260 assert_eq!(decoded.refresh_records.as_deref(), Some(&records[..]));
261 }
262
263 #[test]
264 fn legacy_change_record_defaults_term() {
265 let legacy =
266 br#"{"lsn":9,"timestamp":1,"operation":"delete","collection":"users","rid":5,"kind":"row"}"#;
267
268 let decoded = ChangeRecord::decode(legacy).expect("decode legacy record");
269
270 assert_eq!(decoded.term, DEFAULT_REPLICATION_TERM);
271 assert_eq!(decoded.lsn, 9);
272 }
273}