sync_engine/cdc/
mod.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Change Data Capture (CDC) Stream support.
5//!
6//! Emits mutations to a Redis Stream for external replication agents.
//! The stream key is `{redis_prefix}cdc`, where `redis_prefix` by convention
//! already ends in a colon (e.g. `redsqrl:cdc`) - each node writes to its own stream.
8//! Replication agents tail remote peers' CDC streams to replicate changes.
9//!
10//! # Stream Format
11//!
12//! ## PUT operation
13//! ```text
14//! XADD {prefix}:cdc MAXLEN ~ 100000 *
15//!   op    "PUT"
16//!   key   "uk.nhs.patient.12345"
17//!   hash  "a1b2c3..."                # content_hash for dedup
18//!   data  <zstd(content)>            # compressed payload
19//!   meta  '{"content_type":"json","version":3,"updated_at":1735776000000}'
20//! ```
21//!
22//! ## DELETE operation
23//! ```text
24//! XADD {prefix}:cdc MAXLEN ~ 100000 *
25//!   op    "DEL"
26//!   key   "uk.nhs.patient.12345"
27//! ```
28//!
29//! # Compression Strategy
30//!
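//! Data is zstd-compressed before writing to the stream, unless it already
//! starts with the zstd magic bytes (to avoid double-compression), is shorter
//! than 64 bytes, or does not get smaller when compressed. In those cases the
//! payload is written unchanged.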
33
34use serde::{Serialize, Deserialize};
35use std::io::Read;
36
/// Leading four bytes of a zstd frame: `28 B5 2F FD`.
///
/// The zstd frame format defines its magic number as the 32-bit value
/// `0xFD2FB528` stored little-endian, which is how it is spelled here.
const ZSTD_MAGIC: [u8; 4] = 0xFD2F_B528_u32.to_le_bytes();
39
/// CDC stream key suffix (appended to redis_prefix).
///
/// [`cdc_stream_key`] concatenates the prefix and this suffix directly; no
/// separator is inserted between them.
///
/// Convention: redis_prefix includes trailing colon (e.g., "redsqrl:").
/// Result: "redsqrl:cdc" or just "cdc" if no prefix.
pub const CDC_STREAM_SUFFIX: &str = "cdc";
44
/// Kind of mutation recorded by a CDC stream entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOp {
    /// Entry carries new content for a key (`"PUT"` on the wire).
    Put,
    /// Entry marks a key as removed (`"DEL"` on the wire).
    Delete,
}

impl CdcOp {
    /// Wire value stored in the entry's `op` field: `"PUT"` or `"DEL"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Delete => "DEL",
            Self::Put => "PUT",
        }
    }
}
61
/// Metadata for CDC PUT entries.
///
/// [`CdcEntry::put`] serializes this struct to JSON and stores the result in
/// the entry's `meta` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcMeta {
    /// Content type label supplied by the caller (the module example and the
    /// tests use values such as "json" and "binary").
    pub content_type: String,
    /// Version number of the object, as supplied by the caller.
    pub version: u64,
    /// Last-update timestamp, as supplied by the caller.
    /// NOTE(review): presumably Unix epoch milliseconds, judging by the module
    /// example (`1735776000000`) - confirm with the producer.
    pub updated_at: i64,
    /// Optional trace context; the field is left out of the JSON when `None`.
    /// NOTE(review): looks like a W3C `traceparent` value - confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_parent: Option<String>,
}
71
/// A CDC entry ready to be written to the stream.
///
/// Build one with [`CdcEntry::put`] or [`CdcEntry::delete`]. The three
/// optional fields are `Some` for PUT entries and `None` for DELETE entries.
#[derive(Debug, Clone)]
pub struct CdcEntry {
    /// Operation type
    pub op: CdcOp,
    /// Object ID (key)
    pub key: String,
    /// Content hash for dedup - only for PUT. Supplied by the caller and stored
    /// verbatim; this module neither computes nor checks it.
    /// NOTE(review): previously documented as SHA256 of content - confirm with callers.
    pub hash: Option<String>,
    /// Payload as returned by `maybe_compress` (zstd-compressed, or the
    /// original bytes when compression was skipped) - only for PUT
    pub data: Option<Vec<u8>>,
    /// Metadata JSON (a serialized `CdcMeta`) - only for PUT
    pub meta: Option<String>,
}
86
87impl CdcEntry {
88    /// Create a PUT entry
89    pub fn put(
90        key: String,
91        hash: String,
92        content: &[u8],
93        content_type: &str,
94        version: u64,
95        updated_at: i64,
96        trace_parent: Option<String>,
97    ) -> Self {
98        let data = maybe_compress(content);
99        let meta = CdcMeta {
100            content_type: content_type.to_string(),
101            version,
102            updated_at,
103            trace_parent,
104        };
105        let meta_json = serde_json::to_string(&meta).unwrap_or_else(|_| "{}".to_string());
106
107        Self {
108            op: CdcOp::Put,
109            key,
110            hash: Some(hash),
111            data: Some(data),
112            meta: Some(meta_json),
113        }
114    }
115
116    /// Create a DELETE entry
117    pub fn delete(key: String) -> Self {
118        Self {
119            op: CdcOp::Delete,
120            key,
121            hash: None,
122            data: None,
123            meta: None,
124        }
125    }
126
127    /// Convert to Redis XADD field-value pairs
128    /// Returns Vec of (field, value) where value is either String or bytes
129    pub fn to_redis_fields(&self) -> Vec<(&'static str, CdcFieldValue)> {
130        let mut fields = vec![
131            ("op", CdcFieldValue::Str(self.op.as_str())),
132            ("key", CdcFieldValue::String(self.key.clone())),
133        ];
134
135        if let Some(ref hash) = self.hash {
136            fields.push(("hash", CdcFieldValue::String(hash.clone())));
137        }
138        if let Some(ref data) = self.data {
139            fields.push(("data", CdcFieldValue::Bytes(data.clone())));
140        }
141        if let Some(ref meta) = self.meta {
142            fields.push(("meta", CdcFieldValue::String(meta.clone())));
143        }
144
145        fields
146    }
147}
148
/// Value half of a field/value pair produced for a CDC entry.
#[derive(Debug, Clone)]
pub enum CdcFieldValue {
    /// Static text, stored without allocating.
    Str(&'static str),
    /// Owned text.
    String(String),
    /// Owned raw bytes.
    Bytes(Vec<u8>),
}

impl CdcFieldValue {
    /// Borrow the value as the byte slice to hand to Redis.
    pub fn as_bytes(&self) -> &[u8] {
        match self {
            Self::Bytes(raw) => raw.as_slice(),
            Self::String(owned) => owned.as_bytes(),
            Self::Str(fixed) => fixed.as_bytes(),
        }
    }
}
167
168/// Compress data with zstd, unless it's already zstd-compressed.
169///
170/// Checks for zstd magic bytes at the start of the data to avoid
171/// double-compression. Returns original data if compression fails.
172pub fn maybe_compress(data: &[u8]) -> Vec<u8> {
173    // Skip if already zstd-compressed
174    if data.len() >= 4 && data[..4] == ZSTD_MAGIC {
175        return data.to_vec();
176    }
177
178    // Skip if too small to benefit from compression
179    if data.len() < 64 {
180        return data.to_vec();
181    }
182
183    // Compress with level 3 (good balance of speed/ratio)
184    match zstd::encode_all(data, 3) {
185        Ok(compressed) => {
186            // Only use compressed if it's actually smaller
187            if compressed.len() < data.len() {
188                compressed
189            } else {
190                data.to_vec()
191            }
192        }
193        Err(_) => data.to_vec(),
194    }
195}
196
197/// Decompress zstd data if it has the magic header, otherwise return as-is.
198///
199/// This is provided for consumers to use when reading from the stream.
200pub fn maybe_decompress(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
201    if data.len() >= 4 && data[..4] == ZSTD_MAGIC {
202        let mut decoder = zstd::Decoder::new(data)?;
203        let mut decompressed = Vec::new();
204        decoder.read_to_end(&mut decompressed)?;
205        Ok(decompressed)
206    } else {
207        Ok(data.to_vec())
208    }
209}
210
211/// Check if data appears to be zstd-compressed
212pub fn is_zstd_compressed(data: &[u8]) -> bool {
213    data.len() >= 4 && data[..4] == ZSTD_MAGIC
214}
215
216/// Build the full CDC stream key from an optional prefix
217pub fn cdc_stream_key(prefix: Option<&str>) -> String {
218    match prefix {
219        Some(p) => format!("{}{}", p, CDC_STREAM_SUFFIX),
220        None => CDC_STREAM_SUFFIX.to_string(),
221    }
222}
223
#[cfg(test)]
mod tests {
    use super::*;

    /// Collect just the field names of an entry, in emission order.
    fn field_names(entry: &CdcEntry) -> Vec<&'static str> {
        entry
            .to_redis_fields()
            .into_iter()
            .map(|(name, _)| name)
            .collect()
    }

    /// Inputs under the size threshold come back untouched.
    #[test]
    fn test_maybe_compress_small_data() {
        let tiny: &[u8] = b"hello";
        assert_eq!(maybe_compress(tiny), tiny.to_vec());
    }

    /// A buffer that already starts with the zstd magic is passed through.
    #[test]
    fn test_maybe_compress_already_zstd() {
        // Fake zstd header followed by arbitrary bytes.
        let framed = [&ZSTD_MAGIC[..], &b"already compressed data"[..]].concat();

        assert_eq!(maybe_compress(&framed), framed);
    }

    /// Repetitive JSON shrinks and gains the zstd magic prefix.
    #[test]
    fn test_maybe_compress_json() {
        let json = r#"{"name":"John Doe","email":"john@example.com","data":"some repeated data repeated data repeated data repeated data"}"#;
        let packed = maybe_compress(json.as_bytes());

        assert!(is_zstd_compressed(&packed));
        assert!(packed.len() < json.len());
    }

    /// Compress followed by decompress restores the original bytes.
    #[test]
    fn test_roundtrip_compression() {
        let original = r#"{"key":"value","nested":{"a":1,"b":2},"array":[1,2,3,4,5]}"#.repeat(10);
        let restored = maybe_decompress(&maybe_compress(original.as_bytes())).unwrap();

        assert_eq!(restored, original.as_bytes());
    }

    /// `put` fills every field and serializes the metadata as JSON.
    #[test]
    fn test_cdc_entry_put() {
        let CdcEntry {
            op,
            key,
            hash,
            data,
            meta,
        } = CdcEntry::put(
            "test.key".to_string(),
            "abc123".to_string(),
            b"test content that is long enough to compress well when repeated",
            "json",
            1,
            1735776000000,
            None,
        );

        assert_eq!(op, CdcOp::Put);
        assert_eq!(key, "test.key");
        assert_eq!(hash, Some("abc123".to_string()));
        assert!(data.is_some());

        let meta_json = meta.expect("PUT entries carry metadata");
        let parsed: CdcMeta = serde_json::from_str(&meta_json).unwrap();
        assert_eq!(parsed.content_type, "json");
        assert_eq!(parsed.version, 1);
    }

    /// `delete` leaves all PUT-only fields empty.
    #[test]
    fn test_cdc_entry_delete() {
        let entry = CdcEntry::delete("test.key".to_string());

        assert_eq!(entry.op, CdcOp::Delete);
        assert_eq!(entry.key, "test.key");
        assert_eq!((entry.hash, entry.data, entry.meta), (None, None, None));
    }

    /// The stream key is the prefix (if any) followed by "cdc".
    #[test]
    fn test_cdc_stream_key() {
        let cases = [
            // No prefix -> just "cdc"
            (None, "cdc"),
            // Prefix with trailing colon -> "myapp:cdc"
            (Some("myapp:"), "myapp:cdc"),
            // Empty prefix -> just "cdc"
            (Some(""), "cdc"),
        ];

        for &(prefix, expected) in &cases {
            assert_eq!(cdc_stream_key(prefix), expected);
        }
    }

    /// A PUT entry emits all five fields in a fixed order.
    #[test]
    fn test_to_redis_fields_put() {
        let entry = CdcEntry::put(
            "test.key".to_string(),
            "hash123".to_string(),
            b"data",
            "binary",
            2,
            1000,
            Some("00-trace-span-01".to_string()),
        );

        assert_eq!(field_names(&entry), ["op", "key", "hash", "data", "meta"]);
    }

    /// A DELETE entry emits only `op` and `key`.
    #[test]
    fn test_to_redis_fields_delete() {
        let entry = CdcEntry::delete("test.key".to_string());

        assert_eq!(field_names(&entry), ["op", "key"]);
    }
}