1use serde::{Serialize, Deserialize};
32use std::io::Read;
33
/// Four-byte magic prefix that opens a Zstandard frame.
///
/// The frame format defines the magic number as the 32-bit value
/// `0xFD2FB528` written in little-endian order, i.e. the byte sequence
/// `28 B5 2F FD`.
const ZSTD_MAGIC: [u8; 4] = 0xFD2F_B528_u32.to_le_bytes();
36
/// Fixed suffix of the CDC stream key.
///
/// `cdc_stream_key` appends this to the optional caller-supplied prefix; with
/// no prefix the suffix is the whole key.
pub const CDC_STREAM_SUFFIX: &str = "__local__:cdc";
39
/// Kind of change carried by a [`CdcEntry`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CdcOp {
    /// A key was written; `CdcEntry::put` builds entries of this kind.
    Put,
    /// A key was removed; `CdcEntry::delete` builds entries of this kind.
    Delete,
}

impl CdcOp {
    /// Returns the short tag for this operation: `"PUT"` or `"DEL"`.
    ///
    /// This is the value emitted for the `op` field by
    /// `CdcEntry::to_redis_fields`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Put => "PUT",
            Self::Delete => "DEL",
        }
    }
}
56
/// Metadata attached to a PUT entry.
///
/// `CdcEntry::put` serializes this struct to JSON and stores the resulting
/// string in the entry's `meta` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcMeta {
    /// Content type label supplied by the caller of `CdcEntry::put`.
    pub content_type: String,
    /// Version number supplied by the caller of `CdcEntry::put`.
    pub version: u64,
    /// Last-update timestamp supplied by the caller.
    // NOTE(review): the unit is not visible here (the tests pass a value that
    // looks like epoch milliseconds) — confirm against callers.
    pub updated_at: i64,
    /// Optional trace identifier; left out of the JSON entirely when `None`.
    // NOTE(review): presumably a W3C `traceparent` header value — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_parent: Option<String>,
}
66
/// A single change record, built with `CdcEntry::put` or `CdcEntry::delete`
/// and flattened into field/value pairs by `CdcEntry::to_redis_fields`.
#[derive(Debug, Clone)]
pub struct CdcEntry {
    /// Which kind of change this entry describes.
    pub op: CdcOp,
    /// Key the change applies to.
    pub key: String,
    /// Caller-supplied hash; `Some` for entries built by `put`, `None` for
    /// entries built by `delete`.
    pub hash: Option<String>,
    /// Payload bytes as returned by `maybe_compress` (so possibly
    /// zstd-compressed); `None` for entries built by `delete`.
    pub data: Option<Vec<u8>>,
    /// JSON-serialized `CdcMeta`; `None` for entries built by `delete`.
    pub meta: Option<String>,
}
81
82impl CdcEntry {
83 pub fn put(
85 key: String,
86 hash: String,
87 content: &[u8],
88 content_type: &str,
89 version: u64,
90 updated_at: i64,
91 trace_parent: Option<String>,
92 ) -> Self {
93 let data = maybe_compress(content);
94 let meta = CdcMeta {
95 content_type: content_type.to_string(),
96 version,
97 updated_at,
98 trace_parent,
99 };
100 let meta_json = serde_json::to_string(&meta).unwrap_or_else(|_| "{}".to_string());
101
102 Self {
103 op: CdcOp::Put,
104 key,
105 hash: Some(hash),
106 data: Some(data),
107 meta: Some(meta_json),
108 }
109 }
110
111 pub fn delete(key: String) -> Self {
113 Self {
114 op: CdcOp::Delete,
115 key,
116 hash: None,
117 data: None,
118 meta: None,
119 }
120 }
121
122 pub fn to_redis_fields(&self) -> Vec<(&'static str, CdcFieldValue)> {
125 let mut fields = vec![
126 ("op", CdcFieldValue::Str(self.op.as_str())),
127 ("key", CdcFieldValue::String(self.key.clone())),
128 ];
129
130 if let Some(ref hash) = self.hash {
131 fields.push(("hash", CdcFieldValue::String(hash.clone())));
132 }
133 if let Some(ref data) = self.data {
134 fields.push(("data", CdcFieldValue::Bytes(data.clone())));
135 }
136 if let Some(ref meta) = self.meta {
137 fields.push(("meta", CdcFieldValue::String(meta.clone())));
138 }
139
140 fields
141 }
142}
143
/// Value half of a pair produced by `CdcEntry::to_redis_fields`.
#[derive(Clone, Debug)]
pub enum CdcFieldValue {
    /// Borrowed static text (used for the operation tag).
    Str(&'static str),
    /// Owned text.
    String(String),
    /// Owned raw bytes.
    Bytes(Vec<u8>),
}

impl CdcFieldValue {
    /// Borrows the value as a byte slice, whichever variant it is.
    ///
    /// Text variants yield their UTF-8 bytes; `Bytes` yields its contents
    /// unchanged.
    pub fn as_bytes(&self) -> &[u8] {
        match self {
            Self::Str(text) => text.as_bytes(),
            Self::String(text) => text.as_bytes(),
            Self::Bytes(raw) => raw.as_slice(),
        }
    }
}
162
163pub fn maybe_compress(data: &[u8]) -> Vec<u8> {
168 if data.len() >= 4 && data[..4] == ZSTD_MAGIC {
170 return data.to_vec();
171 }
172
173 if data.len() < 64 {
175 return data.to_vec();
176 }
177
178 match zstd::encode_all(data, 3) {
180 Ok(compressed) => {
181 if compressed.len() < data.len() {
183 compressed
184 } else {
185 data.to_vec()
186 }
187 }
188 Err(_) => data.to_vec(),
189 }
190}
191
192pub fn maybe_decompress(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
196 if data.len() >= 4 && data[..4] == ZSTD_MAGIC {
197 let mut decoder = zstd::Decoder::new(data)?;
198 let mut decompressed = Vec::new();
199 decoder.read_to_end(&mut decompressed)?;
200 Ok(decompressed)
201 } else {
202 Ok(data.to_vec())
203 }
204}
205
206pub fn is_zstd_compressed(data: &[u8]) -> bool {
208 data.len() >= 4 && data[..4] == ZSTD_MAGIC
209}
210
211pub fn cdc_stream_key(prefix: Option<&str>) -> String {
213 match prefix {
214 Some(p) => format!("{}{}", p, CDC_STREAM_SUFFIX),
215 None => CDC_STREAM_SUFFIX.to_string(),
216 }
217}
218
#[cfg(test)]
mod tests {
    use super::*;

