1use serde::{Serialize, Deserialize};
35use std::io::Read;
36
37const ZSTD_MAGIC: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];
39
40pub const CDC_STREAM_SUFFIX: &str = "__local__:cdc";
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub enum CdcOp {
46 Put,
47 Delete,
48}
49
50impl CdcOp {
51 pub fn as_str(&self) -> &'static str {
53 match self {
54 CdcOp::Put => "PUT",
55 CdcOp::Delete => "DEL",
56 }
57 }
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct CdcMeta {
63 pub content_type: String,
64 pub version: u64,
65 pub updated_at: i64,
66 #[serde(skip_serializing_if = "Option::is_none")]
67 pub trace_parent: Option<String>,
68}
69
70#[derive(Debug, Clone)]
72pub struct CdcEntry {
73 pub op: CdcOp,
75 pub key: String,
77 pub hash: Option<String>,
79 pub data: Option<Vec<u8>>,
81 pub meta: Option<String>,
83}
84
85impl CdcEntry {
86 pub fn put(
88 key: String,
89 hash: String,
90 content: &[u8],
91 content_type: &str,
92 version: u64,
93 updated_at: i64,
94 trace_parent: Option<String>,
95 ) -> Self {
96 let data = maybe_compress(content);
97 let meta = CdcMeta {
98 content_type: content_type.to_string(),
99 version,
100 updated_at,
101 trace_parent,
102 };
103 let meta_json = serde_json::to_string(&meta).unwrap_or_else(|_| "{}".to_string());
104
105 Self {
106 op: CdcOp::Put,
107 key,
108 hash: Some(hash),
109 data: Some(data),
110 meta: Some(meta_json),
111 }
112 }
113
114 pub fn delete(key: String) -> Self {
116 Self {
117 op: CdcOp::Delete,
118 key,
119 hash: None,
120 data: None,
121 meta: None,
122 }
123 }
124
125 pub fn to_redis_fields(&self) -> Vec<(&'static str, CdcFieldValue)> {
128 let mut fields = vec![
129 ("op", CdcFieldValue::Str(self.op.as_str())),
130 ("key", CdcFieldValue::String(self.key.clone())),
131 ];
132
133 if let Some(ref hash) = self.hash {
134 fields.push(("hash", CdcFieldValue::String(hash.clone())));
135 }
136 if let Some(ref data) = self.data {
137 fields.push(("data", CdcFieldValue::Bytes(data.clone())));
138 }
139 if let Some(ref meta) = self.meta {
140 fields.push(("meta", CdcFieldValue::String(meta.clone())));
141 }
142
143 fields
144 }
145}
146
147#[derive(Debug, Clone)]
149pub enum CdcFieldValue {
150 Str(&'static str),
151 String(String),
152 Bytes(Vec<u8>),
153}
154
155impl CdcFieldValue {
156 pub fn as_bytes(&self) -> &[u8] {
158 match self {
159 CdcFieldValue::Str(s) => s.as_bytes(),
160 CdcFieldValue::String(s) => s.as_bytes(),
161 CdcFieldValue::Bytes(b) => b,
162 }
163 }
164}
165
166pub fn maybe_compress(data: &[u8]) -> Vec<u8> {
171 if data.len() >= 4 && data[..4] == ZSTD_MAGIC {
173 return data.to_vec();
174 }
175
176 if data.len() < 64 {
178 return data.to_vec();
179 }
180
181 match zstd::encode_all(data, 3) {
183 Ok(compressed) => {
184 if compressed.len() < data.len() {
186 compressed
187 } else {
188 data.to_vec()
189 }
190 }
191 Err(_) => data.to_vec(),
192 }
193}
194
195pub fn maybe_decompress(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
199 if data.len() >= 4 && data[..4] == ZSTD_MAGIC {
200 let mut decoder = zstd::Decoder::new(data)?;
201 let mut decompressed = Vec::new();
202 decoder.read_to_end(&mut decompressed)?;
203 Ok(decompressed)
204 } else {
205 Ok(data.to_vec())
206 }
207}
208
209pub fn is_zstd_compressed(data: &[u8]) -> bool {
211 data.len() >= 4 && data[..4] == ZSTD_MAGIC
212}
213
214pub fn cdc_stream_key(prefix: Option<&str>) -> String {
216 match prefix {
217 Some(p) => format!("{}{}", p, CDC_STREAM_SUFFIX),
218 None => CDC_STREAM_SUFFIX.to_string(),
219 }
220}
221
222#[cfg(test)]
223mod tests {
224 use super::*;
225
226 #[test]
227 fn test_maybe_compress_small_data() {
228 let small = b"hello";
229 let result = maybe_compress(small);
230 assert_eq!(result, small.to_vec());
232 }
233
234 #[test]
235 fn test_maybe_compress_already_zstd() {
236 let mut fake_zstd = ZSTD_MAGIC.to_vec();
238 fake_zstd.extend_from_slice(b"already compressed data");
239
240 let result = maybe_compress(&fake_zstd);
241 assert_eq!(result, fake_zstd);
243 }
244
245 #[test]
246 fn test_maybe_compress_json() {
247 let json = r#"{"name":"John Doe","email":"john@example.com","data":"some repeated data repeated data repeated data repeated data"}"#;
249 let result = maybe_compress(json.as_bytes());
250
251 assert!(result.len() < json.len());
253 assert!(is_zstd_compressed(&result));
255 }
256
257 #[test]
258 fn test_roundtrip_compression() {
259 let original = r#"{"key":"value","nested":{"a":1,"b":2},"array":[1,2,3,4,5]}"#.repeat(10);
260 let compressed = maybe_compress(original.as_bytes());
261 let decompressed = maybe_decompress(&compressed).unwrap();
262
263 assert_eq!(decompressed, original.as_bytes());
264 }
265
266 #[test]
267 fn test_cdc_entry_put() {
268 let entry = CdcEntry::put(
269 "test.key".to_string(),
270 "abc123".to_string(),
271 b"test content that is long enough to compress well when repeated",
272 "json",
273 1,
274 1735776000000,
275 None,
276 );
277
278 assert_eq!(entry.op, CdcOp::Put);
279 assert_eq!(entry.key, "test.key");
280 assert_eq!(entry.hash, Some("abc123".to_string()));
281 assert!(entry.data.is_some());
282 assert!(entry.meta.is_some());
283
284 let meta: CdcMeta = serde_json::from_str(entry.meta.as_ref().unwrap()).unwrap();
285 assert_eq!(meta.content_type, "json");
286 assert_eq!(meta.version, 1);
287 }
288
289 #[test]
290 fn test_cdc_entry_delete() {
291 let entry = CdcEntry::delete("test.key".to_string());
292
293 assert_eq!(entry.op, CdcOp::Delete);
294 assert_eq!(entry.key, "test.key");
295 assert!(entry.hash.is_none());
296 assert!(entry.data.is_none());
297 assert!(entry.meta.is_none());
298 }
299
300 #[test]
301 fn test_cdc_stream_key() {
302 assert_eq!(cdc_stream_key(None), "__local__:cdc");
303 assert_eq!(cdc_stream_key(Some("myapp:")), "myapp:__local__:cdc");
304 assert_eq!(cdc_stream_key(Some("")), "__local__:cdc");
305 }
306
307 #[test]
308 fn test_to_redis_fields_put() {
309 let entry = CdcEntry::put(
310 "test.key".to_string(),
311 "hash123".to_string(),
312 b"data",
313 "binary",
314 2,
315 1000,
316 Some("00-trace-span-01".to_string()),
317 );
318
319 let fields = entry.to_redis_fields();
320 assert_eq!(fields.len(), 5); assert_eq!(fields[0].0, "op");
323 assert_eq!(fields[1].0, "key");
324 assert_eq!(fields[2].0, "hash");
325 assert_eq!(fields[3].0, "data");
326 assert_eq!(fields[4].0, "meta");
327 }
328
329 #[test]
330 fn test_to_redis_fields_delete() {
331 let entry = CdcEntry::delete("test.key".to_string());
332
333 let fields = entry.to_redis_fields();
334 assert_eq!(fields.len(), 2); assert_eq!(fields[0].0, "op");
337 assert_eq!(fields[1].0, "key");
338 }
339}