1use serde_json::Value as JsonValue;
2
3use super::util::{hex_decode, hex_encode};
4
/// Replication term given to records that carry no explicit term:
/// `ChangeRecord::for_refresh` starts from it and `ChangeRecord::decode`
/// falls back to it when the `term` key is missing or not an unsigned integer.
pub const DEFAULT_REPLICATION_TERM: u64 = 1;
/// JSON value type used for change-record payloads (an alias of `serde_json::Value`).
pub type ChangeRecordJsonValue = JsonValue;
7
8pub fn parse_change_record_json_value(text: &str) -> Result<ChangeRecordJsonValue, String> {
9 serde_json::from_str(text).map_err(|err| err.to_string())
10}
11
12pub fn change_record_json_value_to_string(value: &ChangeRecordJsonValue) -> String {
13 serde_json::to_string(value).unwrap_or_else(|_| "null".to_string())
14}
15
/// Kind of change carried by a replication record.
///
/// Each variant has a lowercase wire name (`"insert"`, `"update"`,
/// `"delete"`, `"refresh"`) used in the JSON encoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeOperation {
    Insert,
    Update,
    Delete,
    Refresh,
}

impl ChangeOperation {
    /// Alias of [`ChangeOperation::from_wire_str`].
    // Kept as an inherent method returning `Option`; `std::str::FromStr` would
    // require a `Result`, hence the lint is silenced rather than followed.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(value: &str) -> Option<Self> {
        Self::from_wire_str(value)
    }

    /// Looks up the operation whose wire name equals `value` exactly
    /// (case-sensitive). Returns `None` for any other text.
    pub fn from_wire_str(value: &str) -> Option<Self> {
        // Every variant must be listed here; the names themselves live only in
        // `as_wire_str`, so the two directions cannot drift apart.
        const VARIANTS: [ChangeOperation; 4] = [
            ChangeOperation::Insert,
            ChangeOperation::Update,
            ChangeOperation::Delete,
            ChangeOperation::Refresh,
        ];
        VARIANTS
            .iter()
            .copied()
            .find(|candidate| candidate.as_wire_str() == value)
    }

    /// Returns the wire name of this operation.
    pub fn as_wire_str(&self) -> &'static str {
        match *self {
            ChangeOperation::Insert => "insert",
            ChangeOperation::Update => "update",
            ChangeOperation::Delete => "delete",
            ChangeOperation::Refresh => "refresh",
        }
    }

    /// Alias of [`ChangeOperation::as_wire_str`].
    pub fn as_str(&self) -> &'static str {
        self.as_wire_str()
    }
}
53
54#[derive(Debug, Clone)]
55pub struct ChangeRecord {
56 pub term: u64,
57 pub lsn: u64,
58 pub timestamp: u64,
59 pub operation: ChangeOperation,
60 pub collection: String,
61 pub entity_id: u64,
62 pub entity_kind: String,
63 pub entity_bytes: Option<Vec<u8>>,
64 pub metadata: Option<JsonValue>,
65 pub refresh_records: Option<Vec<Vec<u8>>>,
66 pub range_id: Option<u64>,
72 pub ownership_epoch: Option<u64>,
78}
79
80impl ChangeRecord {
81 pub fn for_refresh(
82 lsn: u64,
83 timestamp: u64,
84 collection: impl Into<String>,
85 records: Vec<Vec<u8>>,
86 ) -> Self {
87 Self {
88 term: DEFAULT_REPLICATION_TERM,
89 lsn,
90 timestamp,
91 operation: ChangeOperation::Refresh,
92 collection: collection.into(),
93 entity_id: 0,
94 entity_kind: "refresh".to_string(),
95 entity_bytes: None,
96 metadata: None,
97 refresh_records: Some(records),
98 range_id: None,
99 ownership_epoch: None,
100 }
101 }
102
103 pub fn with_range_authority(mut self, range_id: u64, ownership_epoch: u64) -> Self {
107 self.range_id = Some(range_id);
108 self.ownership_epoch = Some(ownership_epoch);
109 self
110 }
111
112 pub fn to_json_value(&self) -> JsonValue {
113 let mut object = serde_json::Map::new();
114 object.insert("term".to_string(), JsonValue::Number(self.term.into()));
115 object.insert("lsn".to_string(), JsonValue::Number(self.lsn.into()));
116 object.insert(
117 "timestamp".to_string(),
118 JsonValue::Number(self.timestamp.into()),
119 );
120 object.insert(
121 "operation".to_string(),
122 JsonValue::String(self.operation.as_wire_str().to_string()),
123 );
124 object.insert(
125 "collection".to_string(),
126 JsonValue::String(self.collection.clone()),
127 );
128 object.insert("rid".to_string(), JsonValue::Number(self.entity_id.into()));
129 object.insert(
130 "kind".to_string(),
131 JsonValue::String(public_item_kind(&self.entity_kind).to_string()),
132 );
133 if let Some(bytes) = &self.entity_bytes {
134 object.insert(
135 "entity_bytes_hex".to_string(),
136 JsonValue::String(hex_encode(bytes)),
137 );
138 }
139 if let Some(metadata) = &self.metadata {
140 object.insert("metadata".to_string(), metadata.clone());
141 }
142 if let Some(records) = &self.refresh_records {
143 let arr = records
144 .iter()
145 .map(|bytes| JsonValue::String(hex_encode(bytes)))
146 .collect();
147 object.insert("refresh_records_hex".to_string(), JsonValue::Array(arr));
148 }
149 if let Some(range_id) = self.range_id {
152 object.insert("range_id".to_string(), JsonValue::Number(range_id.into()));
153 }
154 if let Some(epoch) = self.ownership_epoch {
155 object.insert(
156 "ownership_epoch".to_string(),
157 JsonValue::Number(epoch.into()),
158 );
159 }
160 JsonValue::Object(object)
161 }
162
163 pub fn encode(&self) -> Vec<u8> {
164 serde_json::to_string(&self.to_json_value())
165 .unwrap_or_else(|_| "{}".to_string())
166 .into_bytes()
167 }
168
169 pub fn with_term(mut self, term: u64) -> Self {
170 self.term = term;
171 self
172 }
173
174 pub fn decode(bytes: &[u8]) -> Result<Self, String> {
175 let text = std::str::from_utf8(bytes).map_err(|err| err.to_string())?;
176 let value = serde_json::from_str::<JsonValue>(text).map_err(|err| err.to_string())?;
177 let operation = value
178 .get("operation")
179 .and_then(JsonValue::as_str)
180 .and_then(ChangeOperation::from_wire_str)
181 .ok_or_else(|| "invalid replication operation".to_string())?;
182 let entity_bytes = value
183 .get("entity_bytes_hex")
184 .and_then(JsonValue::as_str)
185 .map(|value| hex_decode_string("entity_bytes_hex", value))
186 .transpose()?;
187
188 Ok(Self {
189 term: value
190 .get("term")
191 .and_then(JsonValue::as_u64)
192 .unwrap_or(DEFAULT_REPLICATION_TERM),
193 lsn: value.get("lsn").and_then(JsonValue::as_u64).unwrap_or(0),
194 timestamp: value
195 .get("timestamp")
196 .and_then(JsonValue::as_u64)
197 .unwrap_or(0),
198 operation,
199 collection: value
200 .get("collection")
201 .and_then(JsonValue::as_str)
202 .unwrap_or_default()
203 .to_string(),
204 entity_id: value
205 .get("rid")
206 .or_else(|| value.get("entity_id"))
207 .and_then(JsonValue::as_u64)
208 .unwrap_or(0),
209 entity_kind: value
210 .get("kind")
211 .or_else(|| value.get("entity_kind"))
212 .and_then(JsonValue::as_str)
213 .unwrap_or("entity")
214 .to_string(),
215 entity_bytes,
216 metadata: value.get("metadata").cloned(),
217 refresh_records: match value.get("refresh_records_hex") {
218 Some(JsonValue::Array(items)) => {
219 let mut out = Vec::with_capacity(items.len());
220 for item in items {
221 let hex_str = item
222 .as_str()
223 .ok_or_else(|| "refresh_records_hex entry not a string".to_string())?;
224 out.push(hex_decode_string("refresh_records_hex", hex_str)?);
225 }
226 Some(out)
227 }
228 None | Some(JsonValue::Null) => None,
229 _ => return Err("refresh_records_hex is not an array".to_string()),
230 },
231 range_id: value.get("range_id").and_then(JsonValue::as_u64),
234 ownership_epoch: value.get("ownership_epoch").and_then(JsonValue::as_u64),
235 })
236 }
237}
238
239#[derive(Debug, Clone, Copy, PartialEq, Eq)]
251pub struct RangeAuthority {
252 pub range_id: u64,
253 pub min_term: u64,
254 pub min_ownership_epoch: u64,
255}
256
257#[derive(Debug, Clone, Copy, PartialEq, Eq)]
259pub enum RangeAdmitError {
260 StaleTerm {
263 record_term: u64,
264 accepted_term: u64,
265 },
266 StaleOwnershipEpoch {
269 record_epoch: u64,
270 accepted_epoch: u64,
271 },
272}
273
274impl RangeAuthority {
275 pub fn admit(&self, record: &ChangeRecord) -> Result<(), RangeAdmitError> {
280 if record.range_id != Some(self.range_id) {
281 return Ok(());
282 }
283 if record.term < self.min_term {
284 return Err(RangeAdmitError::StaleTerm {
285 record_term: record.term,
286 accepted_term: self.min_term,
287 });
288 }
289 if let Some(epoch) = record.ownership_epoch {
292 if epoch < self.min_ownership_epoch {
293 return Err(RangeAdmitError::StaleOwnershipEpoch {
294 record_epoch: epoch,
295 accepted_epoch: self.min_ownership_epoch,
296 });
297 }
298 }
299 Ok(())
300 }
301}
302
/// Maps an internal entity kind to the kind name published on the wire.
///
/// Exact aliases are resolved first (`table`/`entity`/`row` → `row`,
/// `graph_node`/`node` → `node`, `graph_edge`/`edge` → `edge`). Anything else
/// is classified by the first of `kv`, `document`, `vector` it contains, and
/// falls back to `item` when none match.
pub fn public_item_kind(entity_kind: &str) -> &'static str {
    // Checked in this order; the first family contained in the name wins.
    // An exact "kv" / "document" / "vector" is caught by the same scan.
    const FAMILIES: [&str; 3] = ["kv", "document", "vector"];

    let exact_alias = match entity_kind {
        "table" | "entity" | "row" => Some("row"),
        "graph_node" | "node" => Some("node"),
        "graph_edge" | "edge" => Some("edge"),
        _ => None,
    };
    if let Some(public_name) = exact_alias {
        return public_name;
    }

    FAMILIES
        .iter()
        .copied()
        .find(|family| entity_kind.contains(*family))
        .unwrap_or("item")
}
317
318fn hex_decode_string(field: &'static str, value: &str) -> Result<Vec<u8>, String> {
319 hex_decode(field, value).map_err(|err| err.to_string())
320}
321
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn change_record_round_trips_json_wire_payload() {
        let record = ChangeRecord {
            term: 3,
            lsn: 7,
            timestamp: 1234,
            operation: ChangeOperation::Update,
            collection: "users".to_string(),
            entity_id: 42,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1, 2, 3]),
            metadata: Some(serde_json::json!({"role": "admin"})),
            refresh_records: None,
            range_id: None,
            ownership_epoch: None,
        };

        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");

        assert_eq!(decoded.term, record.term);
        assert_eq!(decoded.lsn, record.lsn);
        assert_eq!(decoded.timestamp, record.timestamp);
        assert_eq!(decoded.operation, record.operation);
        assert_eq!(decoded.collection, record.collection);
        assert_eq!(decoded.entity_id, record.entity_id);
        assert_eq!(decoded.entity_kind, record.entity_kind);
        assert_eq!(decoded.entity_bytes, record.entity_bytes);
        assert_eq!(decoded.metadata, record.metadata);
        // Optional fields that were never set must stay absent after the trip.
        assert_eq!(decoded.refresh_records, None);
        assert_eq!(decoded.range_id, None);
        assert_eq!(decoded.ownership_epoch, None);
    }

    #[test]
    fn encode_publishes_the_public_kind_name() {
        // `to_json_value` writes `public_item_kind(entity_kind)`, so an internal
        // kind such as "table" comes back as its public name.
        let record = ChangeRecord {
            term: 1,
            lsn: 1,
            timestamp: 1,
            operation: ChangeOperation::Insert,
            collection: "users".to_string(),
            entity_id: 1,
            entity_kind: "table".to_string(),
            entity_bytes: None,
            metadata: None,
            refresh_records: None,
            range_id: None,
            ownership_epoch: None,
        };

        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");

        assert_eq!(decoded.entity_kind, "row");
        assert_eq!(decoded.entity_bytes, None);
        assert_eq!(decoded.metadata, None);
    }

    #[test]
    fn range_authority_round_trips_on_the_json_wire() {
        let record = ChangeRecord {
            term: 5,
            lsn: 12,
            timestamp: 999,
            operation: ChangeOperation::Insert,
            collection: "orders".to_string(),
            entity_id: 8,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![9, 9]),
            metadata: None,
            refresh_records: None,
            range_id: None,
            ownership_epoch: None,
        }
        .with_range_authority(7, 3);

        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");

        assert_eq!(decoded.range_id, Some(7));
        assert_eq!(decoded.ownership_epoch, Some(3));
        assert_eq!(decoded.term, 5);
    }

    #[test]
    fn legacy_record_without_range_authority_decodes_to_none() {
        let legacy =
            br#"{"term":2,"lsn":4,"timestamp":1,"operation":"insert","collection":"users","rid":1,"kind":"row"}"#;

        let decoded = ChangeRecord::decode(legacy).expect("decode legacy");

        assert_eq!(decoded.range_id, None);
        assert_eq!(decoded.ownership_epoch, None);
    }

    #[test]
    fn unstamped_record_omits_range_keys_from_the_wire() {
        let record = ChangeRecord::for_refresh(1, 1, "mv", Vec::new());
        let text = String::from_utf8(record.encode()).expect("utf8");
        assert!(!text.contains("range_id"), "got {text}");
        assert!(!text.contains("ownership_epoch"), "got {text}");
    }

    // Minimal insert record stamped for `range_id` at `epoch`.
    fn stamped(term: u64, range_id: u64, epoch: u64) -> ChangeRecord {
        ChangeRecord {
            term,
            lsn: 1,
            timestamp: 1,
            operation: ChangeOperation::Insert,
            collection: "c".to_string(),
            entity_id: 1,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1]),
            metadata: None,
            refresh_records: None,
            range_id: None,
            ownership_epoch: None,
        }
        .with_range_authority(range_id, epoch)
    }

    #[test]
    fn range_authority_admits_current_term_and_epoch() {
        let fence = RangeAuthority {
            range_id: 7,
            min_term: 3,
            min_ownership_epoch: 4,
        };
        assert_eq!(fence.admit(&stamped(3, 7, 4)), Ok(()));
        assert_eq!(fence.admit(&stamped(9, 7, 9)), Ok(()));
    }

    #[test]
    fn range_authority_rejects_stale_epoch_and_term() {
        let fence = RangeAuthority {
            range_id: 7,
            min_term: 3,
            min_ownership_epoch: 4,
        };
        assert_eq!(
            fence.admit(&stamped(3, 7, 2)),
            Err(RangeAdmitError::StaleOwnershipEpoch {
                record_epoch: 2,
                accepted_epoch: 4,
            })
        );
        // Both term and epoch are stale here; the term check runs first.
        assert_eq!(
            fence.admit(&stamped(1, 7, 1)),
            Err(RangeAdmitError::StaleTerm {
                record_term: 1,
                accepted_term: 3,
            })
        );
    }

    #[test]
    fn range_authority_ignores_other_ranges_and_unstamped_records() {
        let fence = RangeAuthority {
            range_id: 7,
            min_term: 5,
            min_ownership_epoch: 5,
        };
        // Stamped for a different range: not this fence's concern.
        assert_eq!(fence.admit(&stamped(1, 99, 1)), Ok(()));
        // No range stamp at all.
        let mut legacy = stamped(1, 7, 1);
        legacy.range_id = None;
        legacy.ownership_epoch = None;
        assert_eq!(fence.admit(&legacy), Ok(()));
    }

    #[test]
    fn range_authority_skips_epoch_check_when_epoch_is_absent() {
        let fence = RangeAuthority {
            range_id: 7,
            min_term: 3,
            min_ownership_epoch: 4,
        };
        let mut record = stamped(3, 7, 1);
        record.ownership_epoch = None;
        assert_eq!(fence.admit(&record), Ok(()));
    }

    #[test]
    fn refresh_records_round_trip_without_reordering() {
        let records = vec![vec![0x10, 0x20, 0x30], vec![0xAA, 0xBB], Vec::new()];
        let record =
            ChangeRecord::for_refresh(11, 99, "mv_orders_summary", records.clone()).with_term(4);

        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");

        assert_eq!(decoded.term, 4);
        assert_eq!(decoded.operation, ChangeOperation::Refresh);
        assert_eq!(decoded.collection, "mv_orders_summary");
        assert_eq!(decoded.refresh_records.as_deref(), Some(&records[..]));
    }

    #[test]
    fn legacy_change_record_defaults_term() {
        let legacy =
            br#"{"lsn":9,"timestamp":1,"operation":"delete","collection":"users","rid":5,"kind":"row"}"#;

        let decoded = ChangeRecord::decode(legacy).expect("decode legacy record");

        assert_eq!(decoded.term, DEFAULT_REPLICATION_TERM);
        assert_eq!(decoded.lsn, 9);
    }

    #[test]
    fn decode_fills_defaults_for_missing_fields() {
        let decoded = ChangeRecord::decode(br#"{"operation":"insert"}"#).expect("decode");

        assert_eq!(decoded.term, DEFAULT_REPLICATION_TERM);
        assert_eq!(decoded.lsn, 0);
        assert_eq!(decoded.timestamp, 0);
        assert_eq!(decoded.collection, "");
        assert_eq!(decoded.entity_id, 0);
        assert_eq!(decoded.entity_kind, "entity");
        assert_eq!(decoded.entity_bytes, None);
        assert_eq!(decoded.metadata, None);
        assert_eq!(decoded.refresh_records, None);
    }

    #[test]
    fn decode_accepts_entity_id_and_entity_kind_keys() {
        let legacy = br#"{"operation":"insert","entity_id":5,"entity_kind":"graph_node"}"#;

        let decoded = ChangeRecord::decode(legacy).expect("decode");

        assert_eq!(decoded.entity_id, 5);
        assert_eq!(decoded.entity_kind, "graph_node");
    }

    #[test]
    fn decode_rejects_missing_or_unknown_operation() {
        assert_eq!(
            ChangeRecord::decode(br#"{}"#).unwrap_err(),
            "invalid replication operation"
        );
        assert_eq!(
            ChangeRecord::decode(br#"{"operation":"upsert"}"#).unwrap_err(),
            "invalid replication operation"
        );
        assert_eq!(
            ChangeRecord::decode(br#"{"operation":7}"#).unwrap_err(),
            "invalid replication operation"
        );
    }

    #[test]
    fn decode_rejects_malformed_refresh_records() {
        assert_eq!(
            ChangeRecord::decode(br#"{"operation":"refresh","refresh_records_hex":"00"}"#)
                .unwrap_err(),
            "refresh_records_hex is not an array"
        );
        assert_eq!(
            ChangeRecord::decode(br#"{"operation":"refresh","refresh_records_hex":[1]}"#)
                .unwrap_err(),
            "refresh_records_hex entry not a string"
        );
    }

    #[test]
    fn decode_treats_null_refresh_records_as_absent() {
        let decoded =
            ChangeRecord::decode(br#"{"operation":"refresh","refresh_records_hex":null}"#)
                .expect("decode");
        assert_eq!(decoded.refresh_records, None);
    }

    #[test]
    fn decode_rejects_non_utf8_and_non_json_input() {
        assert!(ChangeRecord::decode(&[0xff, 0xfe]).is_err());
        assert!(ChangeRecord::decode(b"not json").is_err());
    }

    #[test]
    fn change_operation_wire_names_map_both_ways() {
        let operations = [
            ChangeOperation::Insert,
            ChangeOperation::Update,
            ChangeOperation::Delete,
            ChangeOperation::Refresh,
        ];
        for operation in operations.iter().copied() {
            assert_eq!(
                ChangeOperation::from_wire_str(operation.as_wire_str()),
                Some(operation)
            );
            assert_eq!(operation.as_str(), operation.as_wire_str());
        }
        assert_eq!(ChangeOperation::from_str("INSERT"), None);
    }

    #[test]
    fn public_item_kind_maps_aliases_and_families() {
        assert_eq!(public_item_kind("table"), "row");
        assert_eq!(public_item_kind("entity"), "row");
        assert_eq!(public_item_kind("graph_node"), "node");
        assert_eq!(public_item_kind("graph_edge"), "edge");
        assert_eq!(public_item_kind("kv"), "kv");
        // Substring rules are tried in kv, document, vector order.
        assert_eq!(public_item_kind("kv_document"), "kv");
        assert_eq!(public_item_kind("document_vector"), "document");
        assert_eq!(public_item_kind("dense_vector"), "vector");
        assert_eq!(public_item_kind("refresh"), "item");
        assert_eq!(public_item_kind(""), "item");
    }
}