1use serde::{Serialize, Deserialize};
35use std::io::Read;
36
/// Four-byte prefix used to recognise payloads that are already zstd frames.
///
/// Compared against the first four bytes of a buffer by `maybe_compress`,
/// `maybe_decompress` and `is_zstd_compressed`.
const ZSTD_MAGIC: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];
39
/// Suffix that `cdc_stream_key` appends to the optional prefix; it is also the
/// whole key when no prefix is supplied.
pub const CDC_STREAM_SUFFIX: &str = "cdc";
44
/// Kind of change carried by a CDC entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOp {
    /// A key was written.
    Put,
    /// A key was removed.
    Delete,
}

impl CdcOp {
    /// Returns the fixed textual label of the operation: `"PUT"` for
    /// [`CdcOp::Put`] and `"DEL"` for [`CdcOp::Delete`].
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Put => "PUT",
            Self::Delete => "DEL",
        }
    }
}
61
/// Metadata attached to a PUT entry; `CdcEntry::put` serializes it to JSON and
/// stores the resulting string in the entry's `meta` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcMeta {
    /// Content type label supplied by the caller (the tests use values such as
    /// `"json"` and `"binary"`).
    pub content_type: String,
    /// Version number supplied by the caller.
    pub version: u64,
    /// Update timestamp supplied by the caller.
    /// NOTE(review): the tests pass values that look like epoch milliseconds — confirm the unit.
    pub updated_at: i64,
    /// Optional trace identifier; the key is left out of the JSON entirely
    /// when this is `None`.
    /// NOTE(review): presumably a W3C `traceparent` value — confirm against callers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_parent: Option<String>,
}
71
/// A single change-data-capture record, built with `CdcEntry::put` or
/// `CdcEntry::delete`.
#[derive(Debug, Clone)]
pub struct CdcEntry {
    /// Whether this record is a write or a removal.
    pub op: CdcOp,
    /// Key the change applies to.
    pub key: String,
    /// Caller-supplied hash; `Some` for entries built by `put`, `None` for `delete`.
    pub hash: Option<String>,
    /// Payload bytes as returned by `maybe_compress` (so possibly
    /// zstd-compressed); `None` for `delete`.
    pub data: Option<Vec<u8>>,
    /// JSON-serialized `CdcMeta`; `None` for `delete`.
    pub meta: Option<String>,
}
86
87impl CdcEntry {
88 pub fn put(
90 key: String,
91 hash: String,
92 content: &[u8],
93 content_type: &str,
94 version: u64,
95 updated_at: i64,
96 trace_parent: Option<String>,
97 ) -> Self {
98 let data = maybe_compress(content);
99 let meta = CdcMeta {
100 content_type: content_type.to_string(),
101 version,
102 updated_at,
103 trace_parent,
104 };
105 let meta_json = serde_json::to_string(&meta).unwrap_or_else(|_| "{}".to_string());
106
107 Self {
108 op: CdcOp::Put,
109 key,
110 hash: Some(hash),
111 data: Some(data),
112 meta: Some(meta_json),
113 }
114 }
115
116 pub fn delete(key: String) -> Self {
118 Self {
119 op: CdcOp::Delete,
120 key,
121 hash: None,
122 data: None,
123 meta: None,
124 }
125 }
126
127 pub fn to_redis_fields(&self) -> Vec<(&'static str, CdcFieldValue)> {
130 let mut fields = vec![
131 ("op", CdcFieldValue::Str(self.op.as_str())),
132 ("key", CdcFieldValue::String(self.key.clone())),
133 ];
134
135 if let Some(ref hash) = self.hash {
136 fields.push(("hash", CdcFieldValue::String(hash.clone())));
137 }
138 if let Some(ref data) = self.data {
139 fields.push(("data", CdcFieldValue::Bytes(data.clone())));
140 }
141 if let Some(ref meta) = self.meta {
142 fields.push(("meta", CdcFieldValue::String(meta.clone())));
143 }
144
145 fields
146 }
147}
148
/// Value half of a field pair produced by `CdcEntry::to_redis_fields`.
#[derive(Debug, Clone)]
pub enum CdcFieldValue {
    /// Borrowed static text.
    Str(&'static str),
    /// Owned text.
    String(String),
    /// Owned raw bytes.
    Bytes(Vec<u8>),
}

impl CdcFieldValue {
    /// Borrows the value as a byte slice, whichever variant holds it; text
    /// variants yield their UTF-8 bytes.
    pub fn as_bytes(&self) -> &[u8] {
        match self {
            Self::Bytes(raw) => raw.as_slice(),
            Self::String(owned) => owned.as_bytes(),
            Self::Str(literal) => literal.as_bytes(),
        }
    }
}
167
168pub fn maybe_compress(data: &[u8]) -> Vec<u8> {
173 if data.len() >= 4 && data[..4] == ZSTD_MAGIC {
175 return data.to_vec();
176 }
177
178 if data.len() < 64 {
180 return data.to_vec();
181 }
182
183 match zstd::encode_all(data, 3) {
185 Ok(compressed) => {
186 if compressed.len() < data.len() {
188 compressed
189 } else {
190 data.to_vec()
191 }
192 }
193 Err(_) => data.to_vec(),
194 }
195}
196
197pub fn maybe_decompress(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
201 if data.len() >= 4 && data[..4] == ZSTD_MAGIC {
202 let mut decoder = zstd::Decoder::new(data)?;
203 let mut decompressed = Vec::new();
204 decoder.read_to_end(&mut decompressed)?;
205 Ok(decompressed)
206 } else {
207 Ok(data.to_vec())
208 }
209}
210
211pub fn is_zstd_compressed(data: &[u8]) -> bool {
213 data.len() >= 4 && data[..4] == ZSTD_MAGIC
214}
215
216pub fn cdc_stream_key(prefix: Option<&str>) -> String {
218 match prefix {
219 Some(p) => format!("{}{}", p, CDC_STREAM_SUFFIX),
220 None => CDC_STREAM_SUFFIX.to_string(),
221 }
222}
223
#[cfg(test)]
mod tests {
    use super::*;

