sync_engine/cdc/
mod.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Change Data Capture (CDC) Stream support.
5//!
6//! Emits mutations to a Redis Stream for external replication agents.
7//! The stream key is `{redis_prefix}__local__:cdc` where `__local__` indicates
8//! node-local infrastructure that should not be replicated itself.
9//!
10//! # Stream Format
11//!
12//! ## PUT operation
13//! ```text
14//! XADD {prefix}__local__:cdc MAXLEN ~ 100000 *
15//!   op    "PUT"
16//!   key   "uk.nhs.patient.12345"
17//!   hash  "a1b2c3..."                # content_hash for dedup
18//!   data  <zstd(content)>            # compressed payload
19//!   meta  '{"content_type":"json","version":3,"updated_at":1735776000000}'
20//! ```
21//!
22//! ## DELETE operation
23//! ```text
24//! XADD {prefix}__local__:cdc MAXLEN ~ 100000 *
25//!   op    "DEL"
26//!   key   "uk.nhs.patient.12345"
27//! ```
28//!
29//! # Compression Strategy
30//!
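//! Data is zstd-compressed (level 3) before writing to the stream, unless it
//! already starts with the zstd magic bytes (to avoid double-compression), is
//! shorter than 64 bytes, or would not get any smaller when compressed.
//! Consumers tell the two cases apart by the magic bytes; see `maybe_decompress`.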
33
34use serde::{Serialize, Deserialize};
35use std::io::Read;
36
/// Leading bytes of a zstd frame: the magic number `0xFD2FB528` laid out
/// little-endian, i.e. `28 B5 2F FD`.
const ZSTD_MAGIC: [u8; 4] = 0xFD2F_B528_u32.to_le_bytes();

/// Suffix appended to the configured `redis_prefix` to form the CDC stream key.
pub const CDC_STREAM_SUFFIX: &str = "__local__:cdc";
42
/// Kind of mutation recorded by a CDC stream entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOp {
    /// An object was written; appears on the stream as `"PUT"`.
    Put,
    /// An object was removed; appears on the stream as `"DEL"`.
    Delete,
}

impl CdcOp {
    /// Wire value placed in the entry's `op` stream field.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Delete => "DEL",
            Self::Put => "PUT",
        }
    }
}
59
60/// Metadata for CDC PUT entries
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct CdcMeta {
63    pub content_type: String,
64    pub version: u64,
65    pub updated_at: i64,
66    #[serde(skip_serializing_if = "Option::is_none")]
67    pub trace_parent: Option<String>,
68}
69
70/// A CDC entry ready to be written to the stream
71#[derive(Debug, Clone)]
72pub struct CdcEntry {
73    /// Operation type
74    pub op: CdcOp,
75    /// Object ID (key)
76    pub key: String,
77    /// Content hash for dedup (SHA256 of content) - only for PUT
78    pub hash: Option<String>,
79    /// Compressed data - only for PUT
80    pub data: Option<Vec<u8>>,
81    /// Metadata JSON - only for PUT
82    pub meta: Option<String>,
83}
84
85impl CdcEntry {
86    /// Create a PUT entry
87    pub fn put(
88        key: String,
89        hash: String,
90        content: &[u8],
91        content_type: &str,
92        version: u64,
93        updated_at: i64,
94        trace_parent: Option<String>,
95    ) -> Self {
96        let data = maybe_compress(content);
97        let meta = CdcMeta {
98            content_type: content_type.to_string(),
99            version,
100            updated_at,
101            trace_parent,
102        };
103        let meta_json = serde_json::to_string(&meta).unwrap_or_else(|_| "{}".to_string());
104
105        Self {
106            op: CdcOp::Put,
107            key,
108            hash: Some(hash),
109            data: Some(data),
110            meta: Some(meta_json),
111        }
112    }
113
114    /// Create a DELETE entry
115    pub fn delete(key: String) -> Self {
116        Self {
117            op: CdcOp::Delete,
118            key,
119            hash: None,
120            data: None,
121            meta: None,
122        }
123    }
124
125    /// Convert to Redis XADD field-value pairs
126    /// Returns Vec of (field, value) where value is either String or bytes
127    pub fn to_redis_fields(&self) -> Vec<(&'static str, CdcFieldValue)> {
128        let mut fields = vec![
129            ("op", CdcFieldValue::Str(self.op.as_str())),
130            ("key", CdcFieldValue::String(self.key.clone())),
131        ];
132
133        if let Some(ref hash) = self.hash {
134            fields.push(("hash", CdcFieldValue::String(hash.clone())));
135        }
136        if let Some(ref data) = self.data {
137            fields.push(("data", CdcFieldValue::Bytes(data.clone())));
138        }
139        if let Some(ref meta) = self.meta {
140            fields.push(("meta", CdcFieldValue::String(meta.clone())));
141        }
142
143        fields
144    }
145}
146
/// Value of a single stream field in a CDC entry.
///
/// Derives `PartialEq`/`Eq` so field lists can be compared directly. Equality
/// is per-variant: `Str("a")` and `String("a".into())` are *not* equal even
/// though [`CdcFieldValue::as_bytes`] returns the same bytes for both.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CdcFieldValue {
    /// Static text.
    Str(&'static str),
    /// Owned text.
    String(String),
    /// Owned raw bytes.
    Bytes(Vec<u8>),
}

impl CdcFieldValue {
    /// Borrow the value as the raw bytes to send to Redis.
    pub fn as_bytes(&self) -> &[u8] {
        match self {
            Self::Str(text) => text.as_bytes(),
            Self::String(text) => text.as_bytes(),
            Self::Bytes(raw) => raw.as_slice(),
        }
    }
}
165
166/// Compress data with zstd, unless it's already zstd-compressed.
167///
168/// Checks for zstd magic bytes at the start of the data to avoid
169/// double-compression. Returns original data if compression fails.
170pub fn maybe_compress(data: &[u8]) -> Vec<u8> {
171    // Skip if already zstd-compressed
172    if data.len() >= 4 && data[..4] == ZSTD_MAGIC {
173        return data.to_vec();
174    }
175
176    // Skip if too small to benefit from compression
177    if data.len() < 64 {
178        return data.to_vec();
179    }
180
181    // Compress with level 3 (good balance of speed/ratio)
182    match zstd::encode_all(data, 3) {
183        Ok(compressed) => {
184            // Only use compressed if it's actually smaller
185            if compressed.len() < data.len() {
186                compressed
187            } else {
188                data.to_vec()
189            }
190        }
191        Err(_) => data.to_vec(),
192    }
193}
194
195/// Decompress zstd data if it has the magic header, otherwise return as-is.
196///
197/// This is provided for consumers to use when reading from the stream.
198pub fn maybe_decompress(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
199    if data.len() >= 4 && data[..4] == ZSTD_MAGIC {
200        let mut decoder = zstd::Decoder::new(data)?;
201        let mut decompressed = Vec::new();
202        decoder.read_to_end(&mut decompressed)?;
203        Ok(decompressed)
204    } else {
205        Ok(data.to_vec())
206    }
207}
208
209/// Check if data appears to be zstd-compressed
210pub fn is_zstd_compressed(data: &[u8]) -> bool {
211    data.len() >= 4 && data[..4] == ZSTD_MAGIC
212}
213
214/// Build the full CDC stream key from an optional prefix
215pub fn cdc_stream_key(prefix: Option<&str>) -> String {
216    match prefix {
217        Some(p) => format!("{}{}", p, CDC_STREAM_SUFFIX),
218        None => CDC_STREAM_SUFFIX.to_string(),
219    }
220}
221
#[cfg(test)]
mod tests {
    //! Unit tests for the CDC helpers. Assertions check actual values
    //! (payload bytes, field contents, metadata) rather than mere presence.

    use super::*;

    #[test]
    fn test_maybe_compress_small_data() {
        let small = b"hello";
        let result = maybe_compress(small);
        // Payloads under the 64-byte threshold are stored verbatim
        assert_eq!(result, small.to_vec());
        assert!(!is_zstd_compressed(&result));
    }

    #[test]
    fn test_maybe_compress_already_zstd() {
        // Fake zstd header + some data
        let mut fake_zstd = ZSTD_MAGIC.to_vec();
        fake_zstd.extend_from_slice(b"already compressed data");

        let result = maybe_compress(&fake_zstd);
        // Should pass through unchanged
        assert_eq!(result, fake_zstd);
    }

    #[test]
    fn test_maybe_compress_json() {
        // JSON compresses well
        let json = r#"{"name":"John Doe","email":"john@example.com","data":"some repeated data repeated data repeated data repeated data"}"#;
        let result = maybe_compress(json.as_bytes());

        // Should be compressed (smaller)
        assert!(result.len() < json.len());
        // Should have zstd magic
        assert!(is_zstd_compressed(&result));
        // And must decode back to the input
        assert_eq!(maybe_decompress(&result).unwrap(), json.as_bytes());
    }

    #[test]
    fn test_roundtrip_compression() {
        let original = r#"{"key":"value","nested":{"a":1,"b":2},"array":[1,2,3,4,5]}"#.repeat(10);
        let compressed = maybe_compress(original.as_bytes());
        let decompressed = maybe_decompress(&compressed).unwrap();

        assert!(is_zstd_compressed(&compressed));
        assert_eq!(decompressed, original.as_bytes());
    }

    #[test]
    fn test_maybe_decompress_passthrough() {
        // Data without the magic header is returned as-is
        assert_eq!(maybe_decompress(b"not compressed").unwrap(), b"not compressed".to_vec());
        assert_eq!(maybe_decompress(b"").unwrap(), Vec::<u8>::new());
    }

    #[test]
    fn test_is_zstd_compressed() {
        assert!(is_zstd_compressed(&ZSTD_MAGIC));
        // Too short to hold the full magic
        assert!(!is_zstd_compressed(&ZSTD_MAGIC[..3]));
        assert!(!is_zstd_compressed(b""));
        assert!(!is_zstd_compressed(b"plain text"));
    }

    #[test]
    fn test_cdc_op_as_str() {
        assert_eq!(CdcOp::Put.as_str(), "PUT");
        assert_eq!(CdcOp::Delete.as_str(), "DEL");
    }

    #[test]
    fn test_cdc_entry_put() {
        let content = b"test content that is long enough to compress well when repeated";
        let entry = CdcEntry::put(
            "test.key".to_string(),
            "abc123".to_string(),
            content,
            "json",
            1,
            1735776000000,
            None,
        );

        assert_eq!(entry.op, CdcOp::Put);
        assert_eq!(entry.key, "test.key");
        assert_eq!(entry.hash, Some("abc123".to_string()));

        // Whether or not the payload got compressed, a consumer must be able
        // to recover the original content from `data`.
        let data = entry.data.as_ref().expect("PUT entries carry data");
        assert_eq!(maybe_decompress(data).unwrap(), content.to_vec());

        let meta_json = entry.meta.as_ref().expect("PUT entries carry meta");
        // `trace_parent` is skipped entirely when it is `None`
        assert!(!meta_json.contains("trace_parent"));

        let meta: CdcMeta = serde_json::from_str(meta_json).unwrap();
        assert_eq!(meta.content_type, "json");
        assert_eq!(meta.version, 1);
        assert_eq!(meta.updated_at, 1735776000000);
        assert!(meta.trace_parent.is_none());
    }

    #[test]
    fn test_cdc_entry_delete() {
        let entry = CdcEntry::delete("test.key".to_string());

        assert_eq!(entry.op, CdcOp::Delete);
        assert_eq!(entry.key, "test.key");
        assert!(entry.hash.is_none());
        assert!(entry.data.is_none());
        assert!(entry.meta.is_none());
    }

    #[test]
    fn test_cdc_stream_key() {
        assert_eq!(cdc_stream_key(None), "__local__:cdc");
        assert_eq!(cdc_stream_key(Some("myapp:")), "myapp:__local__:cdc");
        assert_eq!(cdc_stream_key(Some("")), "__local__:cdc");
    }

    #[test]
    fn test_to_redis_fields_put() {
        let entry = CdcEntry::put(
            "test.key".to_string(),
            "hash123".to_string(),
            b"data",
            "binary",
            2,
            1000,
            Some("00-trace-span-01".to_string()),
        );

        let fields = entry.to_redis_fields();
        assert_eq!(fields.len(), 5); // op, key, hash, data, meta

        assert_eq!(fields[0].0, "op");
        assert_eq!(fields[1].0, "key");
        assert_eq!(fields[2].0, "hash");
        assert_eq!(fields[3].0, "data");
        assert_eq!(fields[4].0, "meta");

        assert_eq!(fields[0].1.as_bytes(), b"PUT");
        assert_eq!(fields[1].1.as_bytes(), b"test.key");
        assert_eq!(fields[2].1.as_bytes(), b"hash123");
        // 4-byte payload is below the compression threshold, so it is raw
        assert_eq!(fields[3].1.as_bytes(), b"data");

        let meta: CdcMeta = serde_json::from_slice(fields[4].1.as_bytes()).unwrap();
        assert_eq!(meta.content_type, "binary");
        assert_eq!(meta.version, 2);
        assert_eq!(meta.updated_at, 1000);
        assert_eq!(meta.trace_parent.as_deref(), Some("00-trace-span-01"));
    }

    #[test]
    fn test_to_redis_fields_delete() {
        let entry = CdcEntry::delete("test.key".to_string());

        let fields = entry.to_redis_fields();
        assert_eq!(fields.len(), 2); // op, key only

        assert_eq!(fields[0].0, "op");
        assert_eq!(fields[1].0, "key");
        assert_eq!(fields[0].1.as_bytes(), b"DEL");
        assert_eq!(fields[1].1.as_bytes(), b"test.key");
    }
}