1use crate::error::{ReplicationError, Result};
27use futures::future::join_all;
28use redis::aio::ConnectionManager;
29use redis::streams::{StreamReadOptions, StreamReadReply};
30use redis::AsyncCommands;
31use serde::{Deserialize, Serialize};
32use std::collections::HashMap;
33use std::io::Read;
34use std::time::Duration;
35use tracing::{trace, warn};
36
/// First four bytes of every zstd frame (magic number 0xFD2FB528, stored
/// little-endian); used to detect whether a payload is compressed.
const ZSTD_MAGIC: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];
39
/// The kind of change carried by a CDC stream entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOp {
    /// Key was created or updated (entry carries a payload).
    Put,
    /// Key was removed (no payload).
    Delete,
}

impl CdcOp {
    /// Parses the `op` field of a stream entry, case-insensitively.
    ///
    /// Accepts "PUT" for [`CdcOp::Put`] and "DEL"/"DELETE" for
    /// [`CdcOp::Delete`]; anything else yields `None`.
    ///
    /// Uses ASCII case folding instead of `to_uppercase()` to avoid a
    /// `String` allocation per call on this hot parse path.
    fn from_str(s: &str) -> Option<Self> {
        if s.eq_ignore_ascii_case("PUT") {
            Some(CdcOp::Put)
        } else if s.eq_ignore_ascii_case("DEL") || s.eq_ignore_ascii_case("DELETE") {
            Some(CdcOp::Delete)
        } else {
            None
        }
    }
}
56
/// Metadata attached to a CDC PUT event, carried as JSON in the stream
/// entry's `meta` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcMeta {
    /// MIME type of the payload (e.g. "application/json").
    pub content_type: String,
    /// Version number of the key at the time of the change.
    pub version: u64,
    /// Last-update timestamp. Units are not visible here — presumably
    /// epoch milliseconds; confirm against the producer.
    pub updated_at: i64,
    /// Optional W3C `traceparent` header value for distributed tracing;
    /// omitted from the serialized JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_parent: Option<String>,
}
66
/// A single change-data-capture event parsed from a Redis stream entry.
#[derive(Debug, Clone)]
pub struct CdcEvent {
    /// Redis stream entry ID ("<ms>-<seq>").
    pub stream_id: String,
    /// Operation type (PUT or DELETE).
    pub op: CdcOp,
    /// The key the operation applies to.
    pub key: String,
    /// Hex SHA-256 of the decompressed payload, when the producer supplied
    /// one; `None` for deletes.
    pub hash: Option<String>,
    /// Decompressed payload bytes; `None` for deletes.
    pub data: Option<Vec<u8>>,
    /// Parsed `meta` JSON; `None` for deletes or when missing/unparseable.
    pub meta: Option<CdcMeta>,
}
83
84impl CdcEvent {
85 pub fn is_put(&self) -> bool {
87 self.op == CdcOp::Put
88 }
89
90 pub fn is_delete(&self) -> bool {
92 self.op == CdcOp::Delete
93 }
94
95 pub fn trace_parent(&self) -> Option<&str> {
99 self.meta.as_ref().and_then(|m| m.trace_parent.as_deref())
100 }
101
102 pub fn trace_id(&self) -> Option<&str> {
104 self.trace_parent().and_then(|tp| {
105 let parts: Vec<&str> = tp.split('-').collect();
106 if parts.len() >= 2 {
107 Some(parts[1])
108 } else {
109 None
110 }
111 })
112 }
113
114 pub fn parent_span_id(&self) -> Option<&str> {
116 self.trace_parent().and_then(|tp| {
117 let parts: Vec<&str> = tp.split('-').collect();
118 if parts.len() >= 3 {
119 Some(parts[2])
120 } else {
121 None
122 }
123 })
124 }
125}
126
/// Outcome of a cursor-validated stream read.
#[derive(Debug)]
pub enum ReadResult {
    /// Entries were read successfully (possibly zero of them).
    Events(Vec<CdcEvent>),

    /// The stream was trimmed past the caller's cursor, meaning entries
    /// between `cursor` and `oldest_id` may have been lost.
    StreamTrimmed {
        /// The cursor the caller attempted to resume from.
        cursor: String,
        /// The oldest entry ID still retained in the stream.
        oldest_id: String,
    },