    /// Inputs below the 64-byte threshold are returned untouched.
    #[test]
    fn test_maybe_compress_small_data() {
        let small = b"hello";
        let result = maybe_compress(small);
        assert_eq!(result, small.to_vec());
        assert!(!is_zstd_compressed(&result));
    }

    /// Anything that already starts with the zstd magic is never re-compressed,
    /// regardless of its length.
    #[test]
    fn test_maybe_compress_already_zstd() {
        let mut fake_zstd = ZSTD_MAGIC.to_vec();
        fake_zstd.extend_from_slice(b"already compressed data");

        let result = maybe_compress(&fake_zstd);
        assert_eq!(result, fake_zstd);

        // Same rule above the size threshold.
        let mut long_fake = ZSTD_MAGIC.to_vec();
        long_fake.extend(std::iter::repeat(b'a').take(200));
        assert_eq!(maybe_compress(&long_fake), long_fake);
    }

    /// Repetitive JSON above the threshold shrinks and gains the zstd magic.
    #[test]
    fn test_maybe_compress_json() {
        let json = r#"{"name":"John Doe","email":"john@example.com","data":"some repeated data repeated data repeated data repeated data"}"#;
        let result = maybe_compress(json.as_bytes());

        assert!(result.len() < json.len());
        assert!(is_zstd_compressed(&result));
    }

    /// Compressing and then decompressing yields the original bytes.
    #[test]
    fn test_roundtrip_compression() {
        let original = r#"{"key":"value","nested":{"a":1,"b":2},"array":[1,2,3,4,5]}"#.repeat(10);
        let compressed = maybe_compress(original.as_bytes());
        assert!(is_zstd_compressed(&compressed));

        let decompressed = maybe_decompress(&compressed).unwrap();
        assert_eq!(decompressed, original.as_bytes());
    }

    /// Data without the zstd magic passes through `maybe_decompress` unchanged.
    #[test]
    fn test_maybe_decompress_passthrough() {
        assert_eq!(maybe_decompress(b"plain bytes").unwrap(), b"plain bytes".to_vec());
        assert_eq!(maybe_decompress(b"").unwrap(), Vec::<u8>::new());
    }

