replication_engine/stream.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Redis Stream consumer for CDC events.
//!
//! Tails a peer's `cdc` stream and parses events.
//!
//! # Stream Trimming
//!
//! Redis streams can be trimmed via `MAXLEN` or `MINID` to bound memory.
//! If our cursor points to an entry older than the oldest in the stream,
//! we've "fallen behind" and missed events. This module detects this and
//! returns a `StreamTrimmed` result so callers can:
//! - Log a warning (potential data gap)
//! - Reset cursor to oldest available entry
//! - Emit a metric for alerting
//!
//! # Content Hash Validation
//!
//! When a PUT event includes a `hash` field, we verify that the SHA256
//! of the decompressed data matches. This detects corruption from:
//! - Network bit flips
//! - Compression bugs
//! - Malicious peers
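//!
//! # Example
//!
//! A minimal consumption loop, as a sketch: obtaining `conn` (a
//! `ConnectionManager`) and persisting the cursor are assumed to happen
//! elsewhere and are not part of this module.
//!
//! ```ignore
//! let tailer = StreamTailer::new(
//!     "peer-1".into(),
//!     "cdc".into(),
//!     std::time::Duration::from_secs(5),
//!     100,
//! );
//! let mut cursor = "0".to_string();
//! loop {
//!     match tailer.read_events_checked(&mut conn, &cursor).await? {
//!         ReadResult::Events(events) => {
//!             for event in &events {
//!                 // apply the event to local state...
//!             }
//!             if let Some(last) = events.last() {
//!                 cursor = last.stream_id.clone();
//!             }
//!         }
//!         ReadResult::StreamTrimmed { oldest_id, .. } => {
//!             // Data gap: log, emit a metric, resume from the oldest entry.
//!             cursor = oldest_id;
//!         }
//!         ReadResult::Empty => { /* stream has no entries yet */ }
//!     }
//! }
//! ```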

use crate::error::{ReplicationError, Result};
use futures::future::join_all;
use redis::aio::ConnectionManager;
use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::Read;
use std::time::Duration;
use tracing::{trace, warn};

/// zstd magic bytes for decompression detection
const ZSTD_MAGIC: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];

/// CDC operation type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOp {
    Put,
    Delete,
}

impl CdcOp {
    fn from_str(s: &str) -> Option<Self> {
        match s.to_uppercase().as_str() {
            "PUT" => Some(CdcOp::Put),
            "DEL" | "DELETE" => Some(CdcOp::Delete),
            _ => None,
        }
    }
}

/// Metadata from a CDC PUT event.
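///
/// Serialized as JSON in the stream's `meta` field. With the serde derives
/// below, the wire shape looks like this (values are illustrative):
///
/// ```text
/// {"content_type":"application/json","version":3,"updated_at":1700000000000,
///  "trace_parent":"00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"}
/// ```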
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcMeta {
    pub content_type: String,
    pub version: u64,
    pub updated_at: i64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_parent: Option<String>,
}

/// A parsed CDC event from the stream.
#[derive(Debug, Clone)]
pub struct CdcEvent {
    /// Stream entry ID (e.g., "1234567890123-0")
    pub stream_id: String,
    /// Operation type
    pub op: CdcOp,
    /// Object ID (key)
    pub key: String,
    /// Content hash for deduplication (only for PUT)
    pub hash: Option<String>,
    /// Decompressed data payload (only for PUT)
    pub data: Option<Vec<u8>>,
    /// Parsed metadata (only for PUT)
    pub meta: Option<CdcMeta>,
}

impl CdcEvent {
    /// Check if this is a PUT operation.
    pub fn is_put(&self) -> bool {
        self.op == CdcOp::Put
    }

    /// Check if this is a DELETE operation.
    pub fn is_delete(&self) -> bool {
        self.op == CdcOp::Delete
    }

    /// Get the trace parent from metadata, if present.
    ///
    /// Format: W3C Trace Context (e.g., "00-traceid-spanid-flags")
    pub fn trace_parent(&self) -> Option<&str> {
        self.meta.as_ref().and_then(|m| m.trace_parent.as_deref())
    }

    /// Extract trace ID from trace_parent (first 32 hex chars after version).
    pub fn trace_id(&self) -> Option<&str> {
        self.trace_parent().and_then(|tp| {
            let parts: Vec<&str> = tp.split('-').collect();
            if parts.len() >= 2 {
                Some(parts[1])
            } else {
                None
            }
        })
    }

    /// Extract span ID from trace_parent (16 hex chars after trace ID).
    pub fn parent_span_id(&self) -> Option<&str> {
        self.trace_parent().and_then(|tp| {
            let parts: Vec<&str> = tp.split('-').collect();
            if parts.len() >= 3 {
                Some(parts[2])
            } else {
                None
            }
        })
    }
}

/// Result of reading from a stream, accounting for trim scenarios.
#[derive(Debug)]
pub enum ReadResult {
    /// Successfully read events (may be empty if no new events).
    Events(Vec<CdcEvent>),

    /// Stream was trimmed past our cursor.
    ///
    /// This means we've missed events and should:
    /// 1. Log a warning (potential data gap)
    /// 2. Reset cursor to `oldest_id`
    /// 3. Emit a metric for alerting
    ///
    /// The cold path (Merkle repair) will eventually fix any gaps.
    StreamTrimmed {
        /// Our cursor that's now invalid
        cursor: String,
        /// The oldest available entry in the stream
        oldest_id: String,
    },

    /// Stream is empty (no entries at all).
    Empty,
}

impl ReadResult {
    /// Get events if this is a successful read, empty vec otherwise.
    pub fn events(self) -> Vec<CdcEvent> {
        match self {
            ReadResult::Events(events) => events,
            _ => Vec::new(),
        }
    }

    /// Check if the stream was trimmed.
    pub fn is_trimmed(&self) -> bool {
        matches!(self, ReadResult::StreamTrimmed { .. })
    }
}

/// Stream tailer for a single peer's CDC stream.
pub struct StreamTailer {
    /// Peer node ID (for logging)
    peer_id: String,
    /// Stream key to tail
    stream_key: String,
    /// Block timeout for XREAD
    block_timeout: Duration,
    /// Max entries per read
    batch_size: usize,
}

impl StreamTailer {
    /// Create a new stream tailer.
    pub fn new(
        peer_id: String,
        stream_key: String,
        block_timeout: Duration,
        batch_size: usize,
    ) -> Self {
        Self {
            peer_id,
            stream_key,
            block_timeout,
            batch_size,
        }
    }

    /// Get the current batch size.
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }

    /// Update the batch size (for adaptive sizing).
    pub fn set_batch_size(&mut self, size: usize) {
        self.batch_size = size;
    }

    /// Get the oldest entry ID in the stream, if any.
    ///
    /// Returns `None` if the stream is empty or doesn't exist.
    pub async fn get_oldest_id(&self, conn: &mut ConnectionManager) -> Result<Option<String>> {
        // XRANGE key - + COUNT 1 returns the oldest entry
        let result: Vec<(String, HashMap<String, redis::Value>)> = redis::cmd("XRANGE")
            .arg(&self.stream_key)
            .arg("-")
            .arg("+")
            .arg("COUNT")
            .arg(1)
            .query_async(conn)
            .await
            .map_err(|e| ReplicationError::redis("XRANGE", e))?;

        Ok(result.first().map(|(id, _)| id.clone()))
    }

    /// Get the latest (newest) entry ID in the stream, if any.
    ///
    /// Returns `None` if the stream is empty or doesn't exist.
    pub async fn get_latest_id(&self, conn: &mut ConnectionManager) -> Result<Option<String>> {
        // XREVRANGE key + - COUNT 1 returns the newest entry
        let result: Vec<(String, HashMap<String, redis::Value>)> = redis::cmd("XREVRANGE")
            .arg(&self.stream_key)
            .arg("+")
            .arg("-")
            .arg("COUNT")
            .arg(1)
            .query_async(conn)
            .await
            .map_err(|e| ReplicationError::redis("XREVRANGE", e))?;

        Ok(result.first().map(|(id, _)| id.clone()))
    }

    /// Get the total number of entries in the stream.
    ///
    /// Returns 0 if the stream doesn't exist.
    pub async fn get_stream_length(&self, conn: &mut ConnectionManager) -> Result<u64> {
        let len: u64 = redis::cmd("XLEN")
            .arg(&self.stream_key)
            .query_async(conn)
            .await
            .map_err(|e| ReplicationError::redis("XLEN", e))?;
        Ok(len)
    }

    /// Check if a cursor is still valid (not older than the oldest stream entry).
    ///
    /// Returns the oldest ID if the cursor is invalid, `None` if cursor is valid.
    pub async fn check_cursor_valid(
        &self,
        conn: &mut ConnectionManager,
        cursor: &str,
    ) -> Result<Option<String>> {
        // "0" is always valid (start from beginning)
        if cursor == "0" {
            return Ok(None);
        }

        let oldest = self.get_oldest_id(conn).await?;

        match oldest {
            None => Ok(None), // Empty stream, cursor is valid
            Some(oldest_id) => {
                if compare_stream_ids(cursor, &oldest_id) == std::cmp::Ordering::Less {
                    // Cursor is older than oldest entry - stream was trimmed
                    Ok(Some(oldest_id))
                } else {
                    Ok(None) // Cursor is valid
                }
            }
        }
    }

    /// Read events from the stream with trim detection.
    ///
    /// This is the preferred method for production use. It detects when the
    /// stream has been trimmed past our cursor and returns `ReadResult::StreamTrimmed`
    /// so callers can handle the gap appropriately.
    pub async fn read_events_checked(
        &self,
        conn: &mut ConnectionManager,
        cursor: &str,
    ) -> Result<ReadResult> {
        // First check if cursor is still valid
        if let Some(oldest_id) = self.check_cursor_valid(conn, cursor).await? {
            warn!(
                peer_id = %self.peer_id,
                cursor = %cursor,
                oldest_id = %oldest_id,
                "Stream was trimmed past our cursor - potential data gap!"
            );
            return Ok(ReadResult::StreamTrimmed {
                cursor: cursor.to_string(),
                oldest_id,
            });
        }

        // Check if stream is empty
        if cursor == "0" && self.get_oldest_id(conn).await?.is_none() {
            return Ok(ReadResult::Empty);
        }

        // Normal read
        let events = self.read_events(conn, cursor).await?;
        Ok(ReadResult::Events(events))
    }

    /// Read events from the stream starting after `cursor`.
    ///
    /// Returns a vector of parsed events. Empty vector means no new events
    /// (timeout or stream empty).
    ///
    /// The cursor should be the last successfully processed stream ID,
    /// or "0" to start from the beginning.
    pub async fn read_events(
        &self,
        conn: &mut ConnectionManager,
        cursor: &str,
    ) -> Result<Vec<CdcEvent>> {
        let opts = StreamReadOptions::default()
            .block(self.block_timeout.as_millis() as usize)
            .count(self.batch_size);

        // XREAD BLOCK timeout COUNT batch STREAMS key cursor
        let reply: StreamReadReply = conn
            .xread_options(&[&self.stream_key], &[cursor], &opts)
            .await
            .map_err(|e| ReplicationError::redis("XREAD", e))?;

        let mut events = Vec::new();

        for stream_key in reply.keys {
            for entry in stream_key.ids {
                match self.parse_entry(&entry.id, &entry.map) {
                    Ok(event) => {
                        trace!(
                            peer_id = %self.peer_id,
                            stream_id = %event.stream_id,
                            op = ?event.op,
                            key = %event.key,
                            "Parsed CDC event"
                        );
                        events.push(event);
                    }
                    Err(e) => {
                        warn!(
                            peer_id = %self.peer_id,
                            stream_id = %entry.id,
                            error = %e,
                            "Failed to parse stream entry, skipping"
                        );
                        // Continue processing other entries
                    }
                }
            }
        }

        if !events.is_empty() {
            trace!(
                peer_id = %self.peer_id,
                count = events.len(),
                first_id = %events.first().map(|e| e.stream_id.as_str()).unwrap_or(""),
                last_id = %events.last().map(|e| e.stream_id.as_str()).unwrap_or(""),
                "Read CDC events"
            );
        }

        Ok(events)
    }

    /// Read a range of events using XRANGE (non-blocking).
    ///
    /// This is faster than XREAD for catchup scenarios because:
    /// 1. No blocking timeout overhead
    /// 2. Simpler command semantics (a plain range scan with explicit pagination)
    /// 3. Better for bulk reads when we know we're behind
    ///
    /// Use this when catching up, then switch to `read_events` (XREAD) when tailing.
    ///
    /// # Arguments
    /// * `start` - Exclusive start ID (use "0" to start from beginning, or last processed ID)
    /// * `count` - Maximum number of entries to fetch
    ///
    /// # Returns
    /// Vector of parsed events. Empty means we've caught up.
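    ///
    /// # Example
    ///
    /// A catch-up loop sketch (obtaining `conn` and the `tailer` is assumed):
    ///
    /// ```ignore
    /// let mut cursor = "0".to_string();
    /// loop {
    ///     let events = tailer.read_events_range(&mut conn, &cursor, 1000).await?;
    ///     if events.is_empty() {
    ///         break; // caught up: switch to read_events (XREAD) for tailing
    ///     }
    ///     cursor = events.last().unwrap().stream_id.clone();
    /// }
    /// ```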
    pub async fn read_events_range(
        &self,
        conn: &mut ConnectionManager,
        start: &str,
        count: usize,
    ) -> Result<Vec<CdcEvent>> {
        // XRANGE uses inclusive start, but we want exclusive (after cursor).
        // Use "(" prefix for exclusive range in Redis 6.2+, or increment the sequence.
        let exclusive_start = if start == "0" {
            "-".to_string() // Start from very beginning
        } else {
            // Use exclusive range syntax: (id means "greater than id"
            format!("({}", start)
        };

        let result: Vec<(String, HashMap<String, redis::Value>)> = redis::cmd("XRANGE")
            .arg(&self.stream_key)
            .arg(&exclusive_start)
            .arg("+")
            .arg("COUNT")
            .arg(count)
            .query_async(conn)
            .await
            .map_err(|e| ReplicationError::redis("XRANGE", e))?;

        // Use parallel decompression for catchup batches (typically larger)
        let events = self.parse_entries_parallel(result).await;

        if !events.is_empty() {
            trace!(
                peer_id = %self.peer_id,
                count = events.len(),
                first_id = %events.first().map(|e| e.stream_id.as_str()).unwrap_or(""),
                last_id = %events.last().map(|e| e.stream_id.as_str()).unwrap_or(""),
                "Read CDC events via XRANGE (catchup mode)"
            );
        }

        Ok(events)
    }

    /// Parse a stream entry into a CdcEvent.
    fn parse_entry(
        &self,
        stream_id: &str,
        fields: &HashMap<String, redis::Value>,
    ) -> Result<CdcEvent> {
        // Extract "op" field
        let op_str = get_string_field(fields, "op")?;
        let op = CdcOp::from_str(&op_str).ok_or_else(|| {
            ReplicationError::StreamParse(format!("Unknown op type: {}", op_str))
        })?;

        // Extract "key" field
        let key = get_string_field(fields, "key")?;

        // For DELETE, we're done
        if op == CdcOp::Delete {
            return Ok(CdcEvent {
                stream_id: stream_id.to_string(),
                op,
                key,
                hash: None,
                data: None,
                meta: None,
            });
        }

        // For PUT, extract additional fields
        let hash = get_string_field(fields, "hash").ok();

        // Data is binary, may be compressed
        let raw_data = get_bytes_field(fields, "data")?;
        let data = maybe_decompress(&raw_data)?;

        // Validate content hash if present (detect corruption)
        if let Some(ref expected_hash) = hash {
            let computed = compute_content_hash(&data);
            if &computed != expected_hash {
                return Err(ReplicationError::StreamParse(format!(
                    "Content hash mismatch for key '{}': expected {}, got {}",
                    key, expected_hash, computed
                )));
            }
        }

        // Meta is JSON string
        let meta = if let Ok(meta_str) = get_string_field(fields, "meta") {
            serde_json::from_str(&meta_str).ok()
        } else {
            None
        };

        Ok(CdcEvent {
            stream_id: stream_id.to_string(),
            op,
            key,
            hash,
            data: Some(data),
            meta,
        })
    }

    /// Parse multiple stream entries in parallel.
    ///
    /// Decompression is CPU-bound, so we offload it to blocking tasks.
    /// This significantly improves throughput when processing batches
    /// of compressed events.
    ///
    /// Failed entries are logged and skipped (same as sequential parsing).
    async fn parse_entries_parallel(
        &self,
        entries: Vec<(String, HashMap<String, redis::Value>)>,
    ) -> Vec<CdcEvent> {
        if entries.is_empty() {
            return Vec::new();
        }

        // For small batches, parse sequentially (avoids spawn overhead)
        if entries.len() <= 2 {
            return entries
                .into_iter()
                .filter_map(|(id, fields)| {
                    match self.parse_entry(&id, &fields) {
                        Ok(event) => Some(event),
                        Err(e) => {
                            warn!(
                                peer_id = %self.peer_id,
                                stream_id = %id,
                                error = %e,
                                "Failed to parse stream entry, skipping"
                            );
                            None
                        }
                    }
                })
                .collect();
        }

        // Spawn one blocking task per entry; Tokio's blocking thread pool bounds the actual parallelism.
        let peer_id = self.peer_id.clone();
        let futures: Vec<_> = entries
            .into_iter()
            .map(|(stream_id, fields)| {
                tokio::task::spawn_blocking(move || {
                    // Extract fields (non-blocking)
                    let op_str = match get_string_field(&fields, "op") {
                        Ok(s) => s,
                        Err(e) => return Err((stream_id, e)),
                    };
                    let op = match CdcOp::from_str(&op_str) {
                        Some(op) => op,
                        None => {
                            return Err((
                                stream_id,
                                ReplicationError::StreamParse(format!("Unknown op: {}", op_str)),
                            ))
                        }
                    };

                    let key = match get_string_field(&fields, "key") {
                        Ok(k) => k,
                        Err(e) => return Err((stream_id, e)),
                    };

                    if op == CdcOp::Delete {
                        return Ok(CdcEvent {
                            stream_id,
                            op,
                            key,
                            hash: None,
                            data: None,
                            meta: None,
                        });
                    }

                    let hash = get_string_field(&fields, "hash").ok();
                    let raw_data = match get_bytes_field(&fields, "data") {
                        Ok(d) => d,
                        Err(e) => return Err((stream_id, e)),
                    };

                    // This is the CPU-intensive part
                    let data = match maybe_decompress(&raw_data) {
                        Ok(d) => d,
                        Err(e) => return Err((stream_id, e)),
                    };

                    // Validate hash
                    if let Some(ref expected) = hash {
                        let computed = compute_content_hash(&data);
                        if &computed != expected {
                            return Err((
                                stream_id.clone(),
                                ReplicationError::StreamParse(format!(
                                    "Hash mismatch for key '{}': expected {}, got {}",
                                    key, expected, computed
                                )),
                            ));
                        }
                    }

                    let meta = if let Ok(meta_str) = get_string_field(&fields, "meta") {
                        serde_json::from_str(&meta_str).ok()
                    } else {
                        None
                    };

                    Ok(CdcEvent {
                        stream_id,
                        op,
                        key,
                        hash,
                        data: Some(data),
                        meta,
                    })
                })
            })
            .collect();

        // Wait for all tasks, filter successes
        let results = join_all(futures).await;
        let mut events = Vec::with_capacity(results.len());

        for result in results {
            match result {
                Ok(Ok(event)) => {
                    trace!(
                        peer_id = %peer_id,
                        stream_id = %event.stream_id,
                        op = ?event.op,
                        key = %event.key,
                        "Parsed CDC event (parallel)"
                    );
                    events.push(event);
                }
                Ok(Err((stream_id, e))) => {
                    warn!(
                        peer_id = %peer_id,
                        stream_id = %stream_id,
                        error = %e,
                        "Failed to parse stream entry, skipping"
                    );
                }
                Err(e) => {
                    warn!(
                        peer_id = %peer_id,
                        error = %e,
                        "Spawn blocking task panicked"
                    );
                }
            }
        }

        events
    }
}

/// Extract a string field from a Redis hash.
fn get_string_field(fields: &HashMap<String, redis::Value>, name: &str) -> Result<String> {
    let value = fields
        .get(name)
        .ok_or_else(|| ReplicationError::StreamParse(format!("Missing field: {}", name)))?;

    match value {
        redis::Value::BulkString(bytes) => String::from_utf8(bytes.clone())
            .map_err(|e| ReplicationError::StreamParse(format!("Invalid UTF-8 in {}: {}", name, e))),
        redis::Value::SimpleString(s) => Ok(s.clone()),
        _ => Err(ReplicationError::StreamParse(format!(
            "Unexpected type for field {}: {:?}",
            name, value
        ))),
    }
}

/// Extract a bytes field from a Redis hash.
fn get_bytes_field(fields: &HashMap<String, redis::Value>, name: &str) -> Result<Vec<u8>> {
    let value = fields
        .get(name)
        .ok_or_else(|| ReplicationError::StreamParse(format!("Missing field: {}", name)))?;

    match value {
        redis::Value::BulkString(bytes) => Ok(bytes.clone()),
        redis::Value::SimpleString(s) => Ok(s.as_bytes().to_vec()),
        _ => Err(ReplicationError::StreamParse(format!(
            "Unexpected type for field {}: {:?}",
            name, value
        ))),
    }
}

/// Decompress zstd data if it has the magic header, otherwise return as-is.
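///
/// A round-trip sketch (mirrors the unit tests below):
///
/// ```ignore
/// // Uncompressed input passes through untouched.
/// assert_eq!(maybe_decompress(b"plain").unwrap(), b"plain");
///
/// // zstd-compressed input (detected via ZSTD_MAGIC) is decoded.
/// let compressed = zstd::encode_all(&b"payload"[..], 3).unwrap();
/// assert_eq!(maybe_decompress(&compressed).unwrap(), b"payload");
/// ```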
pub fn maybe_decompress(data: &[u8]) -> Result<Vec<u8>> {
    if data.len() >= 4 && data[..4] == ZSTD_MAGIC {
        let mut decoder = zstd::Decoder::new(data)
            .map_err(|e| ReplicationError::Decompression(format!("zstd init: {}", e)))?;
        let mut decompressed = Vec::new();
        decoder
            .read_to_end(&mut decompressed)
            .map_err(|e| ReplicationError::Decompression(format!("zstd decode: {}", e)))?;
        Ok(decompressed)
    } else {
        Ok(data.to_vec())
    }
}

/// Compute SHA256 content hash as hex string.
fn compute_content_hash(data: &[u8]) -> String {
    use sha2::{Digest, Sha256};
    let hash = Sha256::digest(data);
    hex::encode(hash)
}

/// Compare two Redis stream IDs.
///
/// Stream IDs are formatted as `{timestamp}-{sequence}` (e.g., "1234567890123-0").
/// This function compares them numerically, not lexicographically.
///
/// Returns `Ordering::Less` if `a < b`, `Equal` if `a == b`, `Greater` if `a > b`.
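///
/// Numeric comparison matters: lexicographically, "9-0" would sort after "10-0".
/// A sketch:
///
/// ```ignore
/// use std::cmp::Ordering;
/// assert_eq!(compare_stream_ids("9-0", "10-0"), Ordering::Less);
/// assert_eq!(compare_stream_ids("1000-2", "1000-10"), Ordering::Less);
/// ```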
pub fn compare_stream_ids(a: &str, b: &str) -> std::cmp::Ordering {
    use std::cmp::Ordering;

    // Parse "timestamp-sequence" format
    let parse = |s: &str| -> (u64, u64) {
        let parts: Vec<&str> = s.split('-').collect();
        if parts.len() == 2 {
            let ts = parts[0].parse().unwrap_or(0);
            let seq = parts[1].parse().unwrap_or(0);
            (ts, seq)
        } else {
            // Handle special cases like "0" (start) or malformed IDs
            (s.parse().unwrap_or(0), 0)
        }
    };

    let (a_ts, a_seq) = parse(a);
    let (b_ts, b_seq) = parse(b);

    match a_ts.cmp(&b_ts) {
        Ordering::Equal => a_seq.cmp(&b_seq),
        other => other,
    }
}

/// Parse the timestamp (milliseconds since epoch) from a stream ID.
///
/// Stream IDs are formatted as `{timestamp_ms}-{sequence}`.
/// Returns `None` for malformed IDs or special cases like "0".
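///
/// ```ignore
/// assert_eq!(parse_stream_id_timestamp("1234567890123-0"), Some(1234567890123));
/// assert_eq!(parse_stream_id_timestamp("0"), None);
/// ```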
pub fn parse_stream_id_timestamp(stream_id: &str) -> Option<u64> {
    let parts: Vec<&str> = stream_id.split('-').collect();
    if parts.len() == 2 {
        parts[0].parse().ok()
    } else {
        None
    }
}

/// Calculate the time lag in milliseconds between two stream IDs.
///
/// Returns `None` if either ID can't be parsed.
/// Returns 0 if cursor is ahead of latest (shouldn't happen normally).
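///
/// ```ignore
/// // Cursor five seconds behind the stream head.
/// assert_eq!(calculate_lag_ms("1000000000000-0", "1000000005000-0"), Some(5000));
/// ```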
pub fn calculate_lag_ms(cursor: &str, latest: &str) -> Option<u64> {
    let cursor_ts = parse_stream_id_timestamp(cursor)?;
    let latest_ts = parse_stream_id_timestamp(latest)?;
    Some(latest_ts.saturating_sub(cursor_ts))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_cdc_op_from_str() {
        assert_eq!(CdcOp::from_str("PUT"), Some(CdcOp::Put));
        assert_eq!(CdcOp::from_str("put"), Some(CdcOp::Put));
        assert_eq!(CdcOp::from_str("DEL"), Some(CdcOp::Delete));
        assert_eq!(CdcOp::from_str("DELETE"), Some(CdcOp::Delete));
        assert_eq!(CdcOp::from_str("delete"), Some(CdcOp::Delete));
        assert_eq!(CdcOp::from_str("UNKNOWN"), None);
        assert_eq!(CdcOp::from_str(""), None);
    }

    #[test]
    fn test_cdc_event_is_put() {
        let event = CdcEvent {
            stream_id: "1-0".to_string(),
            op: CdcOp::Put,
            key: "test".to_string(),
            hash: None,
            data: None,
            meta: None,
        };
        assert!(event.is_put());
        assert!(!event.is_delete());
    }

    #[test]
    fn test_cdc_event_is_delete() {
        let event = CdcEvent {
            stream_id: "1-0".to_string(),
            op: CdcOp::Delete,
            key: "test".to_string(),
            hash: None,
            data: None,
            meta: None,
        };
        assert!(event.is_delete());
        assert!(!event.is_put());
    }

    #[test]
    fn test_cdc_event_trace_parent() {
        // No meta
        let event = CdcEvent {
            stream_id: "1-0".to_string(),
            op: CdcOp::Put,
            key: "test".to_string(),
            hash: None,
            data: None,
            meta: None,
        };
        assert_eq!(event.trace_parent(), None);
        assert_eq!(event.trace_id(), None);
        assert_eq!(event.parent_span_id(), None);

        // Meta without trace_parent
        let event = CdcEvent {
            stream_id: "1-0".to_string(),
            op: CdcOp::Put,
            key: "test".to_string(),
            hash: None,
            data: None,
            meta: Some(CdcMeta {
                content_type: "application/json".to_string(),
                version: 1,
                updated_at: 12345,
                trace_parent: None,
            }),
        };
        assert_eq!(event.trace_parent(), None);

        // Meta with trace_parent (W3C format)
        let event = CdcEvent {
            stream_id: "1-0".to_string(),
            op: CdcOp::Put,
            key: "test".to_string(),
            hash: None,
            data: None,
            meta: Some(CdcMeta {
                content_type: "application/json".to_string(),
                version: 1,
                updated_at: 12345,
                trace_parent: Some("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".to_string()),
            }),
        };
        assert_eq!(event.trace_parent(), Some("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"));
        assert_eq!(event.trace_id(), Some("0af7651916cd43dd8448eb211c80319c"));
        assert_eq!(event.parent_span_id(), Some("b7ad6b7169203331"));
    }

    #[test]
    fn test_cdc_event_trace_parent_malformed() {
        // Malformed trace_parent (not enough parts)
        let event = CdcEvent {
            stream_id: "1-0".to_string(),
            op: CdcOp::Put,
            key: "test".to_string(),
            hash: None,
            data: None,
            meta: Some(CdcMeta {
                content_type: "application/json".to_string(),
                version: 1,
                updated_at: 12345,
                trace_parent: Some("invalid".to_string()),
            }),
        };
        assert_eq!(event.trace_parent(), Some("invalid"));
        assert_eq!(event.trace_id(), None);
        assert_eq!(event.parent_span_id(), None);

        // Version and trace ID only (no span ID)
        let event = CdcEvent {
            stream_id: "1-0".to_string(),
            op: CdcOp::Put,
            key: "test".to_string(),
            hash: None,
            data: None,
            meta: Some(CdcMeta {
                content_type: "application/json".to_string(),
                version: 1,
                updated_at: 12345,
                trace_parent: Some("00-traceid".to_string()),
            }),
        };
        assert_eq!(event.trace_id(), Some("traceid"));
        assert_eq!(event.parent_span_id(), None);
    }

    #[test]
    fn test_maybe_decompress_uncompressed() {
        let data = b"hello world";
        let result = maybe_decompress(data).unwrap();
        assert_eq!(result, data);
    }

    #[test]
    fn test_maybe_decompress_zstd() {
        let original = b"hello world hello world hello world";
        let compressed = zstd::encode_all(&original[..], 3).unwrap();

        // Verify it has magic bytes
        assert_eq!(&compressed[..4], &ZSTD_MAGIC);

        let result = maybe_decompress(&compressed).unwrap();
        assert_eq!(result, original);
    }

    #[test]
    fn test_maybe_decompress_short_data() {
        // Less than 4 bytes should not crash
        let data = b"ab";
        let result = maybe_decompress(data).unwrap();
        assert_eq!(result, data);

        let data = b"";
        let result = maybe_decompress(data).unwrap();
        assert_eq!(result, data);
    }

    #[test]
    fn test_maybe_decompress_fake_magic() {
        // Data that starts with zstd magic but isn't valid zstd
        let mut data = ZSTD_MAGIC.to_vec();
        data.extend_from_slice(b"not valid zstd data");

        let result = maybe_decompress(&data);
        assert!(result.is_err());
    }

    #[test]
    fn test_compare_stream_ids() {
        use std::cmp::Ordering;

        // Same timestamp, different sequence
        assert_eq!(
            compare_stream_ids("1234567890123-0", "1234567890123-1"),
            Ordering::Less
        );
        assert_eq!(
            compare_stream_ids("1234567890123-1", "1234567890123-0"),
            Ordering::Greater
        );
        assert_eq!(
            compare_stream_ids("1234567890123-0", "1234567890123-0"),
            Ordering::Equal
        );

        // Different timestamps
        assert_eq!(
            compare_stream_ids("1234567890000-0", "1234567890123-0"),
            Ordering::Less
        );
        assert_eq!(
            compare_stream_ids("1234567890123-0", "1234567890000-0"),
            Ordering::Greater
        );

        // Edge case: "0" start marker
        assert_eq!(
            compare_stream_ids("0", "1234567890123-0"),
            Ordering::Less
        );

        // Both are special "0"
        assert_eq!(compare_stream_ids("0", "0"), Ordering::Equal);
    }

    #[test]
    fn test_compare_stream_ids_high_sequence() {
        use std::cmp::Ordering;

        // High sequence numbers
        assert_eq!(
            compare_stream_ids("1000-999999", "1000-1000000"),
            Ordering::Less
        );
    }

    #[test]
    fn test_read_result_methods() {
        // Events result
        let events = vec![CdcEvent {
            stream_id: "1-0".to_string(),
            op: CdcOp::Put,
            key: "test".to_string(),
            hash: None,
            data: None,
            meta: None,
        }];
        let result = ReadResult::Events(events);
        assert!(!result.is_trimmed());
        assert_eq!(result.events().len(), 1);

        // Trimmed result
        let result = ReadResult::StreamTrimmed {
            cursor: "1-0".to_string(),
            oldest_id: "100-0".to_string(),
        };
        assert!(result.is_trimmed());
        assert_eq!(result.events().len(), 0);

        // Empty result
        let result = ReadResult::Empty;
        assert!(!result.is_trimmed());
        assert_eq!(result.events().len(), 0);
    }

    #[test]
    fn test_read_result_events_empty() {
        let result = ReadResult::Events(vec![]);
        assert!(!result.is_trimmed());
        assert_eq!(result.events().len(), 0);
    }

    #[test]
    fn test_compute_content_hash() {
        let data = b"hello world";
        let hash = compute_content_hash(data);
        // SHA256 of "hello world"
        assert_eq!(
            hash,
            "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
        );
    }

    #[test]
    fn test_compute_content_hash_empty() {
        let data = b"";
        let hash = compute_content_hash(data);
        // SHA256 of empty string
        assert_eq!(
            hash,
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
        );
    }

    #[test]
    fn test_content_hash_validation_matches() {
        use sha2::{Digest, Sha256};

        let data = b"test data for hashing";
        let expected_hash = hex::encode(Sha256::digest(data));
        let computed = compute_content_hash(data);
        assert_eq!(computed, expected_hash);
    }

    #[test]
    fn test_parse_stream_id_timestamp() {
        // Normal stream ID
        assert_eq!(
            parse_stream_id_timestamp("1234567890123-0"),
            Some(1234567890123)
        );
        assert_eq!(
            parse_stream_id_timestamp("1234567890123-99"),
            Some(1234567890123)
        );

        // Special "0" marker
        assert_eq!(parse_stream_id_timestamp("0"), None);

        // Malformed
        assert_eq!(parse_stream_id_timestamp("invalid"), None);
        assert_eq!(parse_stream_id_timestamp(""), None);
        assert_eq!(parse_stream_id_timestamp("abc-def"), None);
    }

    #[test]
    fn test_calculate_lag_ms() {
        // Normal case: cursor behind latest
        assert_eq!(
            calculate_lag_ms("1000000000000-0", "1000000005000-0"),
            Some(5000)
        );

        // Caught up (same timestamp)
        assert_eq!(
            calculate_lag_ms("1000000000000-0", "1000000000000-5"),
            Some(0)
        );

        // Cursor ahead (shouldn't happen, but should handle gracefully)
        assert_eq!(
            calculate_lag_ms("1000000005000-0", "1000000000000-0"),
            Some(0) // saturating_sub returns 0
        );

        // Invalid cursor
        assert_eq!(calculate_lag_ms("0", "1000000000000-0"), None);

        // Invalid latest
        assert_eq!(calculate_lag_ms("1000000000000-0", "invalid"), None);

        // Both invalid
        assert_eq!(calculate_lag_ms("abc", "def"), None);
    }

    #[test]
    fn test_get_string_field() {
        let mut fields = HashMap::new();
        fields.insert("key1".to_string(), redis::Value::BulkString(b"value1".to_vec()));
        fields.insert("key2".to_string(), redis::Value::SimpleString("value2".to_string()));
        fields.insert("key3".to_string(), redis::Value::Int(42));

        // BulkString extraction
        assert_eq!(get_string_field(&fields, "key1").unwrap(), "value1");

        // SimpleString extraction
        assert_eq!(get_string_field(&fields, "key2").unwrap(), "value2");

        // Missing field
        assert!(get_string_field(&fields, "missing").is_err());

        // Wrong type (Int)
        assert!(get_string_field(&fields, "key3").is_err());
    }

    #[test]
    fn test_get_string_field_invalid_utf8() {
        let mut fields = HashMap::new();
        // Invalid UTF-8 sequence
        fields.insert("bad".to_string(), redis::Value::BulkString(vec![0xFF, 0xFE]));

        let result = get_string_field(&fields, "bad");
        assert!(result.is_err());
    }

    #[test]
    fn test_get_bytes_field() {
        let mut fields = HashMap::new();
        fields.insert("data".to_string(), redis::Value::BulkString(vec![1, 2, 3, 4]));
        fields.insert("text".to_string(), redis::Value::SimpleString("hello".to_string()));
        fields.insert("num".to_string(), redis::Value::Int(42));

        // BulkString extraction
        assert_eq!(get_bytes_field(&fields, "data").unwrap(), vec![1, 2, 3, 4]);

        // SimpleString extraction (as bytes)
        assert_eq!(get_bytes_field(&fields, "text").unwrap(), b"hello".to_vec());

        // Missing field
        assert!(get_bytes_field(&fields, "missing").is_err());

        // Wrong type (Int)
        assert!(get_bytes_field(&fields, "num").is_err());
    }

    #[test]
    fn test_stream_tailer_new() {
        let tailer = StreamTailer::new(
            "peer-1".to_string(),
            "cdc".to_string(),
            Duration::from_secs(5),
            100,
        );
        assert_eq!(tailer.batch_size(), 100);
    }

    #[test]
    fn test_stream_tailer_set_batch_size() {
        let mut tailer = StreamTailer::new(
            "peer-1".to_string(),
            "cdc".to_string(),
            Duration::from_secs(5),
            100,
        );
        assert_eq!(tailer.batch_size(), 100);

        tailer.set_batch_size(500);
        assert_eq!(tailer.batch_size(), 500);

        tailer.set_batch_size(0);
        assert_eq!(tailer.batch_size(), 0);
    }

    #[test]
    fn test_cdc_meta_serialization() {
        let meta = CdcMeta {
            content_type: "application/json".to_string(),
            version: 42,
            updated_at: 1234567890123,
            trace_parent: Some("00-traceid-spanid-01".to_string()),
        };

        let json = serde_json::to_string(&meta).unwrap();
        assert!(json.contains("application/json"));
        assert!(json.contains("42"));
        assert!(json.contains("1234567890123"));
        assert!(json.contains("traceid"));

        let parsed: CdcMeta = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.content_type, "application/json");
        assert_eq!(parsed.version, 42);
        assert_eq!(parsed.updated_at, 1234567890123);
        assert_eq!(parsed.trace_parent, Some("00-traceid-spanid-01".to_string()));
    }

    #[test]
    fn test_cdc_meta_serialization_no_trace() {
        let meta = CdcMeta {
            content_type: "text/plain".to_string(),
            version: 1,
            updated_at: 0,
            trace_parent: None,
        };

        let json = serde_json::to_string(&meta).unwrap();
        // trace_parent should be skipped when None
        assert!(!json.contains("trace_parent"));
    }
}