    /// The cursor is the initial "0" and the stream is empty.
    Empty,
}
151
152impl ReadResult {
153 pub fn events(self) -> Vec<CdcEvent> {
155 match self {
156 ReadResult::Events(events) => events,
157 _ => Vec::new(),
158 }
159 }
160
161 pub fn is_trimmed(&self) -> bool {
163 matches!(self, ReadResult::StreamTrimmed { .. })
164 }
165}
166
/// Tails a single Redis stream of CDC events on behalf of one peer.
pub struct StreamTailer {
    // Identifier used for log attribution only.
    peer_id: String,
    // Redis key of the stream being tailed.
    stream_key: String,
    // Maximum time a blocking XREAD waits for new entries.
    block_timeout: Duration,
    // Maximum number of entries fetched per read.
    batch_size: usize,
}
178
impl StreamTailer {
    /// Creates a new tailer.
    ///
    /// * `peer_id` — identifier used for log attribution.
    /// * `stream_key` — Redis key of the stream to tail.
    /// * `block_timeout` — how long a blocking XREAD waits for new entries.
    /// * `batch_size` — maximum entries fetched per read.
    pub fn new(
        peer_id: String,
        stream_key: String,
        block_timeout: Duration,
        batch_size: usize,
    ) -> Self {
        Self {
            peer_id,
            stream_key,
            block_timeout,
            batch_size,
        }
    }

