// laminar_storage/checkpoint/source_offsets.rs

//! Typed source position tracking for checkpoint recovery.
//!
//! Provides `SourcePosition` — a strongly typed enum representing
//! connector-specific offsets — alongside conversion methods to/from
//! the existing `ConnectorCheckpoint` format.
//!
//! Also provides:
//! - `SourceId` — newtype for source identifiers within a pipeline.
//! - `SourceOffset` — combines a `SourceId` with a `SourcePosition`.
//! - `RecoveryPlan` — recovery plan built from checkpoint manifests.
//! - `DeterminismWarning` — warnings about non-determinism during recovery.

13#[allow(clippy::disallowed_types)] // cold path: checkpoint recovery
14use std::collections::HashMap;
15use std::fmt;
16
17use serde::{Deserialize, Serialize};
18
19use crate::checkpoint::layout::SourceOffsetEntry;
20use crate::checkpoint_manifest::ConnectorCheckpoint;
21
/// Unique identifier for a source within a pipeline.
///
/// Newtype over `String`; derives `Eq` + `Hash` so it can key maps of
/// per-source state, and `Serialize`/`Deserialize` so it can appear in
/// persisted checkpoint structures.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SourceId(pub String);
25
26impl SourceId {
27    /// Create a new source identifier.
28    #[must_use]
29    pub fn new(id: impl Into<String>) -> Self {
30        Self(id.into())
31    }
32
33    /// Return the inner string slice.
34    #[must_use]
35    pub fn as_str(&self) -> &str {
36        &self.0
37    }
38}
39
40impl fmt::Display for SourceId {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        write!(f, "{}", self.0)
43    }
44}
45
46impl From<&str> for SourceId {
47    fn from(s: &str) -> Self {
48        Self(s.to_string())
49    }
50}
51
impl From<String> for SourceId {
    /// Wrap an owned `String` directly, without reallocating.
    fn from(s: String) -> Self {
        Self(s)
    }
}
57
/// A source's read position at checkpoint time.
///
/// Combines a [`SourceId`] with a [`SourcePosition`] to fully identify
/// where a specific source was reading when the checkpoint was taken.
/// Serializable so it can be embedded in checkpoint manifests.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceOffset {
    /// Which source this offset belongs to.
    pub source_id: SourceId,
    /// The source-specific position.
    pub position: SourcePosition,
}
69
/// Kafka partition-level offset.
///
/// Serialized into checkpoint offsets as a `"{topic}-{partition}"` key
/// mapped to the stringified offset.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaPartitionOffset {
    /// Topic name.
    pub topic: String,
    /// Partition number.
    pub partition: i32,
    /// Offset within the partition.
    // NOTE(review): whether this is last-committed or next-to-read is not
    // established by this module — confirm against the Kafka connector.
    pub offset: i64,
}
80
/// Kafka source position (all partitions for a consumer group).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaPosition {
    /// Consumer group ID.
    pub group_id: String,
    /// Per-partition offsets.
    pub partitions: Vec<KafkaPartitionOffset>,
}
89
/// `PostgreSQL` CDC position tracked via replication slot.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PostgresCdcPosition {
    /// Confirmed flush LSN (hex string, e.g. "0/1234ABCD").
    pub confirmed_flush_lsn: String,
    /// Write LSN (may be ahead of confirmed flush); optional because not
    /// all checkpoints record it.
    pub write_lsn: Option<String>,
    /// Replication slot name.
    pub slot_name: String,
}
100
/// `MySQL` CDC position tracked via GTID set or binlog coordinates.
///
/// All fields are optional: a position may carry a GTID set, binlog
/// coordinates, or both, depending on the server configuration.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MysqlCdcPosition {
    /// GTID set (e.g. "uuid:1-5").
    pub gtid_set: Option<String>,
    /// Binlog file name.
    pub binlog_file: Option<String>,
    /// Binlog position within the file.
    pub binlog_position: Option<u64>,
}
111
/// File source position.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FilePosition {
    /// Path to the file being read.
    pub path: String,
    /// Byte offset into the file.
    pub byte_offset: u64,
}
120
/// Generic position for custom connectors.
///
/// Fallback variant payload used when no typed variant matches the
/// connector type; the offsets map is carried through opaquely.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GenericPosition {
    /// Connector type identifier.
    pub connector_type: String,
    /// Opaque key-value offsets.
    pub offsets: HashMap<String, String>,
}
129
/// Strongly typed source position.
///
/// Each variant captures the native offset format for a connector type,
/// enabling type-safe validation and recovery planning.
///
/// Serialized as an internally tagged enum (`{"type": "Kafka", ...}`)
/// via `#[serde(tag = "type")]`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum SourcePosition {
    /// Kafka consumer group offsets.
    Kafka(KafkaPosition),
    /// `PostgreSQL` CDC replication slot position.
    PostgresCdc(PostgresCdcPosition),
    /// `MySQL` CDC binlog/GTID position.
    MysqlCdc(MysqlCdcPosition),
    /// File source byte offset.
    File(FilePosition),
    /// Generic connector offsets.
    Generic(GenericPosition),
}
148
149impl SourcePosition {
150    /// Convert to a [`ConnectorCheckpoint`] for storage.
151    #[must_use]
152    pub fn to_connector_checkpoint(&self, epoch: u64) -> ConnectorCheckpoint {
153        let mut cp = ConnectorCheckpoint::new(epoch);
154        match self {
155            Self::Kafka(pos) => {
156                cp.metadata.insert("connector_type".into(), "kafka".into());
157                cp.metadata.insert("group_id".into(), pos.group_id.clone());
158                for p in &pos.partitions {
159                    cp.offsets
160                        .insert(format!("{}-{}", p.topic, p.partition), p.offset.to_string());
161                }
162            }
163            Self::PostgresCdc(pos) => {
164                cp.metadata
165                    .insert("connector_type".into(), "postgres_cdc".into());
166                cp.offsets.insert(
167                    "confirmed_flush_lsn".into(),
168                    pos.confirmed_flush_lsn.clone(),
169                );
170                if let Some(ref lsn) = pos.write_lsn {
171                    cp.offsets.insert("write_lsn".into(), lsn.clone());
172                }
173                cp.metadata
174                    .insert("slot_name".into(), pos.slot_name.clone());
175            }
176            Self::MysqlCdc(pos) => {
177                cp.metadata
178                    .insert("connector_type".into(), "mysql_cdc".into());
179                if let Some(ref gtid) = pos.gtid_set {
180                    cp.offsets.insert("gtid_set".into(), gtid.clone());
181                }
182                if let Some(ref file) = pos.binlog_file {
183                    cp.offsets.insert("binlog_file".into(), file.clone());
184                }
185                if let Some(binlog_pos) = pos.binlog_position {
186                    cp.offsets
187                        .insert("binlog_position".into(), binlog_pos.to_string());
188                }
189            }
190            Self::File(pos) => {
191                cp.metadata.insert("connector_type".into(), "file".into());
192                cp.offsets.insert("path".into(), pos.path.clone());
193                cp.offsets
194                    .insert("byte_offset".into(), pos.byte_offset.to_string());
195            }
196            Self::Generic(pos) => {
197                cp.metadata
198                    .insert("connector_type".into(), pos.connector_type.clone());
199                cp.offsets.clone_from(&pos.offsets);
200            }
201        }
202        cp
203    }
204
205    /// Try to reconstruct a typed position from a [`ConnectorCheckpoint`].
206    ///
207    /// Uses the `connector_type` metadata key to determine which variant
208    /// to build. Returns `None` if the type is unknown or the offsets
209    /// are incomplete.
210    #[must_use]
211    pub fn from_connector_checkpoint(
212        cp: &ConnectorCheckpoint,
213        type_hint: Option<&str>,
214    ) -> Option<Self> {
215        let connector_type =
216            type_hint.or_else(|| cp.metadata.get("connector_type").map(String::as_str))?;
217
218        match connector_type {
219            "kafka" => {
220                let group_id = cp.metadata.get("group_id").cloned().unwrap_or_else(|| {
221                    tracing::warn!(
222                        "[LDB-6011] Kafka source checkpoint missing 'group_id' metadata; \
223                         defaulting to empty — offset recovery may use wrong consumer group"
224                    );
225                    String::new()
226                });
227                let mut partitions = Vec::new();
228                for (key, value) in &cp.offsets {
229                    // Keys are "topic-partition"
230                    if let Some(dash_pos) = key.rfind('-') {
231                        let topic = key[..dash_pos].to_string();
232                        if let Ok(partition) = key[dash_pos + 1..].parse::<i32>() {
233                            if let Ok(offset) = value.parse::<i64>() {
234                                partitions.push(KafkaPartitionOffset {
235                                    topic,
236                                    partition,
237                                    offset,
238                                });
239                            }
240                        }
241                    }
242                }
243                Some(Self::Kafka(KafkaPosition {
244                    group_id,
245                    partitions,
246                }))
247            }
248            "postgres_cdc" => {
249                let confirmed_flush_lsn = cp.offsets.get("confirmed_flush_lsn")?.clone();
250                let write_lsn = cp.offsets.get("write_lsn").cloned();
251                let slot_name = cp.metadata.get("slot_name").cloned().unwrap_or_else(|| {
252                    tracing::warn!(
253                        "[LDB-6011] Postgres CDC checkpoint missing 'slot_name' metadata; \
254                         defaulting to empty — replication slot recovery may fail"
255                    );
256                    String::new()
257                });
258                Some(Self::PostgresCdc(PostgresCdcPosition {
259                    confirmed_flush_lsn,
260                    write_lsn,
261                    slot_name,
262                }))
263            }
264            "mysql_cdc" => Some(Self::MysqlCdc(MysqlCdcPosition {
265                gtid_set: cp.offsets.get("gtid_set").cloned(),
266                binlog_file: cp.offsets.get("binlog_file").cloned(),
267                binlog_position: cp
268                    .offsets
269                    .get("binlog_position")
270                    .and_then(|s| s.parse().ok()),
271            })),
272            "file" => {
273                let path = cp.offsets.get("path")?.clone();
274                let byte_offset = cp
275                    .offsets
276                    .get("byte_offset")
277                    .and_then(|s| s.parse().ok())
278                    .unwrap_or(0);
279                Some(Self::File(FilePosition { path, byte_offset }))
280            }
281            _ => Some(Self::Generic(GenericPosition {
282                connector_type: connector_type.to_string(),
283                offsets: cp.offsets.clone(),
284            })),
285        }
286    }
287
288    /// Convert to a [`SourceOffsetEntry`] for the V2 manifest format.
289    #[must_use]
290    pub fn to_offset_entry(&self, epoch: u64) -> SourceOffsetEntry {
291        let (source_type, offsets) = match self {
292            Self::Kafka(pos) => {
293                let mut offsets = HashMap::new();
294                offsets.insert("group_id".into(), pos.group_id.clone());
295                for p in &pos.partitions {
296                    offsets.insert(format!("{}-{}", p.topic, p.partition), p.offset.to_string());
297                }
298                ("kafka".to_string(), offsets)
299            }
300            Self::PostgresCdc(pos) => {
301                let mut offsets = HashMap::new();
302                offsets.insert(
303                    "confirmed_flush_lsn".into(),
304                    pos.confirmed_flush_lsn.clone(),
305                );
306                if let Some(ref lsn) = pos.write_lsn {
307                    offsets.insert("write_lsn".into(), lsn.clone());
308                }
309                offsets.insert("slot_name".into(), pos.slot_name.clone());
310                ("postgres_cdc".to_string(), offsets)
311            }
312            Self::MysqlCdc(pos) => {
313                let mut offsets = HashMap::new();
314                if let Some(ref gtid) = pos.gtid_set {
315                    offsets.insert("gtid_set".into(), gtid.clone());
316                }
317                if let Some(ref file) = pos.binlog_file {
318                    offsets.insert("binlog_file".into(), file.clone());
319                }
320                if let Some(binlog_pos) = pos.binlog_position {
321                    offsets.insert("binlog_position".into(), binlog_pos.to_string());
322                }
323                ("mysql_cdc".to_string(), offsets)
324            }
325            Self::File(pos) => {
326                let mut offsets = HashMap::new();
327                offsets.insert("path".into(), pos.path.clone());
328                offsets.insert("byte_offset".into(), pos.byte_offset.to_string());
329                ("file".to_string(), offsets)
330            }
331            Self::Generic(pos) => (pos.connector_type.clone(), pos.offsets.clone()),
332        };
333        SourceOffsetEntry {
334            source_type,
335            offsets,
336            epoch,
337        }
338    }
339
340    /// Reconstruct a typed position from a [`SourceOffsetEntry`].
341    ///
342    /// Returns `None` if the entry's source type is unrecognized or the
343    /// offsets are incomplete.
344    #[must_use]
345    pub fn from_offset_entry(entry: &SourceOffsetEntry) -> Option<Self> {
346        match entry.source_type.as_str() {
347            "kafka" => {
348                let group_id = entry.offsets.get("group_id").cloned().unwrap_or_default();
349                let mut partitions = Vec::new();
350                for (key, value) in &entry.offsets {
351                    if key == "group_id" {
352                        continue;
353                    }
354                    if let Some(dash_pos) = key.rfind('-') {
355                        let topic = key[..dash_pos].to_string();
356                        if let Ok(partition) = key[dash_pos + 1..].parse::<i32>() {
357                            if let Ok(offset) = value.parse::<i64>() {
358                                partitions.push(KafkaPartitionOffset {
359                                    topic,
360                                    partition,
361                                    offset,
362                                });
363                            }
364                        }
365                    }
366                }
367                Some(Self::Kafka(KafkaPosition {
368                    group_id,
369                    partitions,
370                }))
371            }
372            "postgres_cdc" => {
373                let confirmed_flush_lsn = entry.offsets.get("confirmed_flush_lsn")?.clone();
374                let write_lsn = entry.offsets.get("write_lsn").cloned();
375                let slot_name = entry.offsets.get("slot_name").cloned().unwrap_or_default();
376                Some(Self::PostgresCdc(PostgresCdcPosition {
377                    confirmed_flush_lsn,
378                    write_lsn,
379                    slot_name,
380                }))
381            }
382            "mysql_cdc" => Some(Self::MysqlCdc(MysqlCdcPosition {
383                gtid_set: entry.offsets.get("gtid_set").cloned(),
384                binlog_file: entry.offsets.get("binlog_file").cloned(),
385                binlog_position: entry
386                    .offsets
387                    .get("binlog_position")
388                    .and_then(|s| s.parse().ok()),
389            })),
390            "file" => {
391                let path = entry.offsets.get("path")?.clone();
392                let byte_offset = entry
393                    .offsets
394                    .get("byte_offset")
395                    .and_then(|s| s.parse().ok())
396                    .unwrap_or(0);
397                Some(Self::File(FilePosition { path, byte_offset }))
398            }
399            _ => Some(Self::Generic(GenericPosition {
400                connector_type: entry.source_type.clone(),
401                offsets: entry.offsets.clone(),
402            })),
403        }
404    }
405}
406
/// Severity level for recovery warnings.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum WarningSeverity {
    /// Informational — recovery can proceed normally.
    Info,
    /// Warning — recovery can proceed but results may differ.
    Warning,
    /// Error — recovery may produce incorrect results.
    Error,
}
417
/// A warning generated during recovery planning.
///
/// Emitted by [`RecoveryPlan`] construction when a source's typed
/// position cannot be reconstructed from the stored checkpoint data.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DeterminismWarning {
    /// Source name that generated the warning.
    pub source_name: String,
    /// Human-readable description of the issue.
    pub message: String,
    /// Severity level.
    pub severity: WarningSeverity,
}
428
429/// Recovery plan built from a checkpoint manifest.
430///
431/// Describes where each source should resume and any warnings
432/// about determinism or data availability.
433#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
434pub struct RecoveryPlan {
435    /// Per-source typed positions for resuming.
436    pub source_positions: HashMap<String, SourcePosition>,
437    /// Epoch to resume from.
438    pub resume_epoch: u64,
439    /// Warnings generated during planning.
440    pub warnings: Vec<DeterminismWarning>,
441}
442
443impl RecoveryPlan {
444    /// Build a recovery plan from a checkpoint manifest's source offsets.
445    #[must_use]
446    pub fn from_manifest(
447        source_offsets: &HashMap<String, ConnectorCheckpoint>,
448        epoch: u64,
449    ) -> Self {
450        let mut source_positions = HashMap::new();
451        let mut warnings = Vec::new();
452
453        for (name, cp) in source_offsets {
454            if let Some(pos) = SourcePosition::from_connector_checkpoint(cp, None) {
455                source_positions.insert(name.clone(), pos);
456            } else {
457                warnings.push(DeterminismWarning {
458                    source_name: name.clone(),
459                    message: "Could not reconstruct typed position; \
460                              will use raw offsets for recovery"
461                        .into(),
462                    severity: WarningSeverity::Warning,
463                });
464            }
465        }
466
467        Self {
468            source_positions,
469            resume_epoch: epoch,
470            warnings,
471        }
472    }
473
474    /// Build a recovery plan from a V2 manifest's source offset entries.
475    #[must_use]
476    pub fn from_manifest_v2(
477        source_offsets: &HashMap<String, SourceOffsetEntry>,
478        epoch: u64,
479    ) -> Self {
480        let mut source_positions = HashMap::new();
481        let mut warnings = Vec::new();
482
483        for (name, entry) in source_offsets {
484            if let Some(pos) = SourcePosition::from_offset_entry(entry) {
485                source_positions.insert(name.clone(), pos);
486            } else {
487                warnings.push(DeterminismWarning {
488                    source_name: name.clone(),
489                    message: "Could not reconstruct typed position from V2 \
490                              offset entry; will use raw offsets for recovery"
491                        .into(),
492                    severity: WarningSeverity::Warning,
493                });
494            }
495        }
496
497        Self {
498            source_positions,
499            resume_epoch: epoch,
500            warnings,
501        }
502    }
503}
504
/// Trait describing an operator's determinism properties.
///
/// [`DeterminismValidator`] consults these flags to decide whether an
/// operator in an exactly-once pipeline is deterministic (same input +
/// same state = same output). Non-deterministic operators break the
/// replay-based exactly-once guarantee.
pub trait OperatorDescriptor {
    /// Unique operator identifier.
    fn id(&self) -> &str;