    /// Magic detection needs all four bytes and must not panic on short input.
    #[test]
    fn test_is_zstd_compressed_short_input() {
        assert!(!is_zstd_compressed(b""));
        assert!(!is_zstd_compressed(&ZSTD_MAGIC[..3]));
        assert!(is_zstd_compressed(&ZSTD_MAGIC));
    }

    #[test]
    fn test_cdc_entry_put() {
        let content = b"test content that is long enough to compress well when repeated";
        let entry = CdcEntry::put(
            "test.key".to_string(),
            "abc123".to_string(),
            content,
            "json",
            1,
            1735776000000,
            None,
        );

        assert_eq!(entry.op, CdcOp::Put);
        assert_eq!(entry.key, "test.key");
        assert_eq!(entry.hash, Some("abc123".to_string()));
        assert!(entry.data.is_some());
        assert!(entry.meta.is_some());

        // Whatever form the payload was stored in, it must decode to the input.
        let restored = maybe_decompress(entry.data.as_deref().unwrap()).unwrap();
        assert_eq!(restored, content.to_vec());

        let meta_json = entry.meta.as_ref().unwrap();
        // A missing trace parent is omitted from the JSON rather than written as null.
        assert!(!meta_json.contains("trace_parent"));

        let meta: CdcMeta = serde_json::from_str(meta_json).unwrap();
        assert_eq!(meta.content_type, "json");
        assert_eq!(meta.version, 1);
        assert_eq!(meta.updated_at, 1735776000000);
        assert!(meta.trace_parent.is_none());
    }

    #[test]
    fn test_cdc_entry_delete() {
        let entry = CdcEntry::delete("test.key".to_string());

        assert_eq!(entry.op, CdcOp::Delete);
        assert_eq!(entry.key, "test.key");
        assert!(entry.hash.is_none());
        assert!(entry.data.is_none());
        assert!(entry.meta.is_none());
    }

    #[test]
    fn test_cdc_stream_key() {
        assert_eq!(cdc_stream_key(None), "cdc");
        assert_eq!(cdc_stream_key(Some("myapp:")), "myapp:cdc");
        assert_eq!(cdc_stream_key(Some("")), "cdc");
    }

    #[test]
    fn test_to_redis_fields_put() {
        let entry = CdcEntry::put(
            "test.key".to_string(),
            "hash123".to_string(),
            b"data",
            "binary",
            2,
            1000,
            Some("00-trace-span-01".to_string()),
        );

        let fields = entry.to_redis_fields();
        assert_eq!(fields.len(), 5);

        // Field order is part of the contract.
        assert_eq!(fields[0].0, "op");
        assert_eq!(fields[1].0, "key");
        assert_eq!(fields[2].0, "hash");
        assert_eq!(fields[3].0, "data");
        assert_eq!(fields[4].0, "meta");

        // Values, not just names.
        assert_eq!(fields[0].1.as_bytes(), &b"PUT"[..]);
        assert_eq!(fields[1].1.as_bytes(), &b"test.key"[..]);
        assert_eq!(fields[2].1.as_bytes(), &b"hash123"[..]);
        // Four bytes is below the compression threshold, so the payload is verbatim.
        assert_eq!(fields[3].1.as_bytes(), &b"data"[..]);

        let meta: CdcMeta = serde_json::from_slice(fields[4].1.as_bytes()).unwrap();
        assert_eq!(meta.content_type, "binary");
        assert_eq!(meta.version, 2);
        assert_eq!(meta.updated_at, 1000);
        assert_eq!(meta.trace_parent.as_deref(), Some("00-trace-span-01"));
    }

    #[test]
    fn test_to_redis_fields_delete() {
        let entry = CdcEntry::delete("test.key".to_string());

        let fields = entry.to_redis_fields();
        assert_eq!(fields.len(), 2);
        assert_eq!(fields[0].0, "op");
        assert_eq!(fields[1].0, "key");
        assert_eq!(fields[0].1.as_bytes(), &b"DEL"[..]);
        assert_eq!(fields[1].1.as_bytes(), &b"test.key"[..]);
    }
}