    /// Current maximum number of entries fetched per read.
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }

    /// Overrides the per-read batch size (e.g. for adaptive catchup).
    pub fn set_batch_size(&mut self, size: usize) {
        self.batch_size = size;
    }

    /// Returns the ID of the oldest entry still present in the stream, or
    /// `None` when the stream is empty or does not exist.
    pub async fn get_oldest_id(&self, conn: &mut ConnectionManager) -> Result<Option<String>> {
        // XRANGE <key> - + COUNT 1 fetches at most the single oldest entry.
        let result: Vec<(String, HashMap<String, redis::Value>)> = redis::cmd("XRANGE")
            .arg(&self.stream_key)
            .arg("-")
            .arg("+")
            .arg("COUNT")
            .arg(1)
            .query_async(conn)
            .await
            .map_err(|e| ReplicationError::redis("XRANGE", e))?;

        Ok(result.first().map(|(id, _)| id.clone()))
    }

    /// Returns the ID of the newest entry in the stream, or `None` when
    /// the stream is empty or does not exist.
    pub async fn get_latest_id(&self, conn: &mut ConnectionManager) -> Result<Option<String>> {
        // XREVRANGE <key> + - COUNT 1 fetches at most the single newest entry.
        let result: Vec<(String, HashMap<String, redis::Value>)> = redis::cmd("XREVRANGE")
            .arg(&self.stream_key)
            .arg("+")
            .arg("-")
            .arg("COUNT")
            .arg(1)
            .query_async(conn)
            .await
            .map_err(|e| ReplicationError::redis("XREVRANGE", e))?;

        Ok(result.first().map(|(id, _)| id.clone()))
    }

    /// Number of entries currently in the stream (XLEN).
    pub async fn get_stream_length(&self, conn: &mut ConnectionManager) -> Result<u64> {
        let len: u64 = redis::cmd("XLEN")
            .arg(&self.stream_key)
            .query_async(conn)
            .await
            .map_err(|e| ReplicationError::redis("XLEN", e))?;
        Ok(len)
    }

    /// Checks whether `cursor` still points inside the stream's retained
    /// window.
    ///
    /// Returns `Ok(Some(oldest_id))` when the stream has been trimmed past
    /// the cursor (cursor < oldest retained ID, so entries may have been
    /// lost), and `Ok(None)` when the cursor is still valid, is the
    /// initial "0" cursor, or the stream is empty.
    pub async fn check_cursor_valid(
        &self,
        conn: &mut ConnectionManager,
        cursor: &str,
    ) -> Result<Option<String>> {
        // "0" means "start from the beginning": nothing can have been
        // trimmed past it from the caller's point of view.
        if cursor == "0" {
            return Ok(None);
        }

        let oldest = self.get_oldest_id(conn).await?;

        match oldest {
            // Empty stream: nothing retained, nothing to compare against.
            None => Ok(None),
            Some(oldest_id) => {
                if compare_stream_ids(cursor, &oldest_id) == std::cmp::Ordering::Less {
                    // Cursor is older than anything retained — trimmed.
                    Ok(Some(oldest_id))
                } else {
                    Ok(None)
                }
            }
        }
    }

    /// Reads events after validating the cursor first.
    ///
    /// Returns [`ReadResult::StreamTrimmed`] (with a warning logged) when
    /// the stream was trimmed past `cursor`, [`ReadResult::Empty`] when
    /// starting from "0" on an empty stream, and [`ReadResult::Events`]
    /// otherwise.
    pub async fn read_events_checked(
        &self,
        conn: &mut ConnectionManager,
        cursor: &str,
    ) -> Result<ReadResult> {
        if let Some(oldest_id) = self.check_cursor_valid(conn, cursor).await? {
            warn!(
                peer_id = %self.peer_id,
                cursor = %cursor,
                oldest_id = %oldest_id,
                "Stream was trimmed past our cursor - potential data gap!"
            );
            return Ok(ReadResult::StreamTrimmed {
                cursor: cursor.to_string(),
                oldest_id,
            });
        }

        // Fresh cursor on an empty stream: report Empty instead of
        // blocking on XREAD for nothing.
        if cursor == "0" && self.get_oldest_id(conn).await?.is_none() {
            return Ok(ReadResult::Empty);
        }

        let events = self.read_events(conn, cursor).await?;
        Ok(ReadResult::Events(events))
    }

    /// Blocking tail read: XREAD up to `batch_size` entries strictly after
    /// `cursor`, waiting at most `block_timeout` for new data.
    ///
    /// Entries that fail to parse are logged and skipped rather than
    /// aborting the whole batch.
    pub async fn read_events(
        &self,
        conn: &mut ConnectionManager,
        cursor: &str,
    ) -> Result<Vec<CdcEvent>> {
        // NOTE(review): as_millis() returns u128; the usize cast is fine
        // for any sane timeout on 64-bit targets but would truncate huge
        // Durations on 32-bit ones.
        let opts = StreamReadOptions::default()
            .block(self.block_timeout.as_millis() as usize)
            .count(self.batch_size);

        let reply: StreamReadReply = conn
            .xread_options(&[&self.stream_key], &[cursor], &opts)
            .await
            .map_err(|e| ReplicationError::redis("XREAD", e))?;

        let mut events = Vec::new();

        // We only ask for one stream, but XREAD replies are grouped per
        // stream key, so iterate the general shape.
        for stream_key in reply.keys {
            for entry in stream_key.ids {
                match self.parse_entry(&entry.id, &entry.map) {
                    Ok(event) => {
                        trace!(
                            peer_id = %self.peer_id,
                            stream_id = %event.stream_id,
                            op = ?event.op,
                            key = %event.key,
                            "Parsed CDC event"
                        );
                        events.push(event);
                    }
                    // Best-effort: a malformed entry must not stall the tail.
                    Err(e) => {
                        warn!(
                            peer_id = %self.peer_id,
                            stream_id = %entry.id,
                            error = %e,
                            "Failed to parse stream entry, skipping"
                        );
                    }
                }
            }
        }

        if !events.is_empty() {
            trace!(
                peer_id = %self.peer_id,
                count = events.len(),
                first_id = %events.first().map(|e| e.stream_id.as_str()).unwrap_or(""),
                last_id = %events.last().map(|e| e.stream_id.as_str()).unwrap_or(""),
                "Read CDC events"
            );
        }

        Ok(events)
    }

    /// Non-blocking catchup read: XRANGE of up to `count` entries strictly
    /// after `start` ("0" means from the very beginning).
    ///
    /// Parsing is offloaded to blocking threads for large batches (see
    /// `parse_entries_parallel`).
    pub async fn read_events_range(
        &self,
        conn: &mut ConnectionManager,
        start: &str,
        count: usize,
    ) -> Result<Vec<CdcEvent>> {
        // "(" prefixes an exclusive range start (Redis 6.2+ XRANGE
        // syntax); "-" is the inclusive minimum when starting fresh.
        let exclusive_start = if start == "0" {
            "-".to_string()
        } else {
            format!("({}", start)
        };

        let result: Vec<(String, HashMap<String, redis::Value>)> = redis::cmd("XRANGE")
            .arg(&self.stream_key)
            .arg(&exclusive_start)
            .arg("+")
            .arg("COUNT")
            .arg(count)
            .query_async(conn)
            .await
            .map_err(|e| ReplicationError::redis("XRANGE", e))?;

        let events = self.parse_entries_parallel(result).await;

        if !events.is_empty() {
            trace!(
                peer_id = %self.peer_id,
                count = events.len(),
                first_id = %events.first().map(|e| e.stream_id.as_str()).unwrap_or(""),
                last_id = %events.last().map(|e| e.stream_id.as_str()).unwrap_or(""),
                "Read CDC events via XRANGE (catchup mode)"
            );
        }

        Ok(events)
    }

    /// Parses one raw stream entry into a [`CdcEvent`].
    ///
    /// Expects `op` and `key` fields; PUTs additionally carry `data`
    /// (possibly zstd-compressed) and optionally `hash` (hex SHA-256 of
    /// the decompressed payload, verified here) and `meta` (JSON).
    ///
    /// # Errors
    /// Returns `ReplicationError::StreamParse` on missing/invalid fields
    /// or a content-hash mismatch.
    fn parse_entry(
        &self,
        stream_id: &str,
        fields: &HashMap<String, redis::Value>,
    ) -> Result<CdcEvent> {
        let op_str = get_string_field(fields, "op")?;
        let op = CdcOp::from_str(&op_str).ok_or_else(|| {
            ReplicationError::StreamParse(format!("Unknown op type: {}", op_str))
        })?;

        let key = get_string_field(fields, "key")?;

        // Deletes carry no payload, hash, or metadata.
        if op == CdcOp::Delete {
            return Ok(CdcEvent {
                stream_id: stream_id.to_string(),
                op,
                key,
                hash: None,
                data: None,
                meta: None,
            });
        }

        // `hash` is optional; absence simply skips verification below.
        let hash = get_string_field(fields, "hash").ok();

        let raw_data = get_bytes_field(fields, "data")?;
        let data = maybe_decompress(&raw_data)?;

        // Verify payload integrity against the producer-supplied hash.
        if let Some(ref expected_hash) = hash {
            let computed = compute_content_hash(&data);
            if &computed != expected_hash {
                return Err(ReplicationError::StreamParse(format!(
                    "Content hash mismatch for key '{}': expected {}, got {}",
                    key, expected_hash, computed
                )));
            }
        }

        // Malformed meta JSON is tolerated and treated as absent.
        let meta = if let Ok(meta_str) = get_string_field(fields, "meta") {
            serde_json::from_str(&meta_str).ok()
        } else {
            None
        };

        Ok(CdcEvent {
            stream_id: stream_id.to_string(),
            op,
            key,
            hash,
            data: Some(data),
            meta,
        })
    }

    /// Parses a batch of raw entries, spawning one blocking task per entry
    /// for batches larger than 2 so decompression and hash verification
    /// don't stall the async runtime. Failed entries are logged and
    /// skipped.
    ///
    /// NOTE(review): the closure below re-implements `parse_entry` because
    /// the fields are moved into the blocking task; keep the two in sync.
    async fn parse_entries_parallel(
        &self,
        entries: Vec<(String, HashMap<String, redis::Value>)>,
    ) -> Vec<CdcEvent> {
        if entries.is_empty() {
            return Vec::new();
        }

        // Tiny batches: thread hand-off costs more than inline parsing.
        if entries.len() <= 2 {
            return entries
                .into_iter()
                .filter_map(|(id, fields)| {
                    match self.parse_entry(&id, &fields) {
                        Ok(event) => Some(event),
                        Err(e) => {
                            warn!(
                                peer_id = %self.peer_id,
                                stream_id = %id,
                                error = %e,
                                "Failed to parse stream entry, skipping"
                            );
                            None
                        }
                    }
                })
                .collect();
        }

        // Clone for use after `self` would otherwise be borrowed across
        // the spawned tasks.
        let peer_id = self.peer_id.clone();
        let futures: Vec<_> = entries
            .into_iter()
            .map(|(stream_id, fields)| {
                tokio::task::spawn_blocking(move || {
                    let op_str = match get_string_field(&fields, "op") {
                        Ok(s) => s,
                        Err(e) => return Err((stream_id, e)),
                    };
                    let op = match CdcOp::from_str(&op_str) {
                        Some(op) => op,
                        None => {
                            return Err((
                                stream_id,
                                ReplicationError::StreamParse(format!("Unknown op: {}", op_str)),
                            ))
                        }
                    };

                    let key = match get_string_field(&fields, "key") {
                        Ok(k) => k,
                        Err(e) => return Err((stream_id, e)),
                    };

                    // Deletes carry no payload, hash, or metadata.
                    if op == CdcOp::Delete {
                        return Ok(CdcEvent {
                            stream_id,
                            op,
                            key,
                            hash: None,
                            data: None,
                            meta: None,
                        });
                    }

                    let hash = get_string_field(&fields, "hash").ok();
                    let raw_data = match get_bytes_field(&fields, "data") {
                        Ok(d) => d,
                        Err(e) => return Err((stream_id, e)),
                    };

                    let data = match maybe_decompress(&raw_data) {
                        Ok(d) => d,
                        Err(e) => return Err((stream_id, e)),
                    };

                    // Verify payload integrity against the supplied hash.
                    if let Some(ref expected) = hash {
                        let computed = compute_content_hash(&data);
                        if &computed != expected {
                            return Err((
                                stream_id.clone(),
                                ReplicationError::StreamParse(format!(
                                    "Hash mismatch for key '{}': expected {}, got {}",
                                    key, expected, computed
                                )),
                            ));
                        }
                    }

                    // Malformed meta JSON is tolerated and treated as absent.
                    let meta = if let Ok(meta_str) = get_string_field(&fields, "meta") {
                        serde_json::from_str(&meta_str).ok()
                    } else {
                        None
                    };

                    Ok(CdcEvent {
                        stream_id,
                        op,
                        key,
                        hash,
                        data: Some(data),
                        meta,
                    })
                })
            })
            .collect();

        let results = join_all(futures).await;
        let mut events = Vec::with_capacity(results.len());

        for result in results {
            match result {
                Ok(Ok(event)) => {
                    trace!(
                        peer_id = %peer_id,
                        stream_id = %event.stream_id,
                        op = ?event.op,
                        key = %event.key,
                        "Parsed CDC event (parallel)"
                    );
                    events.push(event);
                }
                Ok(Err((stream_id, e))) => {
                    warn!(
                        peer_id = %peer_id,
                        stream_id = %stream_id,
                        error = %e,
                        "Failed to parse stream entry, skipping"
                    );
                }
                // Outer Err is a JoinError: the blocking task panicked.
                Err(e) => {
                    warn!(
                        peer_id = %peer_id,
                        error = %e,
                        "Spawn blocking task panicked"
                    );
                }
            }
        }

        events
    }
}
650
651fn get_string_field(fields: &HashMap<String, redis::Value>, name: &str) -> Result<String> {
653 let value = fields
654 .get(name)
655 .ok_or_else(|| ReplicationError::StreamParse(format!("Missing field: {}", name)))?;
656
657 match value {
658 redis::Value::BulkString(bytes) => String::from_utf8(bytes.clone())
659 .map_err(|e| ReplicationError::StreamParse(format!("Invalid UTF-8 in {}: {}", name, e))),
660 redis::Value::SimpleString(s) => Ok(s.clone()),
661 _ => Err(ReplicationError::StreamParse(format!(
662 "Unexpected type for field {}: {:?}",
663 name, value
664 ))),
665 }
666}
667
668fn get_bytes_field(fields: &HashMap<String, redis::Value>, name: &str) -> Result<Vec<u8>> {
670 let value = fields
671 .get(name)
672 .ok_or_else(|| ReplicationError::StreamParse(format!("Missing field: {}", name)))?;
673
674 match value {
675 redis::Value::BulkString(bytes) => Ok(bytes.clone()),
676 redis::Value::SimpleString(s) => Ok(s.as_bytes().to_vec()),
677 _ => Err(ReplicationError::StreamParse(format!(
678 "Unexpected type for field {}: {:?}",
679 name, value
680 ))),
681 }
682}
683
684pub fn maybe_decompress(data: &[u8]) -> Result<Vec<u8>> {
686 if data.len() >= 4 && data[..4] == ZSTD_MAGIC {
687 let mut decoder = zstd::Decoder::new(data)
688 .map_err(|e| ReplicationError::Decompression(format!("zstd init: {}", e)))?;
689 let mut decompressed = Vec::new();
690 decoder
691 .read_to_end(&mut decompressed)
692 .map_err(|e| ReplicationError::Decompression(format!("zstd decode: {}", e)))?;
693 Ok(decompressed)
694 } else {
695 Ok(data.to_vec())
696 }
697}
698
699fn compute_content_hash(data: &[u8]) -> String {
701 use sha2::{Digest, Sha256};
702 let hash = Sha256::digest(data);
703 hex::encode(hash)
704}
705
/// Compares two Redis stream IDs ("<ms>-<seq>", or a bare "<ms>") by
/// timestamp first, then sequence number.
///
/// Unparseable components are treated as 0, so "0", "abc", and otherwise
/// malformed IDs sort before (or equal to) any real ID.
pub fn compare_stream_ids(a: &str, b: &str) -> std::cmp::Ordering {
    // Parses "<ms>-<seq>" into (ms, seq); a bare "<ms>" gets seq 0.
    // split_once avoids the per-call Vec allocation of collect().
    fn parse(s: &str) -> (u64, u64) {
        match s.split_once('-') {
            Some((ts, seq)) if !seq.contains('-') => {
                (ts.parse().unwrap_or(0), seq.parse().unwrap_or(0))
            }
            // More than one '-': malformed, collapses to (0, 0).
            Some(_) => (0, 0),
            // No '-': bare timestamp form.
            None => (s.parse().unwrap_or(0), 0),
        }
    }

    let (a_ts, a_seq) = parse(a);
    let (b_ts, b_seq) = parse(b);

    // Tuple comparison is lexicographic: timestamp, then sequence.
    (a_ts, a_seq).cmp(&(b_ts, b_seq))
}
736
/// Extracts the millisecond-timestamp component from a well-formed Redis
/// stream ID ("<ms>-<seq>").
///
/// Returns `None` for IDs without exactly two '-'-separated parts (e.g.
/// "0", "invalid", "1-2-3") or whose timestamp part is not a number.
pub fn parse_stream_id_timestamp(stream_id: &str) -> Option<u64> {
    // split_once avoids allocating a Vec for the parts.
    let (ts, seq) = stream_id.split_once('-')?;
    if seq.contains('-') {
        // More than one '-' means a malformed ID.
        return None;
    }
    ts.parse().ok()
}