    /// Inputs below the size threshold come back byte-for-byte.
    #[test]
    fn test_maybe_compress_small_data() {
        let input: &[u8] = b"hello";
        assert_eq!(maybe_compress(input).as_slice(), input);
    }

    /// A buffer that already carries the zstd magic is passed through.
    #[test]
    fn test_maybe_compress_already_zstd() {
        let payload: Vec<u8> = ZSTD_MAGIC
            .iter()
            .chain(b"already compressed data".iter())
            .copied()
            .collect();

        assert_eq!(maybe_compress(&payload), payload);
    }

    /// Repetitive JSON shrinks and is tagged with the zstd magic.
    #[test]
    fn test_maybe_compress_json() {
        let json = r#"{"name":"John Doe","email":"john@example.com","data":"some repeated data repeated data repeated data repeated data"}"#;
        let packed = maybe_compress(json.as_bytes());

        assert!(packed.len() < json.len());
        assert!(is_zstd_compressed(&packed));
    }

    /// Compress followed by decompress restores the original bytes.
    #[test]
    fn test_roundtrip_compression() {
        let original = r#"{"key":"value","nested":{"a":1,"b":2},"array":[1,2,3,4,5]}"#.repeat(10);

        let restored = maybe_decompress(&maybe_compress(original.as_bytes())).unwrap();

        assert_eq!(restored, original.as_bytes());
    }

    /// `put` fills every field and stores parseable metadata.
    #[test]
    fn test_cdc_entry_put() {
        let entry = CdcEntry::put(
            String::from("test.key"),
            String::from("abc123"),
            b"test content that is long enough to compress well when repeated",
            "json",
            1,
            1735776000000,
            None,
        );

        assert_eq!(entry.op, CdcOp::Put);
        assert_eq!(entry.key, "test.key");
        assert_eq!(entry.hash.as_deref(), Some("abc123"));
        assert!(entry.data.is_some());
        assert!(entry.meta.is_some());

        let meta_json = entry.meta.as_deref().unwrap();
        let meta: CdcMeta = serde_json::from_str(meta_json).unwrap();
        assert_eq!(meta.content_type, "json");
        assert_eq!(meta.version, 1);
    }

    /// `delete` carries only the operation and the key.
    #[test]
    fn test_cdc_entry_delete() {
        let entry = CdcEntry::delete(String::from("test.key"));

        assert_eq!(entry.op, CdcOp::Delete);
        assert_eq!(entry.key, "test.key");
        assert!(entry.hash.is_none());
        assert!(entry.data.is_none());
        assert!(entry.meta.is_none());
    }

    /// The stream key is the optional prefix followed by the fixed suffix.
    #[test]
    fn test_cdc_stream_key() {
        let cases = [
            (None, "__local__:cdc"),
            (Some("myapp:"), "myapp:__local__:cdc"),
            (Some(""), "__local__:cdc"),
        ];

        for (prefix, expected) in cases.iter() {
            assert_eq!(cdc_stream_key(*prefix), *expected);
        }
    }

    /// A PUT entry yields all five fields in a fixed order.
    #[test]
    fn test_to_redis_fields_put() {
        let entry = CdcEntry::put(
            String::from("test.key"),
            String::from("hash123"),
            b"data",
            "binary",
            2,
            1000,
            Some(String::from("00-trace-span-01")),
        );

        let fields = entry.to_redis_fields();
        let names: Vec<&str> = fields.iter().map(|(name, _)| *name).collect();

        // Comparing the whole list checks both the count and the order.
        assert_eq!(names, ["op", "key", "hash", "data", "meta"]);
    }

    /// A DELETE entry yields only `op` and `key`.
    #[test]
    fn test_to_redis_fields_delete() {
        let entry = CdcEntry::delete(String::from("test.key"));

        let fields = entry.to_redis_fields();
        let names: Vec<&str> = fields.iter().map(|(name, _)| *name).collect();

        assert_eq!(names, ["op", "key"]);
    }
}