Skip to main content

reddb_wire/replication/
change_record.rs

1use serde_json::Value as JsonValue;
2
3use super::util::{hex_decode, hex_encode};
4
5pub const DEFAULT_REPLICATION_TERM: u64 = 1;
6pub type ChangeRecordJsonValue = JsonValue;
7
8pub fn parse_change_record_json_value(text: &str) -> Result<ChangeRecordJsonValue, String> {
9    serde_json::from_str(text).map_err(|err| err.to_string())
10}
11
12pub fn change_record_json_value_to_string(value: &ChangeRecordJsonValue) -> String {
13    serde_json::to_string(value).unwrap_or_else(|_| "null".to_string())
14}
15
/// The kind of change a replicated record describes.
///
/// Each variant has a lowercase wire name (see [`ChangeOperation::as_wire_str`])
/// that is what appears in the encoded payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeOperation {
    Insert,
    Update,
    Delete,
    Refresh,
}

impl ChangeOperation {
    /// Alias for [`ChangeOperation::from_wire_str`].
    ///
    /// This is an inherent method returning `Option`, not an implementation
    /// of `std::str::FromStr`; the clippy lint that would suggest the trait
    /// is silenced on purpose.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(value: &str) -> Option<Self> {
        Self::from_wire_str(value)
    }

    /// Maps a wire name back to its operation.
    ///
    /// The comparison is exact (case-sensitive, no trimming); any name other
    /// than `insert`, `update`, `delete` or `refresh` yields `None`.
    pub fn from_wire_str(value: &str) -> Option<Self> {
        let operation = match value {
            "insert" => Self::Insert,
            "update" => Self::Update,
            "delete" => Self::Delete,
            "refresh" => Self::Refresh,
            _ => return None,
        };
        Some(operation)
    }

    /// Returns the lowercase name used for this operation on the wire.
    pub fn as_wire_str(&self) -> &'static str {
        match *self {
            Self::Insert => "insert",
            Self::Update => "update",
            Self::Delete => "delete",
            Self::Refresh => "refresh",
        }
    }

    /// Alias for [`ChangeOperation::as_wire_str`].
    pub fn as_str(&self) -> &'static str {
        self.as_wire_str()
    }
}
53
54#[derive(Debug, Clone)]
55pub struct ChangeRecord {
56    pub term: u64,
57    pub lsn: u64,
58    pub timestamp: u64,
59    pub operation: ChangeOperation,
60    pub collection: String,
61    pub entity_id: u64,
62    pub entity_kind: String,
63    pub entity_bytes: Option<Vec<u8>>,
64    pub metadata: Option<JsonValue>,
65    pub refresh_records: Option<Vec<Vec<u8>>>,
66    /// Issue #991 — stable identity of the range this user-data change
67    /// belongs to (`RangeId`, from the #989 ownership catalog), or `None`
68    /// for records that predate range replication. Carried so replicas and
69    /// recovery can route a change to its range and gate it against that
70    /// range's authority watermark.
71    pub range_id: Option<u64>,
72    /// Issue #991 — the owning range's `OwnershipEpoch` at the moment this
73    /// change was produced. The epoch bumps only when write authority moves
74    /// to a new owner, so a record whose epoch is behind the target range's
75    /// accepted epoch is a write from a deposed owner and is fenced. `None`
76    /// for legacy records and non-range-replicated changes.
77    pub ownership_epoch: Option<u64>,
78}
79
80impl ChangeRecord {
81    pub fn for_refresh(
82        lsn: u64,
83        timestamp: u64,
84        collection: impl Into<String>,
85        records: Vec<Vec<u8>>,
86    ) -> Self {
87        Self {
88            term: DEFAULT_REPLICATION_TERM,
89            lsn,
90            timestamp,
91            operation: ChangeOperation::Refresh,
92            collection: collection.into(),
93            entity_id: 0,
94            entity_kind: "refresh".to_string(),
95            entity_bytes: None,
96            metadata: None,
97            refresh_records: Some(records),
98            range_id: None,
99            ownership_epoch: None,
100        }
101    }
102
103    /// Stamp this record with the authority metadata of the range that owns
104    /// it (issue #991): the stable range identity and the owner's current
105    /// ownership epoch. The term is set independently via [`Self::with_term`].
106    pub fn with_range_authority(mut self, range_id: u64, ownership_epoch: u64) -> Self {
107        self.range_id = Some(range_id);
108        self.ownership_epoch = Some(ownership_epoch);
109        self
110    }
111
112    pub fn to_json_value(&self) -> JsonValue {
113        let mut object = serde_json::Map::new();
114        object.insert("term".to_string(), JsonValue::Number(self.term.into()));
115        object.insert("lsn".to_string(), JsonValue::Number(self.lsn.into()));
116        object.insert(
117            "timestamp".to_string(),
118            JsonValue::Number(self.timestamp.into()),
119        );
120        object.insert(
121            "operation".to_string(),
122            JsonValue::String(self.operation.as_wire_str().to_string()),
123        );
124        object.insert(
125            "collection".to_string(),
126            JsonValue::String(self.collection.clone()),
127        );
128        object.insert("rid".to_string(), JsonValue::Number(self.entity_id.into()));
129        object.insert(
130            "kind".to_string(),
131            JsonValue::String(public_item_kind(&self.entity_kind).to_string()),
132        );
133        if let Some(bytes) = &self.entity_bytes {
134            object.insert(
135                "entity_bytes_hex".to_string(),
136                JsonValue::String(hex_encode(bytes)),
137            );
138        }
139        if let Some(metadata) = &self.metadata {
140            object.insert("metadata".to_string(), metadata.clone());
141        }
142        if let Some(records) = &self.refresh_records {
143            let arr = records
144                .iter()
145                .map(|bytes| JsonValue::String(hex_encode(bytes)))
146                .collect();
147            object.insert("refresh_records_hex".to_string(), JsonValue::Array(arr));
148        }
149        // Issue #991 — range authority is omitted entirely when absent so a
150        // non-range-replicated record stays byte-for-byte the legacy shape.
151        if let Some(range_id) = self.range_id {
152            object.insert("range_id".to_string(), JsonValue::Number(range_id.into()));
153        }
154        if let Some(epoch) = self.ownership_epoch {
155            object.insert(
156                "ownership_epoch".to_string(),
157                JsonValue::Number(epoch.into()),
158            );
159        }
160        JsonValue::Object(object)
161    }
162
163    pub fn encode(&self) -> Vec<u8> {
164        serde_json::to_string(&self.to_json_value())
165            .unwrap_or_else(|_| "{}".to_string())
166            .into_bytes()
167    }
168
169    pub fn with_term(mut self, term: u64) -> Self {
170        self.term = term;
171        self
172    }
173
174    pub fn decode(bytes: &[u8]) -> Result<Self, String> {
175        let text = std::str::from_utf8(bytes).map_err(|err| err.to_string())?;
176        let value = serde_json::from_str::<JsonValue>(text).map_err(|err| err.to_string())?;
177        let operation = value
178            .get("operation")
179            .and_then(JsonValue::as_str)
180            .and_then(ChangeOperation::from_wire_str)
181            .ok_or_else(|| "invalid replication operation".to_string())?;
182        let entity_bytes = value
183            .get("entity_bytes_hex")
184            .and_then(JsonValue::as_str)
185            .map(|value| hex_decode_string("entity_bytes_hex", value))
186            .transpose()?;
187
188        Ok(Self {
189            term: value
190                .get("term")
191                .and_then(JsonValue::as_u64)
192                .unwrap_or(DEFAULT_REPLICATION_TERM),
193            lsn: value.get("lsn").and_then(JsonValue::as_u64).unwrap_or(0),
194            timestamp: value
195                .get("timestamp")
196                .and_then(JsonValue::as_u64)
197                .unwrap_or(0),
198            operation,
199            collection: value
200                .get("collection")
201                .and_then(JsonValue::as_str)
202                .unwrap_or_default()
203                .to_string(),
204            entity_id: value
205                .get("rid")
206                .or_else(|| value.get("entity_id"))
207                .and_then(JsonValue::as_u64)
208                .unwrap_or(0),
209            entity_kind: value
210                .get("kind")
211                .or_else(|| value.get("entity_kind"))
212                .and_then(JsonValue::as_str)
213                .unwrap_or("entity")
214                .to_string(),
215            entity_bytes,
216            metadata: value.get("metadata").cloned(),
217            refresh_records: match value.get("refresh_records_hex") {
218                Some(JsonValue::Array(items)) => {
219                    let mut out = Vec::with_capacity(items.len());
220                    for item in items {
221                        let hex_str = item
222                            .as_str()
223                            .ok_or_else(|| "refresh_records_hex entry not a string".to_string())?;
224                        out.push(hex_decode_string("refresh_records_hex", hex_str)?);
225                    }
226                    Some(out)
227                }
228                None | Some(JsonValue::Null) => None,
229                _ => return Err("refresh_records_hex is not an array".to_string()),
230            },
231            // Issue #991 — absent on legacy records (decodes to `None`), so
232            // old payloads keep round-tripping unchanged.
233            range_id: value.get("range_id").and_then(JsonValue::as_u64),
234            ownership_epoch: value.get("ownership_epoch").and_then(JsonValue::as_u64),
235        })
236    }
237}
238
239/// Issue #991 — the range-authority watermark a replica or recovery target
240/// holds for a single range: the minimum term and ownership epoch it will
241/// accept for that range's user-data changes.
242///
243/// The physical WAL stays the source of truth; this fence operates purely on
244/// the derived metadata each [`ChangeRecord`] carries. A record stamped for
245/// this range whose term or ownership epoch is behind the watermark is a write
246/// from a stale (deposed) owner or a superseded timeline and must be rejected
247/// before it is applied. Records for a *different* range, or records with no
248/// range identity at all (legacy / non-range-replicated), are not this fence's
249/// concern and pass through untouched.
250#[derive(Debug, Clone, Copy, PartialEq, Eq)]
251pub struct RangeAuthority {
252    pub range_id: u64,
253    pub min_term: u64,
254    pub min_ownership_epoch: u64,
255}
256
257/// Why a [`RangeAuthority`] rejected a record (issue #991).
258#[derive(Debug, Clone, Copy, PartialEq, Eq)]
259pub enum RangeAdmitError {
260    /// The record's term is behind the term this range has accepted — a
261    /// returning ex-owner streaming on a superseded timeline.
262    StaleTerm {
263        record_term: u64,
264        accepted_term: u64,
265    },
266    /// The record's ownership epoch is behind the range's accepted epoch — a
267    /// write produced by an owner that has since lost write authority.
268    StaleOwnershipEpoch {
269        record_epoch: u64,
270        accepted_epoch: u64,
271    },
272}
273
274impl RangeAuthority {
275    /// Decide whether `record` may be applied to this range. Only records
276    /// explicitly stamped for `self.range_id` are gated; everything else is
277    /// admitted. Term is checked before epoch so a stale timeline is reported
278    /// as such even when its epoch also lags.
279    pub fn admit(&self, record: &ChangeRecord) -> Result<(), RangeAdmitError> {
280        if record.range_id != Some(self.range_id) {
281            return Ok(());
282        }
283        if record.term < self.min_term {
284            return Err(RangeAdmitError::StaleTerm {
285                record_term: record.term,
286                accepted_term: self.min_term,
287            });
288        }
289        // A record stamped for this range but missing an epoch predates epoch
290        // fencing; admit it on term alone rather than fail closed on absence.
291        if let Some(epoch) = record.ownership_epoch {
292            if epoch < self.min_ownership_epoch {
293                return Err(RangeAdmitError::StaleOwnershipEpoch {
294                    record_epoch: epoch,
295                    accepted_epoch: self.min_ownership_epoch,
296                });
297            }
298        }
299        Ok(())
300    }
301}
302
/// Maps an internal entity kind to the public item-kind name used on the wire.
///
/// Exact names are resolved first. Anything else falls back to a substring
/// test for `kv`, then `document`, then `vector` (first hit wins), and finally
/// to the generic `"item"`.
pub fn public_item_kind(entity_kind: &str) -> &'static str {
    /// Substring fallbacks, in precedence order.
    const FAMILIES: [&str; 3] = ["kv", "document", "vector"];

    let exact = match entity_kind {
        "table" | "entity" | "row" => Some("row"),
        "graph_node" | "node" => Some("node"),
        "graph_edge" | "edge" => Some("edge"),
        "kv" => Some("kv"),
        "document" => Some("document"),
        "vector" => Some("vector"),
        _ => None,
    };
    if let Some(public_name) = exact {
        return public_name;
    }

    FAMILIES
        .iter()
        .copied()
        .find(|family| entity_kind.contains(*family))
        .unwrap_or("item")
}
317
318fn hex_decode_string(field: &'static str, value: &str) -> Result<Vec<u8>, String> {
319    hex_decode(field, value).map_err(|err| err.to_string())
320}
321
#[cfg(test)]
mod tests {
    use super::*;