/// Approximate replication lag in milliseconds between a consumer cursor
/// and the stream's latest ID, derived from the IDs' timestamp parts.
///
/// Returns `None` when either ID lacks a parseable timestamp; saturates
/// to 0 when the cursor is at or ahead of `latest`.
pub fn calculate_lag_ms(cursor: &str, latest: &str) -> Option<u64> {
    let cursor_ts = parse_stream_id_timestamp(cursor)?;
    let latest_ts = parse_stream_id_timestamp(latest)?;
    Some(latest_ts.saturating_sub(cursor_ts))
}
759
#[cfg(test)]
mod tests {
    //! Unit tests for op parsing, event trace helpers, zstd detection,
    //! stream-ID arithmetic, field extraction, and metadata round-trips.
    //! No Redis connection is required — only pure helpers are exercised.
    use super::*;

    #[test]
    fn test_cdc_op_from_str() {
        assert_eq!(CdcOp::from_str("PUT"), Some(CdcOp::Put));
        assert_eq!(CdcOp::from_str("put"), Some(CdcOp::Put));
        assert_eq!(CdcOp::from_str("DEL"), Some(CdcOp::Delete));
        assert_eq!(CdcOp::from_str("DELETE"), Some(CdcOp::Delete));
        assert_eq!(CdcOp::from_str("delete"), Some(CdcOp::Delete));
        assert_eq!(CdcOp::from_str("UNKNOWN"), None);
        assert_eq!(CdcOp::from_str(""), None);
    }

