sync_engine/cdc/
mod.rs

//! Change Data Capture (CDC) stream support.
//!
//! Emits mutations to a Redis Stream for external replication agents.
//! The stream key is `{redis_prefix}__local__:cdc`, where `__local__` marks
//! node-local infrastructure that should not itself be replicated.
//!
//! # Stream Format
//!
//! ## PUT operation
//! ```text
//! XADD {prefix}__local__:cdc MAXLEN ~ 100000 *
//!   op    "PUT"
//!   key   "uk.nhs.patient.12345"
//!   hash  "a1b2c3..."                # content_hash for dedup
//!   data  <maybe_zstd(content)>      # payload, compressed when worthwhile
//!   meta  '{"content_type":"json","version":3,"updated_at":1735776000000}'
//! ```
//!
//! The `meta` JSON also carries a `trace_parent` key, but only when a trace
//! context was supplied.
//!
//! ## DELETE operation
//! ```text
//! XADD {prefix}__local__:cdc MAXLEN ~ 100000 *
//!   op    "DEL"
//!   key   "uk.nhs.patient.12345"
//! ```
//!
//! # Compression Strategy
//!
//! The `data` field is zstd-compressed (level 3) before being written to the
//! stream, except when the payload:
//! - already starts with the zstd magic bytes (to avoid double compression),
//! - is shorter than 64 bytes, or
//! - would not get smaller when compressed.
//!
//! Consumers should therefore read `data` through [`maybe_decompress`], which
//! decompresses only when the zstd magic bytes are present.
30
31use serde::{Serialize, Deserialize};
32use std::io::Read;
33
/// Magic number that opens every zstd frame (`0xFD2FB528`, stored
/// little-endian, so the first four bytes are `28 B5 2F FD`).
const ZSTD_MAGIC: [u8; 4] = *b"\x28\xB5\x2F\xFD";

/// Suffix appended to the configured Redis prefix to form the CDC stream key.
pub const CDC_STREAM_SUFFIX: &str = "__local__:cdc";
39
/// Kind of mutation recorded by a CDC stream entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOp {
    /// Object content was written (emitted as `"PUT"`).
    Put,
    /// Object was removed (emitted as `"DEL"`).
    Delete,
}