    /// A plain `"row"` record for `collection` with every optional field
    /// unset; tests override the fields they care about.
    fn row(collection: &str, operation: ChangeOperation) -> ChangeRecord {
        ChangeRecord {
            term: DEFAULT_REPLICATION_TERM,
            lsn: 0,
            timestamp: 0,
            operation,
            collection: collection.to_string(),
            entity_id: 0,
            entity_kind: "row".to_string(),
            entity_bytes: None,
            metadata: None,
            refresh_records: None,
            range_id: None,
            ownership_epoch: None,
        }
    }

    /// An insert stamped for `range_id` at `epoch`, produced under `term`.
    fn stamped(term: u64, range_id: u64, epoch: u64) -> ChangeRecord {
        let unstamped = ChangeRecord {
            term,
            lsn: 1,
            timestamp: 1,
            entity_id: 1,
            entity_bytes: Some(vec![1]),
            ..row("c", ChangeOperation::Insert)
        };
        unstamped.with_range_authority(range_id, epoch)
    }

    #[test]
    fn change_record_round_trips_json_wire_payload() {
        let original = ChangeRecord {
            term: 3,
            lsn: 7,
            timestamp: 1234,
            entity_id: 42,
            entity_bytes: Some(vec![1, 2, 3]),
            metadata: Some(serde_json::json!({"role": "admin"})),
            ..row("users", ChangeOperation::Update)
        };

        let wire = original.encode();
        let decoded = ChangeRecord::decode(&wire).expect("decode");

        assert_eq!(decoded.term, original.term);
        assert_eq!(decoded.lsn, original.lsn);
        assert_eq!(decoded.collection, original.collection);
        assert_eq!(decoded.entity_id, original.entity_id);
        assert_eq!(decoded.entity_bytes, original.entity_bytes);
        assert_eq!(decoded.metadata, original.metadata);
    }