    #[test]
    fn test_cdc_event_is_put() {
        let event = CdcEvent {
            stream_id: "1-0".to_string(),
            op: CdcOp::Put,
            key: "test".to_string(),
            hash: None,
            data: None,
            meta: None,
        };
        assert!(event.is_put());
        assert!(!event.is_delete());
    }

    #[test]
    fn test_cdc_event_is_delete() {
        let event = CdcEvent {
            stream_id: "1-0".to_string(),
            op: CdcOp::Delete,
            key: "test".to_string(),
            hash: None,
            data: None,
            meta: None,
        };
        assert!(event.is_delete());
        assert!(!event.is_put());
    }

    #[test]
    fn test_cdc_event_trace_parent() {
        // No metadata at all -> all trace accessors are None.
        let event = CdcEvent {
            stream_id: "1-0".to_string(),
            op: CdcOp::Put,
            key: "test".to_string(),
            hash: None,
            data: None,
            meta: None,
        };
        assert_eq!(event.trace_parent(), None);
        assert_eq!(event.trace_id(), None);
        assert_eq!(event.parent_span_id(), None);

        // Metadata present but no traceparent.
        let event = CdcEvent {
            stream_id: "1-0".to_string(),
            op: CdcOp::Put,
            key: "test".to_string(),
            hash: None,
            data: None,
            meta: Some(CdcMeta {
                content_type: "application/json".to_string(),
                version: 1,
                updated_at: 12345,
                trace_parent: None,
            }),
        };
        assert_eq!(event.trace_parent(), None);

        // Well-formed W3C traceparent: version-traceid-spanid-flags.
        let event = CdcEvent {
            stream_id: "1-0".to_string(),
            op: CdcOp::Put,
            key: "test".to_string(),
            hash: None,
            data: None,
            meta: Some(CdcMeta {
                content_type: "application/json".to_string(),
                version: 1,
                updated_at: 12345,
                trace_parent: Some("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".to_string()),
            }),
        };
        assert_eq!(event.trace_parent(), Some("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"));
        assert_eq!(event.trace_id(), Some("0af7651916cd43dd8448eb211c80319c"));
        assert_eq!(event.parent_span_id(), Some("b7ad6b7169203331"));
    }