    /// Whether the operator reads wall-clock / processing time.
    fn uses_wall_clock(&self) -> bool;

    /// Whether the operator uses randomness (e.g., `rand::random()`).
    fn uses_random(&self) -> bool;

    /// Whether the operator has external side effects (e.g., HTTP calls).
    fn has_external_side_effects(&self) -> bool;
}

/// Warning about potential non-determinism in an operator.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OperatorDeterminismWarning {
    /// Operator uses wall-clock / processing time.
    WallClockUsage {
        /// Operator that uses wall-clock time.
        operator_id: String,
        /// Suggested fix.
        suggestion: String,
    },
    /// Operator uses randomness.
    RandomUsage {
        /// Operator that uses randomness.
        operator_id: String,
        /// Suggested fix.
        suggestion: String,
    },
    /// Operator has external side effects.
    ExternalSideEffect {
        /// Operator with side effects.
        operator_id: String,
        /// Suggested fix.
        suggestion: String,
    },
}

/// Validates that operators in a pipeline are deterministic.
///
/// Deterministic operators produce identical output given identical input
/// and state. This validator checks known non-determinism sources and
/// returns warnings for operators that may break replay-based exactly-once.
pub struct DeterminismValidator;

impl DeterminismValidator {
    /// Check a single operator for known non-determinism sources.
    ///
    /// Returns one warning per detected source, in a fixed order:
    /// wall-clock usage, then randomness, then external side effects.
    /// An empty vector means the operator appears deterministic.
    #[must_use]
    pub fn validate(operator: &dyn OperatorDescriptor) -> Vec<OperatorDeterminismWarning> {
        let mut findings = Vec::new();

        if operator.uses_wall_clock() {
            findings.push(OperatorDeterminismWarning::WallClockUsage {
                operator_id: operator.id().to_string(),
                suggestion: String::from("Use event-time instead of processing-time"),
            });
        }
        if operator.uses_random() {
            findings.push(OperatorDeterminismWarning::RandomUsage {
                operator_id: operator.id().to_string(),
                suggestion: String::from("Use deterministic seed or remove randomness"),
            });
        }
        if operator.has_external_side_effects() {
            findings.push(OperatorDeterminismWarning::ExternalSideEffect {
                operator_id: operator.id().to_string(),
                suggestion: String::from("Move side effects to a sink connector"),
            });
        }

        findings
    }

