Skip to main content

reddb_wire/replication/
change_record.rs

1use serde_json::Value as JsonValue;
2
3use super::util::{hex_decode, hex_encode};
4
5pub const DEFAULT_REPLICATION_TERM: u64 = 1;
6pub type ChangeRecordJsonValue = JsonValue;
7
8pub fn parse_change_record_json_value(text: &str) -> Result<ChangeRecordJsonValue, String> {
9    serde_json::from_str(text).map_err(|err| err.to_string())
10}
11
12pub fn change_record_json_value_to_string(value: &ChangeRecordJsonValue) -> String {
13    serde_json::to_string(value).unwrap_or_else(|_| "null".to_string())
14}
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub enum ChangeOperation {
18    Insert,
19    Update,
20    Delete,
21    Refresh,
22}
23
24impl ChangeOperation {
25    #[allow(clippy::should_implement_trait)]
26    pub fn from_str(value: &str) -> Option<Self> {
27        Self::from_wire_str(value)
28    }
29
30    pub fn from_wire_str(value: &str) -> Option<Self> {
31        match value {
32            "insert" => Some(Self::Insert),
33            "update" => Some(Self::Update),
34            "delete" => Some(Self::Delete),
35            "refresh" => Some(Self::Refresh),
36            _ => None,
37        }
38    }
39
40    pub fn as_wire_str(&self) -> &'static str {
41        match self {
42            Self::Insert => "insert",
43            Self::Update => "update",
44            Self::Delete => "delete",
45            Self::Refresh => "refresh",
46        }
47    }
48
49    pub fn as_str(&self) -> &'static str {
50        self.as_wire_str()
51    }
52}
53
54#[derive(Debug, Clone)]
55pub struct ChangeRecord {
56    pub term: u64,
57    pub lsn: u64,
58    pub timestamp: u64,
59    pub operation: ChangeOperation,
60    pub collection: String,
61    pub entity_id: u64,
62    pub entity_kind: String,
63    pub entity_bytes: Option<Vec<u8>>,
64    pub metadata: Option<JsonValue>,
65    pub refresh_records: Option<Vec<Vec<u8>>>,
66}
67
68impl ChangeRecord {
69    pub fn for_refresh(
70        lsn: u64,
71        timestamp: u64,
72        collection: impl Into<String>,
73        records: Vec<Vec<u8>>,
74    ) -> Self {
75        Self {
76            term: DEFAULT_REPLICATION_TERM,
77            lsn,
78            timestamp,
79            operation: ChangeOperation::Refresh,
80            collection: collection.into(),
81            entity_id: 0,
82            entity_kind: "refresh".to_string(),
83            entity_bytes: None,
84            metadata: None,
85            refresh_records: Some(records),
86        }
87    }
88
89    pub fn to_json_value(&self) -> JsonValue {
90        let mut object = serde_json::Map::new();
91        object.insert("term".to_string(), JsonValue::Number(self.term.into()));
92        object.insert("lsn".to_string(), JsonValue::Number(self.lsn.into()));
93        object.insert(
94            "timestamp".to_string(),
95            JsonValue::Number(self.timestamp.into()),
96        );
97        object.insert(
98            "operation".to_string(),
99            JsonValue::String(self.operation.as_wire_str().to_string()),
100        );
101        object.insert(
102            "collection".to_string(),
103            JsonValue::String(self.collection.clone()),
104        );
105        object.insert("rid".to_string(), JsonValue::Number(self.entity_id.into()));
106        object.insert(
107            "kind".to_string(),
108            JsonValue::String(public_item_kind(&self.entity_kind).to_string()),
109        );
110        if let Some(bytes) = &self.entity_bytes {
111            object.insert(
112                "entity_bytes_hex".to_string(),
113                JsonValue::String(hex_encode(bytes)),
114            );
115        }
116        if let Some(metadata) = &self.metadata {
117            object.insert("metadata".to_string(), metadata.clone());
118        }
119        if let Some(records) = &self.refresh_records {
120            let arr = records
121                .iter()
122                .map(|bytes| JsonValue::String(hex_encode(bytes)))
123                .collect();
124            object.insert("refresh_records_hex".to_string(), JsonValue::Array(arr));
125        }
126        JsonValue::Object(object)
127    }
128
129    pub fn encode(&self) -> Vec<u8> {
130        serde_json::to_string(&self.to_json_value())
131            .unwrap_or_else(|_| "{}".to_string())
132            .into_bytes()
133    }
134
135    pub fn with_term(mut self, term: u64) -> Self {
136        self.term = term;
137        self
138    }
139
140    pub fn decode(bytes: &[u8]) -> Result<Self, String> {
141        let text = std::str::from_utf8(bytes).map_err(|err| err.to_string())?;
142        let value = serde_json::from_str::<JsonValue>(text).map_err(|err| err.to_string())?;
143        let operation = value
144            .get("operation")
145            .and_then(JsonValue::as_str)
146            .and_then(ChangeOperation::from_wire_str)
147            .ok_or_else(|| "invalid replication operation".to_string())?;
148        let entity_bytes = value
149            .get("entity_bytes_hex")
150            .and_then(JsonValue::as_str)
151            .map(|value| hex_decode_string("entity_bytes_hex", value))
152            .transpose()?;
153
154        Ok(Self {
155            term: value
156                .get("term")
157                .and_then(JsonValue::as_u64)
158                .unwrap_or(DEFAULT_REPLICATION_TERM),
159            lsn: value.get("lsn").and_then(JsonValue::as_u64).unwrap_or(0),
160            timestamp: value
161                .get("timestamp")
162                .and_then(JsonValue::as_u64)
163                .unwrap_or(0),
164            operation,
165            collection: value
166                .get("collection")
167                .and_then(JsonValue::as_str)
168                .unwrap_or_default()
169                .to_string(),
170            entity_id: value
171                .get("rid")
172                .or_else(|| value.get("entity_id"))
173                .and_then(JsonValue::as_u64)
174                .unwrap_or(0),
175            entity_kind: value
176                .get("kind")
177                .or_else(|| value.get("entity_kind"))
178                .and_then(JsonValue::as_str)
179                .unwrap_or("entity")
180                .to_string(),
181            entity_bytes,
182            metadata: value.get("metadata").cloned(),
183            refresh_records: match value.get("refresh_records_hex") {
184                Some(JsonValue::Array(items)) => {
185                    let mut out = Vec::with_capacity(items.len());
186                    for item in items {
187                        let hex_str = item
188                            .as_str()
189                            .ok_or_else(|| "refresh_records_hex entry not a string".to_string())?;
190                        out.push(hex_decode_string("refresh_records_hex", hex_str)?);
191                    }
192                    Some(out)
193                }
194                None | Some(JsonValue::Null) => None,
195                _ => return Err("refresh_records_hex is not an array".to_string()),
196            },
197        })
198    }
199}
200
201pub fn public_item_kind(entity_kind: &str) -> &'static str {
202    match entity_kind {
203        "table" | "entity" | "row" => "row",
204        "graph_node" | "node" => "node",
205        "graph_edge" | "edge" => "edge",
206        "kv" => "kv",
207        "document" => "document",
208        "vector" => "vector",
209        other if other.contains("kv") => "kv",
210        other if other.contains("document") => "document",
211        other if other.contains("vector") => "vector",
212        _ => "item",
213    }
214}
215
216fn hex_decode_string(field: &'static str, value: &str) -> Result<Vec<u8>, String> {
217    hex_decode(field, value).map_err(|err| err.to_string())
218}
219
220#[cfg(test)]
221mod tests {
222    use super::*;
223
224    #[test]
225    fn change_record_round_trips_json_wire_payload() {
226        let record = ChangeRecord {
227            term: 3,
228            lsn: 7,
229            timestamp: 1234,
230            operation: ChangeOperation::Update,
231            collection: "users".to_string(),
232            entity_id: 42,
233            entity_kind: "row".to_string(),
234            entity_bytes: Some(vec![1, 2, 3]),
235            metadata: Some(serde_json::json!({"role": "admin"})),
236            refresh_records: None,
237        };
238
239        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");
240
241        assert_eq!(decoded.term, record.term);
242        assert_eq!(decoded.lsn, record.lsn);
243        assert_eq!(decoded.collection, record.collection);
244        assert_eq!(decoded.entity_id, record.entity_id);
245        assert_eq!(decoded.entity_bytes, record.entity_bytes);
246        assert_eq!(decoded.metadata, record.metadata);
247    }
248
249    #[test]
250    fn refresh_records_round_trip_without_reordering() {
251        let records = vec![vec![0x10, 0x20, 0x30], vec![0xAA, 0xBB], Vec::new()];
252        let record =
253            ChangeRecord::for_refresh(11, 99, "mv_orders_summary", records.clone()).with_term(4);
254
255        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");
256
257        assert_eq!(decoded.term, 4);
258        assert_eq!(decoded.operation, ChangeOperation::Refresh);
259        assert_eq!(decoded.collection, "mv_orders_summary");
260        assert_eq!(decoded.refresh_records.as_deref(), Some(&records[..]));
261    }
262
263    #[test]
264    fn legacy_change_record_defaults_term() {
265        let legacy =
266            br#"{"lsn":9,"timestamp":1,"operation":"delete","collection":"users","rid":5,"kind":"row"}"#;
267
268        let decoded = ChangeRecord::decode(legacy).expect("decode legacy record");
269
270        assert_eq!(decoded.term, DEFAULT_REPLICATION_TERM);
271        assert_eq!(decoded.lsn, 9);
272    }
273}