    #[test]
    fn test_cdc_event_trace_parent_malformed() {
        // A single-segment traceparent has no trace/span fields.
        let event = CdcEvent {
            stream_id: "1-0".to_string(),
            op: CdcOp::Put,
            key: "test".to_string(),
            hash: None,
            data: None,
            meta: Some(CdcMeta {
                content_type: "application/json".to_string(),
                version: 1,
                updated_at: 12345,
                trace_parent: Some("invalid".to_string()),
            }),
        };
        assert_eq!(event.trace_parent(), Some("invalid"));
        assert_eq!(event.trace_id(), None);
        assert_eq!(event.parent_span_id(), None);

        // Two segments: trace id is extractable, span id is not.
        let event = CdcEvent {
            stream_id: "1-0".to_string(),
            op: CdcOp::Put,
            key: "test".to_string(),
            hash: None,
            data: None,
            meta: Some(CdcMeta {
                content_type: "application/json".to_string(),
                version: 1,
                updated_at: 12345,
                trace_parent: Some("00-traceid".to_string()),
            }),
        };
        assert_eq!(event.trace_id(), Some("traceid"));
        assert_eq!(event.parent_span_id(), None);
    }

    #[test]
    fn test_maybe_decompress_uncompressed() {
        let data = b"hello world";
        let result = maybe_decompress(data).unwrap();
        assert_eq!(result, data);
    }