    #[test]
    fn range_authority_round_trips_on_the_json_wire() {
        let original = ChangeRecord {
            term: 5,
            lsn: 12,
            timestamp: 999,
            entity_id: 8,
            entity_bytes: Some(vec![9, 9]),
            ..row("orders", ChangeOperation::Insert)
        }
        .with_range_authority(7, 3);

        let wire = original.encode();
        let decoded = ChangeRecord::decode(&wire).expect("decode");

        assert_eq!(decoded.range_id, Some(7));
        assert_eq!(decoded.ownership_epoch, Some(3));
        assert_eq!(decoded.term, 5);
    }

    #[test]
    fn legacy_record_without_range_authority_decodes_to_none() {
        // Payloads written before #991 have neither key. Decoding has to keep
        // both absent instead of inventing a default that could later collide
        // with a real range fence.
        let payload =
            br#"{"term":2,"lsn":4,"timestamp":1,"operation":"insert","collection":"users","rid":1,"kind":"row"}"#;

        let decoded = ChangeRecord::decode(payload).expect("decode legacy");

        assert_eq!(decoded.range_id, None);
        assert_eq!(decoded.ownership_epoch, None);
    }

    #[test]
    fn unstamped_record_omits_range_keys_from_the_wire() {
        let wire = ChangeRecord::for_refresh(1, 1, "mv", Vec::new()).encode();
        let text = String::from_utf8(wire).expect("utf8");

        assert!(!text.contains("range_id"), "got {text}");
        assert!(!text.contains("ownership_epoch"), "got {text}");
    }