impl CdcOp {
    /// Wire value written to the stream's `op` field.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Delete => "DEL",
            Self::Put => "PUT",
        }
    }
}
56
57/// Metadata for CDC PUT entries
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct CdcMeta {
60    pub content_type: String,
61    pub version: u64,
62    pub updated_at: i64,
63    #[serde(skip_serializing_if = "Option::is_none")]
64    pub trace_parent: Option<String>,
65}
66
67/// A CDC entry ready to be written to the stream
68#[derive(Debug, Clone)]
69pub struct CdcEntry {
70    /// Operation type
71    pub op: CdcOp,
72    /// Object ID (key)
73    pub key: String,
74    /// Content hash for dedup (SHA256 of content) - only for PUT
75    pub hash: Option<String>,
76    /// Compressed data - only for PUT
77    pub data: Option<Vec<u8>>,
78    /// Metadata JSON - only for PUT
79    pub meta: Option<String>,
80}
81
82impl CdcEntry {
83    /// Create a PUT entry
84    pub fn put(
85        key: String,
86        hash: String,
87        content: &[u8],
88        content_type: &str,
89        version: u64,
90        updated_at: i64,
91        trace_parent: Option<String>,
92    ) -> Self {
93        let data = maybe_compress(content);
94        let meta = CdcMeta {
95            content_type: content_type.to_string(),
96            version,
97            updated_at,
98            trace_parent,
99        };
100        let meta_json = serde_json::to_string(&meta).unwrap_or_else(|_| "{}".to_string());
101
102        Self {
103            op: CdcOp::Put,
104            key,
105            hash: Some(hash),
106            data: Some(data),
107            meta: Some(meta_json),
108        }
109    }
110
111    /// Create a DELETE entry
112    pub fn delete(key: String) -> Self {
113        Self {
114            op: CdcOp::Delete,
115            key,
116            hash: None,
117            data: None,
118            meta: None,
119        }
120    }
121
122    /// Convert to Redis XADD field-value pairs
123    /// Returns Vec of (field, value) where value is either String or bytes
124    pub fn to_redis_fields(&self) -> Vec<(&'static str, CdcFieldValue)> {
125        let mut fields = vec![
126            ("op", CdcFieldValue::Str(self.op.as_str())),
127            ("key", CdcFieldValue::String(self.key.clone())),
128        ];
129
130        if let Some(ref hash) = self.hash {
131            fields.push(("hash", CdcFieldValue::String(hash.clone())));
132        }
133        if let Some(ref data) = self.data {
134            fields.push(("data", CdcFieldValue::Bytes(data.clone())));
135        }
136        if let Some(ref meta) = self.meta {
137            fields.push(("meta", CdcFieldValue::String(meta.clone())));
138        }
139
140        fields
141    }
142}
143
/// A single value in an XADD field/value pair.
///
/// The variants differ only in how the bytes are owned. Use
/// [`CdcFieldValue::as_bytes`] to get what should be sent to Redis.
#[derive(Debug, Clone)]
pub enum CdcFieldValue {
    /// Borrowed static text (used for the `op` field).
    Str(&'static str),
    /// Owned text (key, hash, meta JSON).
    String(String),
    /// Owned binary payload (the `data` field).
    Bytes(Vec<u8>),
}

impl CdcFieldValue {
    /// Borrow the raw bytes to send to Redis, whatever the variant.
    pub fn as_bytes(&self) -> &[u8] {
        match self {
            Self::Bytes(raw) => raw.as_slice(),
            Self::String(owned) => owned.as_bytes(),
            Self::Str(text) => text.as_bytes(),
        }
    }
}
162
163/// Compress data with zstd, unless it's already zstd-compressed.
164///
165/// Checks for zstd magic bytes at the start of the data to avoid
166/// double-compression. Returns original data if compression fails.
167pub fn maybe_compress(data: &[u8]) -> Vec<u8> {
168    // Skip if already zstd-compressed
169    if data.len() >= 4 && data[..4] == ZSTD_MAGIC {
170        return data.to_vec();
171    }
172
173    // Skip if too small to benefit from compression
174    if data.len() < 64 {
175        return data.to_vec();
176    }
177
178    // Compress with level 3 (good balance of speed/ratio)
179    match zstd::encode_all(data, 3) {
180        Ok(compressed) => {
181            // Only use compressed if it's actually smaller
182            if compressed.len() < data.len() {
183                compressed
184            } else {
185                data.to_vec()
186            }
187        }
188        Err(_) => data.to_vec(),
189    }
190}
191
192/// Decompress zstd data if it has the magic header, otherwise return as-is.
193///
194/// This is provided for consumers to use when reading from the stream.
195pub fn maybe_decompress(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
196    if data.len() >= 4 && data[..4] == ZSTD_MAGIC {
197        let mut decoder = zstd::Decoder::new(data)?;
198        let mut decompressed = Vec::new();
199        decoder.read_to_end(&mut decompressed)?;
200        Ok(decompressed)
201    } else {
202        Ok(data.to_vec())
203    }
204}
205
206/// Check if data appears to be zstd-compressed
207pub fn is_zstd_compressed(data: &[u8]) -> bool {
208    data.len() >= 4 && data[..4] == ZSTD_MAGIC
209}
210
211/// Build the full CDC stream key from an optional prefix
212pub fn cdc_stream_key(prefix: Option<&str>) -> String {
213    match prefix {
214        Some(p) => format!("{}{}", p, CDC_STREAM_SUFFIX),
215        None => CDC_STREAM_SUFFIX.to_string(),
216    }
217}
218
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_maybe_compress_small_data() {
        let small = b"hello";
        let result = maybe_compress(small);
        // Small data should not be compressed
        assert_eq!(result, small.to_vec());
    }

    #[test]
    fn test_maybe_compress_already_zstd() {
        // Fake zstd header + some data
        let mut fake_zstd = ZSTD_MAGIC.to_vec();
        fake_zstd.extend_from_slice(b"already compressed data");

        let result = maybe_compress(&fake_zstd);
        // Should pass through unchanged
        assert_eq!(result, fake_zstd);
    }

    #[test]
    fn test_maybe_compress_json() {
        // JSON compresses well
        let json = r#"{"name":"John Doe","email":"john@example.com","data":"some repeated data repeated data repeated data repeated data"}"#;
        let result = maybe_compress(json.as_bytes());

        // Should be compressed (smaller)
        assert!(result.len() < json.len());
        // Should have zstd magic
        assert!(is_zstd_compressed(&result));
    }

    #[test]
    fn test_maybe_compress_incompressible_data_left_as_is() {
        // xorshift64 output is effectively incompressible, so zstd's framing
        // overhead makes the "compressed" form larger and the input is kept.
        let mut state: u64 = 0x9E37_79B9_7F4A_7C15;
        let noise: Vec<u8> = (0..1024)
            .map(|_| {
                state ^= state << 13;
                state ^= state >> 7;
                state ^= state << 17;
                (state >> 56) as u8
            })
            .collect();

        assert_eq!(maybe_compress(&noise), noise);
    }