    #[test]
    fn test_maybe_decompress_zstd() {
        let original = b"hello world hello world hello world";
        let compressed = zstd::encode_all(&original[..], 3).unwrap();

        // Sanity: real zstd output starts with the magic we detect on.
        assert_eq!(&compressed[..4], &ZSTD_MAGIC);

        let result = maybe_decompress(&compressed).unwrap();
        assert_eq!(result, original);
    }

    #[test]
    fn test_maybe_decompress_short_data() {
        // Inputs shorter than the 4-byte magic pass through untouched.
        let data = b"ab";
        let result = maybe_decompress(data).unwrap();
        assert_eq!(result, data);

        let data = b"";
        let result = maybe_decompress(data).unwrap();
        assert_eq!(result, data);
    }

    #[test]
    fn test_maybe_decompress_fake_magic() {
        // Correct magic but garbage body must error, not pass through.
        let mut data = ZSTD_MAGIC.to_vec();
        data.extend_from_slice(b"not valid zstd data");

        let result = maybe_decompress(&data);
        assert!(result.is_err());
    }

    #[test]
    fn test_compare_stream_ids() {
        use std::cmp::Ordering;

        // Same timestamp: sequence number decides.
        assert_eq!(
            compare_stream_ids("1234567890123-0", "1234567890123-1"),
            Ordering::Less
        );
        assert_eq!(
            compare_stream_ids("1234567890123-1", "1234567890123-0"),
            Ordering::Greater
        );
        assert_eq!(
            compare_stream_ids("1234567890123-0", "1234567890123-0"),
            Ordering::Equal
        );

        // Different timestamps: timestamp decides.
        assert_eq!(
            compare_stream_ids("1234567890000-0", "1234567890123-0"),
            Ordering::Less
        );
        assert_eq!(
            compare_stream_ids("1234567890123-0", "1234567890000-0"),
            Ordering::Greater
        );

        // Bare "0" cursor sorts before any real ID.
        assert_eq!(
            compare_stream_ids("0", "1234567890123-0"),
            Ordering::Less
        );

        assert_eq!(compare_stream_ids("0", "0"), Ordering::Equal);
    }

    #[test]
    fn test_compare_stream_ids_high_sequence() {
        use std::cmp::Ordering;

        // Sequence must compare numerically, not lexically.
        assert_eq!(
            compare_stream_ids("1000-999999", "1000-1000000"),
            Ordering::Less
        );
    }

    #[test]
    fn test_read_result_methods() {
        let events = vec![CdcEvent {
            stream_id: "1-0".to_string(),
            op: CdcOp::Put,
            key: "test".to_string(),
            hash: None,
            data: None,
            meta: None,
        }];
        let result = ReadResult::Events(events);
        assert!(!result.is_trimmed());
        assert_eq!(result.events().len(), 1);

        // Trimmed variant yields no events.
        let result = ReadResult::StreamTrimmed {
            cursor: "1-0".to_string(),
            oldest_id: "100-0".to_string(),
        };
        assert!(result.is_trimmed());
        assert_eq!(result.events().len(), 0);

        let result = ReadResult::Empty;
        assert!(!result.is_trimmed());
        assert_eq!(result.events().len(), 0);
    }

    #[test]
    fn test_read_result_events_empty() {
        let result = ReadResult::Events(vec![]);
        assert!(!result.is_trimmed());
        assert_eq!(result.events().len(), 0);
    }

    #[test]
    fn test_compute_content_hash() {
        // Known SHA-256 of "hello world".
        let data = b"hello world";
        let hash = compute_content_hash(data);
        assert_eq!(
            hash,
            "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
        );
    }

    #[test]
    fn test_compute_content_hash_empty() {
        // Known SHA-256 of the empty input.
        let data = b"";
        let hash = compute_content_hash(data);
        assert_eq!(
            hash,
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
        );
    }

    #[test]
    fn test_content_hash_validation_matches() {
        use sha2::{Digest, Sha256};

        // Our helper must agree with a direct sha2 + hex computation.
        let data = b"test data for hashing";
        let expected_hash = hex::encode(Sha256::digest(data));
        let computed = compute_content_hash(data);
        assert_eq!(computed, expected_hash);
    }

    #[test]
    fn test_parse_stream_id_timestamp() {
        assert_eq!(
            parse_stream_id_timestamp("1234567890123-0"),
            Some(1234567890123)
        );
        assert_eq!(
            parse_stream_id_timestamp("1234567890123-99"),
            Some(1234567890123)
        );

        // Bare "0" has no '-'-separated parts.
        assert_eq!(parse_stream_id_timestamp("0"), None);

        assert_eq!(parse_stream_id_timestamp("invalid"), None);
        assert_eq!(parse_stream_id_timestamp(""), None);
        assert_eq!(parse_stream_id_timestamp("abc-def"), None);
    }