    #[test]
    fn range_authority_admits_current_term_and_epoch() {
        let fence = RangeAuthority {
            range_id: 7,
            min_term: 3,
            min_ownership_epoch: 4,
        };

        // Exactly at the watermark, then comfortably ahead of it.
        assert_eq!(fence.admit(&stamped(3, 7, 4)), Ok(()));
        assert_eq!(fence.admit(&stamped(9, 7, 9)), Ok(()));
    }

    #[test]
    fn range_authority_rejects_stale_epoch_and_term() {
        let fence = RangeAuthority {
            range_id: 7,
            min_term: 3,
            min_ownership_epoch: 4,
        };

        let stale_epoch = fence.admit(&stamped(3, 7, 2));
        assert_eq!(
            stale_epoch,
            Err(RangeAdmitError::StaleOwnershipEpoch {
                record_epoch: 2,
                accepted_epoch: 4,
            })
        );

        // The term check runs first, so a record stale on both counts is
        // reported as a stale term.
        let stale_both = fence.admit(&stamped(1, 7, 1));
        assert_eq!(
            stale_both,
            Err(RangeAdmitError::StaleTerm {
                record_term: 1,
                accepted_term: 3,
            })
        );
    }

    #[test]
    fn range_authority_ignores_other_ranges_and_unstamped_records() {
        let fence = RangeAuthority {
            range_id: 7,
            min_term: 5,
            min_ownership_epoch: 5,
        };

        // Stale, but stamped for another range, so this fence does not gate it.
        assert_eq!(fence.admit(&stamped(1, 99, 1)), Ok(()));

        // With the range stamp removed the record looks legacy and passes.
        let legacy = ChangeRecord {
            range_id: None,
            ownership_epoch: None,
            ..stamped(1, 7, 1)
        };
        assert_eq!(fence.admit(&legacy), Ok(()));
    }

    #[test]
    fn refresh_records_round_trip_without_reordering() {
        let payloads = vec![vec![0x10, 0x20, 0x30], vec![0xAA, 0xBB], Vec::new()];
        let original =
            ChangeRecord::for_refresh(11, 99, "mv_orders_summary", payloads.clone()).with_term(4);

        let wire = original.encode();
        let decoded = ChangeRecord::decode(&wire).expect("decode");

        assert_eq!(decoded.term, 4);
        assert_eq!(decoded.operation, ChangeOperation::Refresh);
        assert_eq!(decoded.collection, "mv_orders_summary");
        assert_eq!(decoded.refresh_records.as_deref(), Some(&payloads[..]));
    }

    #[test]
    fn legacy_change_record_defaults_term() {
        let payload =
            br#"{"lsn":9,"timestamp":1,"operation":"delete","collection":"users","rid":5,"kind":"row"}"#;

        let decoded = ChangeRecord::decode(payload).expect("decode legacy record");

        assert_eq!(decoded.term, DEFAULT_REPLICATION_TERM);
        assert_eq!(decoded.lsn, 9);
    }
}