    /// Validate every operator in a pipeline, concatenating the warnings
    /// in operator order.
    #[must_use]
    pub fn validate_all(operators: &[&dyn OperatorDescriptor]) -> Vec<OperatorDeterminismWarning> {
        let mut all = Vec::new();
        for op in operators {
            all.extend(Self::validate(*op));
        }
        all
    }
}
602
603#[cfg(test)]
604mod tests {
605    use super::*;
606
607    #[test]
608    fn test_kafka_position_serde_roundtrip() {
609        let pos = SourcePosition::Kafka(KafkaPosition {
610            group_id: "my-group".into(),
611            partitions: vec![
612                KafkaPartitionOffset {
613                    topic: "events".into(),
614                    partition: 0,
615                    offset: 1234,
616                },
617                KafkaPartitionOffset {
618                    topic: "events".into(),
619                    partition: 1,
620                    offset: 5678,
621                },
622            ],
623        });
624        let json = serde_json::to_string(&pos).unwrap();
625        let restored: SourcePosition = serde_json::from_str(&json).unwrap();
626        assert_eq!(pos, restored);
627    }
628
629    #[test]
630    fn test_postgres_cdc_position_serde_roundtrip() {
631        let pos = SourcePosition::PostgresCdc(PostgresCdcPosition {
632            confirmed_flush_lsn: "0/1234ABCD".into(),
633            write_lsn: Some("0/1234ABCE".into()),
634            slot_name: "laminar_slot".into(),
635        });
636        let json = serde_json::to_string(&pos).unwrap();
637        let restored: SourcePosition = serde_json::from_str(&json).unwrap();
638        assert_eq!(pos, restored);
639    }
640
641    #[test]
642    fn test_mysql_cdc_position_serde_roundtrip() {
643        let pos = SourcePosition::MysqlCdc(MysqlCdcPosition {
644            gtid_set: Some("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5".into()),
645            binlog_file: Some("mysql-bin.000003".into()),
646            binlog_position: Some(154),
647        });
648        let json = serde_json::to_string(&pos).unwrap();
649        let restored: SourcePosition = serde_json::from_str(&json).unwrap();
650        assert_eq!(pos, restored);
651    }
652
653    #[test]
654    fn test_file_position_serde_roundtrip() {
655        let pos = SourcePosition::File(FilePosition {
656            path: "/data/events.csv".into(),
657            byte_offset: 4096,
658        });
659        let json = serde_json::to_string(&pos).unwrap();
660        let restored: SourcePosition = serde_json::from_str(&json).unwrap();
661        assert_eq!(pos, restored);
662    }
663
664    #[test]
665    fn test_generic_position_serde_roundtrip() {
666        let pos = SourcePosition::Generic(GenericPosition {
667            connector_type: "custom".into(),
668            offsets: HashMap::from([("cursor".into(), "abc123".into())]),
669        });
670        let json = serde_json::to_string(&pos).unwrap();
671        let restored: SourcePosition = serde_json::from_str(&json).unwrap();
672        assert_eq!(pos, restored);
673    }
674
675    #[test]
676    fn test_kafka_to_connector_checkpoint() {
677        let pos = SourcePosition::Kafka(KafkaPosition {
678            group_id: "my-group".into(),
679            partitions: vec![KafkaPartitionOffset {
680                topic: "events".into(),
681                partition: 0,
682                offset: 1234,
683            }],
684        });
685        let cp = pos.to_connector_checkpoint(5);
686        assert_eq!(cp.epoch, 5);
687        assert_eq!(cp.offsets.get("events-0"), Some(&"1234".to_string()));
688        assert_eq!(
689            cp.metadata.get("connector_type"),
690            Some(&"kafka".to_string())
691        );
692        assert_eq!(cp.metadata.get("group_id"), Some(&"my-group".to_string()));
693    }
694
695    #[test]
696    fn test_postgres_to_from_checkpoint() {
697        let original = SourcePosition::PostgresCdc(PostgresCdcPosition {
698            confirmed_flush_lsn: "0/ABCD".into(),
699            write_lsn: Some("0/ABCE".into()),
700            slot_name: "test_slot".into(),
701        });
702        let cp = original.to_connector_checkpoint(10);
703        let restored = SourcePosition::from_connector_checkpoint(&cp, None).unwrap();
704        assert_eq!(original, restored);
705    }
706
707    #[test]
708    fn test_mysql_to_from_checkpoint() {
709        let original = SourcePosition::MysqlCdc(MysqlCdcPosition {
710            gtid_set: Some("uuid:1-5".into()),
711            binlog_file: Some("mysql-bin.000003".into()),
712            binlog_position: Some(154),
713        });
714        let cp = original.to_connector_checkpoint(3);
715        let restored = SourcePosition::from_connector_checkpoint(&cp, None).unwrap();
716        assert_eq!(original, restored);
717    }
718
719    #[test]
720    fn test_file_to_from_checkpoint() {
721        let original = SourcePosition::File(FilePosition {
722            path: "/data/file.csv".into(),
723            byte_offset: 4096,
724        });
725        let cp = original.to_connector_checkpoint(1);
726        let restored = SourcePosition::from_connector_checkpoint(&cp, None).unwrap();
727        assert_eq!(original, restored);
728    }
729
730    #[test]
731    fn test_from_checkpoint_with_type_hint() {
732        let mut cp = ConnectorCheckpoint::new(1);
733        cp.offsets.insert("events-0".into(), "100".into());
734        // No metadata — use type hint instead
735        let pos = SourcePosition::from_connector_checkpoint(&cp, Some("kafka")).unwrap();
736        match pos {
737            SourcePosition::Kafka(k) => {
738                assert_eq!(k.partitions.len(), 1);
739                assert_eq!(k.partitions[0].offset, 100);
740            }
741            _ => panic!("Expected Kafka position"),
742        }
743    }
744
745    #[test]
746    fn test_from_checkpoint_no_type_returns_none() {
747        let cp = ConnectorCheckpoint::new(1);
748        assert!(SourcePosition::from_connector_checkpoint(&cp, None).is_none());
749    }
750
751    #[test]
752    fn test_recovery_plan_from_manifest() {
753        let mut source_offsets = HashMap::new();
754
755        let mut kafka_cp = ConnectorCheckpoint::new(5);
756        kafka_cp
757            .metadata
758            .insert("connector_type".into(), "kafka".into());
759        kafka_cp.metadata.insert("group_id".into(), "g1".into());
760        kafka_cp.offsets.insert("topic-0".into(), "100".into());
761        source_offsets.insert("kafka-src".into(), kafka_cp);
762
763        // CP with no type info — should generate warning
764        let empty_cp = ConnectorCheckpoint::new(5);
765        source_offsets.insert("unknown-src".into(), empty_cp);
766
767        let plan = RecoveryPlan::from_manifest(&source_offsets, 5);
768        assert_eq!(plan.resume_epoch, 5);
769        assert_eq!(plan.source_positions.len(), 1);
770        assert!(plan.source_positions.contains_key("kafka-src"));
771        assert_eq!(plan.warnings.len(), 1);
772        assert_eq!(plan.warnings[0].source_name, "unknown-src");
773    }
774
775    #[test]
776    fn test_source_id_display() {
777        let id = SourceId::new("kafka-orders");
778        assert_eq!(id.to_string(), "kafka-orders");
779        assert_eq!(id.as_str(), "kafka-orders");
780    }
781
782    #[test]
783    fn test_source_id_from_str() {
784        let id: SourceId = "my-source".into();
785        assert_eq!(id.0, "my-source");
786    }
787
788    #[test]
789    fn test_source_offset_serde_roundtrip() {
790        let offset = SourceOffset {
791            source_id: SourceId::new("kafka-src"),
792            position: SourcePosition::Kafka(KafkaPosition {
793                group_id: "g1".into(),
794                partitions: vec![KafkaPartitionOffset {
795                    topic: "events".into(),
796                    partition: 0,
797                    offset: 100,
798                }],
799            }),
800        };
801        let json = serde_json::to_string(&offset).unwrap();
802        let restored: SourceOffset = serde_json::from_str(&json).unwrap();
803        assert_eq!(offset, restored);
804    }
805
806    #[test]
807    fn test_kafka_to_offset_entry() {
808        let pos = SourcePosition::Kafka(KafkaPosition {
809            group_id: "g1".into(),
810            partitions: vec![KafkaPartitionOffset {
811                topic: "events".into(),
812                partition: 0,
813                offset: 1234,
814            }],
815        });
816        let entry = pos.to_offset_entry(5);
817        assert_eq!(entry.source_type, "kafka");
818        assert_eq!(entry.epoch, 5);
819        assert_eq!(entry.offsets.get("group_id"), Some(&"g1".to_string()));
820        assert_eq!(entry.offsets.get("events-0"), Some(&"1234".to_string()));
821    }
822
823    #[test]
824    fn test_kafka_offset_entry_roundtrip() {
825        let pos = SourcePosition::Kafka(KafkaPosition {
826            group_id: "g1".into(),
827            partitions: vec![KafkaPartitionOffset {
828                topic: "events".into(),
829                partition: 0,
830                offset: 1234,
831            }],
832        });
833        let entry = pos.to_offset_entry(5);
834        let restored = SourcePosition::from_offset_entry(&entry).unwrap();
835        assert_eq!(pos, restored);
836    }
837
838    #[test]
839    fn test_postgres_offset_entry_roundtrip() {
840        let pos = SourcePosition::PostgresCdc(PostgresCdcPosition {
841            confirmed_flush_lsn: "0/ABCD".into(),
842            write_lsn: Some("0/ABCE".into()),
843            slot_name: "test_slot".into(),
844        });
845        let entry = pos.to_offset_entry(10);
846        assert_eq!(entry.source_type, "postgres_cdc");
847        let restored = SourcePosition::from_offset_entry(&entry).unwrap();
848        assert_eq!(pos, restored);
849    }
850
851    #[test]
852    fn test_mysql_offset_entry_roundtrip() {
853        let pos = SourcePosition::MysqlCdc(MysqlCdcPosition {
854            gtid_set: Some("uuid:1-5".into()),
855            binlog_file: Some("mysql-bin.000003".into()),
856            binlog_position: Some(154),
857        });
858        let entry = pos.to_offset_entry(3);
859        assert_eq!(entry.source_type, "mysql_cdc");
860        let restored = SourcePosition::from_offset_entry(&entry).unwrap();
861        assert_eq!(pos, restored);
862    }
863
864    #[test]
865    fn test_file_offset_entry_roundtrip() {
866        let pos = SourcePosition::File(FilePosition {
867            path: "/data/file.csv".into(),
868            byte_offset: 4096,
869        });
870        let entry = pos.to_offset_entry(1);
871        assert_eq!(entry.source_type, "file");
872        let restored = SourcePosition::from_offset_entry(&entry).unwrap();
873        assert_eq!(pos, restored);
874    }
875
876    #[test]
877    fn test_generic_offset_entry_roundtrip() {
878        let pos = SourcePosition::Generic(GenericPosition {
879            connector_type: "custom".into(),
880            offsets: HashMap::from([("cursor".into(), "abc123".into())]),
881        });
882        let entry = pos.to_offset_entry(1);
883        assert_eq!(entry.source_type, "custom");
884        let restored = SourcePosition::from_offset_entry(&entry).unwrap();
885        assert_eq!(pos, restored);
886    }
887
888    #[test]
889    fn test_recovery_plan_from_manifest_v2() {
890        use crate::checkpoint::layout::SourceOffsetEntry;
891
892        let mut source_offsets = HashMap::new();
893        source_offsets.insert(
894            "kafka-src".into(),
895            SourceOffsetEntry {
896                source_type: "kafka".into(),
897                offsets: HashMap::from([
898                    ("group_id".into(), "g1".into()),
899                    ("topic-0".into(), "100".into()),
900                ]),
901                epoch: 5,
902            },
903        );
904        // Entry with empty source_type that won't match any known type
905        // but will still parse as Generic
906        source_offsets.insert(
907            "custom-src".into(),
908            SourceOffsetEntry {
909                source_type: "redis".into(),
910                offsets: HashMap::from([("cursor".into(), "42".into())]),
911                epoch: 5,
912            },
913        );
914
915        let plan = RecoveryPlan::from_manifest_v2(&source_offsets, 5);
916        assert_eq!(plan.resume_epoch, 5);
917        assert_eq!(plan.source_positions.len(), 2);
918        assert!(plan.source_positions.contains_key("kafka-src"));
919        assert!(plan.source_positions.contains_key("custom-src"));
920        assert!(plan.warnings.is_empty());
921    }
922
923    #[test]
924    fn test_warning_severity_serde() {
925        let warning = DeterminismWarning {
926            source_name: "src".into(),
927            message: "test".into(),
928            severity: WarningSeverity::Warning,
929        };
930        let json = serde_json::to_string(&warning).unwrap();
931        let restored: DeterminismWarning = serde_json::from_str(&json).unwrap();
932        assert_eq!(warning, restored);
933    }
934
935    // ---- DeterminismValidator tests ----
936
    /// Minimal `OperatorDescriptor` implementation for exercising
    /// `DeterminismValidator` with configurable non-determinism flags.
    struct TestOperator {
        // Operator identifier reported back inside generated warnings.
        id: String,
        // When true, `uses_wall_clock()` reports wall-clock usage.
        wall_clock: bool,
        // When true, `uses_random()` reports randomness usage.
        random: bool,
        // When true, `has_external_side_effects()` reports side effects.
        side_effects: bool,
    }
943
    impl TestOperator {
        /// Build an operator with the given id and every non-determinism
        /// flag cleared — the fully deterministic baseline that the other
        /// tests override via struct-update syntax.
        fn deterministic(id: &str) -> Self {
            Self {
                id: id.into(),
                wall_clock: false,
                random: false,
                side_effects: false,
            }
        }
    }
954
    // Each trait method simply forwards the corresponding configured field.
    impl OperatorDescriptor for TestOperator {
        fn id(&self) -> &str {
            &self.id
        }
        fn uses_wall_clock(&self) -> bool {
            self.wall_clock
        }
        fn uses_random(&self) -> bool {
            self.random
        }
        fn has_external_side_effects(&self) -> bool {
            self.side_effects
        }
    }
969
970    #[test]
971    fn test_determinism_validator_clean_operator() {
972        let op = TestOperator::deterministic("agg-1");
973        let warnings = DeterminismValidator::validate(&op);
974        assert!(warnings.is_empty());
975    }
976
977    #[test]
978    fn test_determinism_validator_wall_clock_warning() {
979        let op = TestOperator {
980            wall_clock: true,
981            ..TestOperator::deterministic("timer-op")
982        };
983        let warnings = DeterminismValidator::validate(&op);
984        assert_eq!(warnings.len(), 1);
985        assert!(matches!(
986            &warnings[0],
987            OperatorDeterminismWarning::WallClockUsage { operator_id, .. }
988            if operator_id == "timer-op"
989        ));
990    }
991
992    #[test]
993    fn test_determinism_validator_random_warning() {
994        let op = TestOperator {
995            random: true,
996            ..TestOperator::deterministic("shuffle-op")
997        };
998        let warnings = DeterminismValidator::validate(&op);
999        assert_eq!(warnings.len(), 1);
1000        assert!(matches!(
1001            &warnings[0],
1002            OperatorDeterminismWarning::RandomUsage { operator_id, .. }
1003            if operator_id == "shuffle-op"
1004        ));
1005    }
1006
1007    #[test]
1008    fn test_determinism_validator_side_effect_warning() {
1009        let op = TestOperator {
1010            side_effects: true,
1011            ..TestOperator::deterministic("http-op")
1012        };
1013        let warnings = DeterminismValidator::validate(&op);
1014        assert_eq!(warnings.len(), 1);
1015        assert!(matches!(
1016            &warnings[0],
1017            OperatorDeterminismWarning::ExternalSideEffect { operator_id, .. }
1018            if operator_id == "http-op"
1019        ));
1020    }
1021
1022    #[test]
1023    fn test_determinism_validator_multiple_warnings() {
1024        let op = TestOperator {
1025            id: "bad-op".into(),
1026            wall_clock: true,
1027            random: true,
1028            side_effects: true,
1029        };
1030        let warnings = DeterminismValidator::validate(&op);
1031        assert_eq!(warnings.len(), 3);
1032    }
1033
1034    #[test]
1035    fn test_determinism_validator_validate_all() {
1036        let clean = TestOperator::deterministic("clean");
1037        let bad = TestOperator {
1038            wall_clock: true,
1039            ..TestOperator::deterministic("bad")
1040        };
1041        let warnings = DeterminismValidator::validate_all(&[&clean, &bad]);
1042        assert_eq!(warnings.len(), 1);
1043        assert!(matches!(
1044            &warnings[0],
1045            OperatorDeterminismWarning::WallClockUsage { operator_id, .. }
1046            if operator_id == "bad"
1047        ));
1048    }
1049}