    #[test]
    fn test_calculate_lag_ms() {
        assert_eq!(
            calculate_lag_ms("1000000000000-0", "1000000005000-0"),
            Some(5000)
        );

        // Same timestamp, different sequence: zero lag.
        assert_eq!(
            calculate_lag_ms("1000000000000-0", "1000000000000-5"),
            Some(0)
        );

        // Cursor ahead of latest saturates to zero, never underflows.
        assert_eq!(
            calculate_lag_ms("1000000005000-0", "1000000000000-0"),
            Some(0)
        );

        // Unparseable IDs on either side give None.
        assert_eq!(calculate_lag_ms("0", "1000000000000-0"), None);

        assert_eq!(calculate_lag_ms("1000000000000-0", "invalid"), None);

        assert_eq!(calculate_lag_ms("abc", "def"), None);
    }

    #[test]
    fn test_get_string_field() {
        let mut fields = HashMap::new();
        fields.insert("key1".to_string(), redis::Value::BulkString(b"value1".to_vec()));
        fields.insert("key2".to_string(), redis::Value::SimpleString("value2".to_string()));
        fields.insert("key3".to_string(), redis::Value::Int(42));

        assert_eq!(get_string_field(&fields, "key1").unwrap(), "value1");

        assert_eq!(get_string_field(&fields, "key2").unwrap(), "value2");

        assert!(get_string_field(&fields, "missing").is_err());

        // Non-string Redis value types are rejected.
        assert!(get_string_field(&fields, "key3").is_err());
    }

    #[test]
    fn test_get_string_field_invalid_utf8() {
        let mut fields = HashMap::new();
        // 0xFF 0xFE is not valid UTF-8.
        fields.insert("bad".to_string(), redis::Value::BulkString(vec![0xFF, 0xFE]));

        let result = get_string_field(&fields, "bad");
        assert!(result.is_err());
    }

    #[test]
    fn test_get_bytes_field() {
        let mut fields = HashMap::new();
        fields.insert("data".to_string(), redis::Value::BulkString(vec![1, 2, 3, 4]));
        fields.insert("text".to_string(), redis::Value::SimpleString("hello".to_string()));
        fields.insert("num".to_string(), redis::Value::Int(42));

        assert_eq!(get_bytes_field(&fields, "data").unwrap(), vec![1, 2, 3, 4]);

        assert_eq!(get_bytes_field(&fields, "text").unwrap(), b"hello".to_vec());

        assert!(get_bytes_field(&fields, "missing").is_err());

        // Non-string Redis value types are rejected.
        assert!(get_bytes_field(&fields, "num").is_err());
    }

    #[test]
    fn test_stream_tailer_new() {
        let tailer = StreamTailer::new(
            "peer-1".to_string(),
            "cdc".to_string(),
            Duration::from_secs(5),
            100,
        );
        assert_eq!(tailer.batch_size(), 100);
    }

    #[test]
    fn test_stream_tailer_set_batch_size() {
        let mut tailer = StreamTailer::new(
            "peer-1".to_string(),
            "cdc".to_string(),
            Duration::from_secs(5),
            100,
        );
        assert_eq!(tailer.batch_size(), 100);

        tailer.set_batch_size(500);
        assert_eq!(tailer.batch_size(), 500);

        // Zero is accepted as-is; the setter does not clamp.
        tailer.set_batch_size(0);
        assert_eq!(tailer.batch_size(), 0);
    }

    #[test]
    fn test_cdc_meta_serialization() {
        let meta = CdcMeta {
            content_type: "application/json".to_string(),
            version: 42,
            updated_at: 1234567890123,
            trace_parent: Some("00-traceid-spanid-01".to_string()),
        };

        let json = serde_json::to_string(&meta).unwrap();
        assert!(json.contains("application/json"));
        assert!(json.contains("42"));
        assert!(json.contains("1234567890123"));
        assert!(json.contains("traceid"));

        // Round-trip preserves every field.
        let parsed: CdcMeta = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.content_type, "application/json");
        assert_eq!(parsed.version, 42);
        assert_eq!(parsed.updated_at, 1234567890123);
        assert_eq!(parsed.trace_parent, Some("00-traceid-spanid-01".to_string()));
    }

    #[test]
    fn test_cdc_meta_serialization_no_trace() {
        let meta = CdcMeta {
            content_type: "text/plain".to_string(),
            version: 1,
            updated_at: 0,
            trace_parent: None,
        };

        // skip_serializing_if must drop the absent traceparent entirely.
        let json = serde_json::to_string(&meta).unwrap();
        assert!(!json.contains("trace_parent"));
    }
}
1213}