    #[test]
    fn test_roundtrip_compression() {
        let original = r#"{"key":"value","nested":{"a":1,"b":2},"array":[1,2,3,4,5]}"#.repeat(10);
        let compressed = maybe_compress(original.as_bytes());
        let decompressed = maybe_decompress(&compressed).unwrap();

        assert_eq!(decompressed, original.as_bytes());
    }

    #[test]
    fn test_maybe_decompress_passes_through_plain_data() {
        let plain = b"not compressed";
        assert_eq!(maybe_decompress(plain).unwrap(), plain.to_vec());
    }

    #[test]
    fn test_cdc_entry_put() {
        // A single copy of this sentence is 63 bytes, below the compression
        // threshold; four copies are large and repetitive enough to compress.
        let content = "test content that is long enough to compress well when repeated".repeat(4);
        let entry = CdcEntry::put(
            "test.key".to_string(),
            "abc123".to_string(),
            content.as_bytes(),
            "json",
            1,
            1735776000000,
            None,
        );

        assert_eq!(entry.op, CdcOp::Put);
        assert_eq!(entry.key, "test.key");
        assert_eq!(entry.hash, Some("abc123".to_string()));

        let data = entry.data.as_ref().expect("PUT entries carry data");
        assert!(is_zstd_compressed(data));
        assert_eq!(maybe_decompress(data).unwrap(), content.as_bytes());

        let meta_json = entry.meta.as_ref().expect("PUT entries carry meta");
        // trace_parent is None, so it must be omitted from the JSON entirely.
        assert!(!meta_json.contains("trace_parent"));
        let meta: CdcMeta = serde_json::from_str(meta_json).unwrap();
        assert_eq!(meta.content_type, "json");
        assert_eq!(meta.version, 1);
        assert_eq!(meta.updated_at, 1735776000000);
        assert!(meta.trace_parent.is_none());
    }

    #[test]
    fn test_cdc_entry_delete() {
        let entry = CdcEntry::delete("test.key".to_string());

        assert_eq!(entry.op, CdcOp::Delete);
        assert_eq!(entry.key, "test.key");
        assert!(entry.hash.is_none());
        assert!(entry.data.is_none());
        assert!(entry.meta.is_none());
    }

    #[test]
    fn test_cdc_stream_key() {
        assert_eq!(cdc_stream_key(None), "__local__:cdc");
        assert_eq!(cdc_stream_key(Some("myapp:")), "myapp:__local__:cdc");
        assert_eq!(cdc_stream_key(Some("")), "__local__:cdc");
    }

    #[test]
    fn test_to_redis_fields_put() {
        let entry = CdcEntry::put(
            "test.key".to_string(),
            "hash123".to_string(),
            b"data",
            "binary",
            2,
            1000,
            Some("00-trace-span-01".to_string()),
        );

        let fields = entry.to_redis_fields();
        assert_eq!(fields.len(), 5); // op, key, hash, data, meta

        let names: Vec<&str> = fields.iter().map(|(name, _)| *name).collect();
        assert_eq!(names, ["op", "key", "hash", "data", "meta"]);

        assert_eq!(fields[0].1.as_bytes(), b"PUT");
        assert_eq!(fields[1].1.as_bytes(), b"test.key");
        assert_eq!(fields[2].1.as_bytes(), b"hash123");
        // 4 bytes is too small to be compressed, so it is stored verbatim.
        assert_eq!(fields[3].1.as_bytes(), b"data");

        let meta: CdcMeta = serde_json::from_slice(fields[4].1.as_bytes()).unwrap();
        assert_eq!(meta.content_type, "binary");
        assert_eq!(meta.version, 2);
        assert_eq!(meta.updated_at, 1000);
        assert_eq!(meta.trace_parent.as_deref(), Some("00-trace-span-01"));
    }

    #[test]
    fn test_to_redis_fields_delete() {
        let entry = CdcEntry::delete("test.key".to_string());

        let fields = entry.to_redis_fields();
        assert_eq!(fields.len(), 2); // op, key only

        assert_eq!(fields[0].0, "op");
        assert_eq!(fields[1].0, "key");
        assert_eq!(fields[0].1.as_bytes(), b"DEL");
        assert_eq!(fields[1].1.as_bytes(), b"test.key");
    }
}