#[allow(clippy::disallowed_types)] use std::collections::HashMap;
use std::fmt;
use serde::{Deserialize, Serialize};
use crate::checkpoint::layout::SourceOffsetEntry;
use crate::checkpoint_manifest::ConnectorCheckpoint;
/// Newtype identifier naming a single source (connector instance) within a
/// pipeline.
///
/// Derives `Hash`/`Eq` so it can key per-source maps; serde serializes a
/// one-field tuple struct transparently as the inner string.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SourceId(pub String);
impl SourceId {
#[must_use]
pub fn new(id: impl Into<String>) -> Self {
Self(id.into())
}
#[must_use]
pub fn as_str(&self) -> &str {
&self.0
}
}
impl fmt::Display for SourceId {
    /// Formats exactly like the underlying string (width/fill flags and all).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}
impl From<&str> for SourceId {
fn from(s: &str) -> Self {
Self(s.to_string())
}
}
impl From<String> for SourceId {
fn from(s: String) -> Self {
Self(s)
}
}
/// A typed resumable position paired with the source it belongs to.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceOffset {
    /// Which source this position belongs to.
    pub source_id: SourceId,
    /// The source's typed resumable position.
    pub position: SourcePosition,
}
/// Offset for a single Kafka topic partition.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaPartitionOffset {
    /// Topic name (may itself contain `-`; the flat-map encoding below
    /// splits on the *last* dash, so such names still round-trip).
    pub topic: String,
    /// Partition id within the topic.
    pub partition: i32,
    /// Offset within the partition. Whether this is last-committed or
    /// next-to-read is connector-defined — NOTE(review): confirm with the
    /// Kafka source implementation.
    pub offset: i64,
}
/// Resumable position for a Kafka source: one offset per tracked partition.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaPosition {
    /// Consumer group id associated with these offsets.
    pub group_id: String,
    /// Per-partition offsets (order is not significant).
    pub partitions: Vec<KafkaPartitionOffset>,
}
/// Resumable position for a Postgres logical-replication (CDC) source.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PostgresCdcPosition {
    /// Confirmed-flush LSN in Postgres textual form (e.g. `"0/1234ABCD"`).
    pub confirmed_flush_lsn: String,
    /// Write LSN at checkpoint time, when captured — NOTE(review): exact
    /// semantics are defined by the Postgres connector; verify there.
    pub write_lsn: Option<String>,
    /// Name of the replication slot to resume from.
    pub slot_name: String,
}
/// Resumable position for a MySQL binlog (CDC) source.
///
/// Either a GTID set or a binlog file/position pair (or both) may be
/// present; every field is optional in the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MysqlCdcPosition {
    /// Executed GTID set, when the server runs with GTIDs enabled.
    pub gtid_set: Option<String>,
    /// Binlog file name (e.g. `"mysql-bin.000003"`).
    pub binlog_file: Option<String>,
    /// Byte position within `binlog_file`.
    pub binlog_position: Option<u64>,
}
/// Resumable position for a file source: a byte offset within one path.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FilePosition {
    /// Path of the file being read.
    pub path: String,
    /// Byte offset to resume reading from.
    pub byte_offset: u64,
}
/// Fallback position for connectors without a dedicated typed variant:
/// an opaque string-to-string offset map tagged with the connector type.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GenericPosition {
    /// Connector kind tag (e.g. `"redis"`); round-trips as the entry's
    /// `source_type` / the checkpoint's `connector_type` metadata.
    pub connector_type: String,
    /// Raw offsets, interpreted only by the owning connector.
    pub offsets: HashMap<String, String>,
}
/// Typed resumable position for any supported source connector.
///
/// Serialized internally tagged — `#[serde(tag = "type")]` puts the variant
/// name under a `"type"` key next to the variant's fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum SourcePosition {
    /// Kafka consumer offsets.
    Kafka(KafkaPosition),
    /// Postgres logical-replication LSNs.
    PostgresCdc(PostgresCdcPosition),
    /// MySQL binlog / GTID position.
    MysqlCdc(MysqlCdcPosition),
    /// Byte offset within a file.
    File(FilePosition),
    /// Opaque offsets for any other connector.
    Generic(GenericPosition),
}
impl SourcePosition {
#[must_use]
pub fn to_connector_checkpoint(&self, epoch: u64) -> ConnectorCheckpoint {
let mut cp = ConnectorCheckpoint::new(epoch);
match self {
Self::Kafka(pos) => {
cp.metadata.insert("connector_type".into(), "kafka".into());
cp.metadata.insert("group_id".into(), pos.group_id.clone());
for p in &pos.partitions {
cp.offsets
.insert(format!("{}-{}", p.topic, p.partition), p.offset.to_string());
}
}
Self::PostgresCdc(pos) => {
cp.metadata
.insert("connector_type".into(), "postgres_cdc".into());
cp.offsets.insert(
"confirmed_flush_lsn".into(),
pos.confirmed_flush_lsn.clone(),
);
if let Some(ref lsn) = pos.write_lsn {
cp.offsets.insert("write_lsn".into(), lsn.clone());
}
cp.metadata
.insert("slot_name".into(), pos.slot_name.clone());
}
Self::MysqlCdc(pos) => {
cp.metadata
.insert("connector_type".into(), "mysql_cdc".into());
if let Some(ref gtid) = pos.gtid_set {
cp.offsets.insert("gtid_set".into(), gtid.clone());
}
if let Some(ref file) = pos.binlog_file {
cp.offsets.insert("binlog_file".into(), file.clone());
}
if let Some(binlog_pos) = pos.binlog_position {
cp.offsets
.insert("binlog_position".into(), binlog_pos.to_string());
}
}
Self::File(pos) => {
cp.metadata.insert("connector_type".into(), "file".into());
cp.offsets.insert("path".into(), pos.path.clone());
cp.offsets
.insert("byte_offset".into(), pos.byte_offset.to_string());
}
Self::Generic(pos) => {
cp.metadata
.insert("connector_type".into(), pos.connector_type.clone());
cp.offsets.clone_from(&pos.offsets);
}
}
cp
}
#[must_use]
pub fn from_connector_checkpoint(
cp: &ConnectorCheckpoint,
type_hint: Option<&str>,
) -> Option<Self> {
let connector_type =
type_hint.or_else(|| cp.metadata.get("connector_type").map(String::as_str))?;
match connector_type {
"kafka" => {
let group_id = cp.metadata.get("group_id").cloned().unwrap_or_else(|| {
tracing::warn!(
"[LDB-6011] Kafka source checkpoint missing 'group_id' metadata; \
defaulting to empty — offset recovery may use wrong consumer group"
);
String::new()
});
let mut partitions = Vec::new();
for (key, value) in &cp.offsets {
if let Some(dash_pos) = key.rfind('-') {
let topic = key[..dash_pos].to_string();
if let Ok(partition) = key[dash_pos + 1..].parse::<i32>() {
if let Ok(offset) = value.parse::<i64>() {
partitions.push(KafkaPartitionOffset {
topic,
partition,
offset,
});
}
}
}
}
Some(Self::Kafka(KafkaPosition {
group_id,
partitions,
}))
}
"postgres_cdc" => {
let confirmed_flush_lsn = cp.offsets.get("confirmed_flush_lsn")?.clone();
let write_lsn = cp.offsets.get("write_lsn").cloned();
let slot_name = cp.metadata.get("slot_name").cloned().unwrap_or_else(|| {
tracing::warn!(
"[LDB-6011] Postgres CDC checkpoint missing 'slot_name' metadata; \
defaulting to empty — replication slot recovery may fail"
);
String::new()
});
Some(Self::PostgresCdc(PostgresCdcPosition {
confirmed_flush_lsn,
write_lsn,
slot_name,
}))
}
"mysql_cdc" => Some(Self::MysqlCdc(MysqlCdcPosition {
gtid_set: cp.offsets.get("gtid_set").cloned(),
binlog_file: cp.offsets.get("binlog_file").cloned(),
binlog_position: cp
.offsets
.get("binlog_position")
.and_then(|s| s.parse().ok()),
})),
"file" => {
let path = cp.offsets.get("path")?.clone();
let byte_offset = cp
.offsets
.get("byte_offset")
.and_then(|s| s.parse().ok())
.unwrap_or(0);
Some(Self::File(FilePosition { path, byte_offset }))
}
_ => Some(Self::Generic(GenericPosition {
connector_type: connector_type.to_string(),
offsets: cp.offsets.clone(),
})),
}
}
#[must_use]
pub fn to_offset_entry(&self, epoch: u64) -> SourceOffsetEntry {
let (source_type, offsets) = match self {
Self::Kafka(pos) => {
let mut offsets = HashMap::new();
offsets.insert("group_id".into(), pos.group_id.clone());
for p in &pos.partitions {
offsets.insert(format!("{}-{}", p.topic, p.partition), p.offset.to_string());
}
("kafka".to_string(), offsets)
}
Self::PostgresCdc(pos) => {
let mut offsets = HashMap::new();
offsets.insert(
"confirmed_flush_lsn".into(),
pos.confirmed_flush_lsn.clone(),
);
if let Some(ref lsn) = pos.write_lsn {
offsets.insert("write_lsn".into(), lsn.clone());
}
offsets.insert("slot_name".into(), pos.slot_name.clone());
("postgres_cdc".to_string(), offsets)
}
Self::MysqlCdc(pos) => {
let mut offsets = HashMap::new();
if let Some(ref gtid) = pos.gtid_set {
offsets.insert("gtid_set".into(), gtid.clone());
}
if let Some(ref file) = pos.binlog_file {
offsets.insert("binlog_file".into(), file.clone());
}
if let Some(binlog_pos) = pos.binlog_position {
offsets.insert("binlog_position".into(), binlog_pos.to_string());
}
("mysql_cdc".to_string(), offsets)
}
Self::File(pos) => {
let mut offsets = HashMap::new();
offsets.insert("path".into(), pos.path.clone());
offsets.insert("byte_offset".into(), pos.byte_offset.to_string());
("file".to_string(), offsets)
}
Self::Generic(pos) => (pos.connector_type.clone(), pos.offsets.clone()),
};
SourceOffsetEntry {
source_type,
offsets,
epoch,
}
}
#[must_use]
pub fn from_offset_entry(entry: &SourceOffsetEntry) -> Option<Self> {
match entry.source_type.as_str() {
"kafka" => {
let group_id = entry.offsets.get("group_id").cloned().unwrap_or_default();
let mut partitions = Vec::new();
for (key, value) in &entry.offsets {
if key == "group_id" {
continue;
}
if let Some(dash_pos) = key.rfind('-') {
let topic = key[..dash_pos].to_string();
if let Ok(partition) = key[dash_pos + 1..].parse::<i32>() {
if let Ok(offset) = value.parse::<i64>() {
partitions.push(KafkaPartitionOffset {
topic,
partition,
offset,
});
}
}
}
}
Some(Self::Kafka(KafkaPosition {
group_id,
partitions,
}))
}
"postgres_cdc" => {
let confirmed_flush_lsn = entry.offsets.get("confirmed_flush_lsn")?.clone();
let write_lsn = entry.offsets.get("write_lsn").cloned();
let slot_name = entry.offsets.get("slot_name").cloned().unwrap_or_default();
Some(Self::PostgresCdc(PostgresCdcPosition {
confirmed_flush_lsn,
write_lsn,
slot_name,
}))
}
"mysql_cdc" => Some(Self::MysqlCdc(MysqlCdcPosition {
gtid_set: entry.offsets.get("gtid_set").cloned(),
binlog_file: entry.offsets.get("binlog_file").cloned(),
binlog_position: entry
.offsets
.get("binlog_position")
.and_then(|s| s.parse().ok()),
})),
"file" => {
let path = entry.offsets.get("path")?.clone();
let byte_offset = entry
.offsets
.get("byte_offset")
.and_then(|s| s.parse().ok())
.unwrap_or(0);
Some(Self::File(FilePosition { path, byte_offset }))
}
_ => Some(Self::Generic(GenericPosition {
connector_type: entry.source_type.clone(),
offsets: entry.offsets.clone(),
})),
}
}
}
pub use laminar_core::error_codes::WarningSeverity;
/// Non-fatal issue discovered while reconstructing a source position
/// during recovery planning.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DeterminismWarning {
    /// Name of the source the warning applies to.
    pub source_name: String,
    /// Human-readable description of the problem.
    pub message: String,
    /// Severity level (re-exported from `laminar_core::error_codes`).
    pub severity: WarningSeverity,
}
/// Plan describing where each source should resume after recovery.
///
/// Derives `Eq` (previously only `PartialEq`) for consistency with every
/// other type in this module — all fields (`HashMap<String, SourcePosition>`,
/// `u64`, `Vec<DeterminismWarning>`) are themselves `Eq`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RecoveryPlan {
    /// Typed resume position per source name.
    pub source_positions: HashMap<String, SourcePosition>,
    /// Epoch the pipeline resumes from.
    pub resume_epoch: u64,
    /// Non-fatal issues hit while reconstructing typed positions.
    pub warnings: Vec<DeterminismWarning>,
}
impl RecoveryPlan {
#[must_use]
pub fn from_manifest(
source_offsets: &HashMap<String, ConnectorCheckpoint>,
epoch: u64,
) -> Self {
let mut source_positions = HashMap::new();
let mut warnings = Vec::new();
for (name, cp) in source_offsets {
if let Some(pos) = SourcePosition::from_connector_checkpoint(cp, None) {
source_positions.insert(name.clone(), pos);
} else {
warnings.push(DeterminismWarning {
source_name: name.clone(),
message: "Could not reconstruct typed position; \
will use raw offsets for recovery"
.into(),
severity: WarningSeverity::Warning,
});
}
}
Self {
source_positions,
resume_epoch: epoch,
warnings,
}
}
#[must_use]
pub fn from_manifest_v2(
source_offsets: &HashMap<String, SourceOffsetEntry>,
epoch: u64,
) -> Self {
let mut source_positions = HashMap::new();
let mut warnings = Vec::new();
for (name, entry) in source_offsets {
if let Some(pos) = SourcePosition::from_offset_entry(entry) {
source_positions.insert(name.clone(), pos);
} else {
warnings.push(DeterminismWarning {
source_name: name.clone(),
message: "Could not reconstruct typed position from V2 \
offset entry; will use raw offsets for recovery"
.into(),
severity: WarningSeverity::Warning,
});
}
}
Self {
source_positions,
resume_epoch: epoch,
warnings,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // --- Serde round-trips: every typed variant must survive JSON
    // serialization unchanged (internally tagged via "type") ---

    #[test]
    fn test_kafka_position_serde_roundtrip() {
        let pos = SourcePosition::Kafka(KafkaPosition {
            group_id: "my-group".into(),
            partitions: vec![
                KafkaPartitionOffset {
                    topic: "events".into(),
                    partition: 0,
                    offset: 1234,
                },
                KafkaPartitionOffset {
                    topic: "events".into(),
                    partition: 1,
                    offset: 5678,
                },
            ],
        });
        let json = serde_json::to_string(&pos).unwrap();
        let restored: SourcePosition = serde_json::from_str(&json).unwrap();
        assert_eq!(pos, restored);
    }

    #[test]
    fn test_postgres_cdc_position_serde_roundtrip() {
        let pos = SourcePosition::PostgresCdc(PostgresCdcPosition {
            confirmed_flush_lsn: "0/1234ABCD".into(),
            write_lsn: Some("0/1234ABCE".into()),
            slot_name: "laminar_slot".into(),
        });
        let json = serde_json::to_string(&pos).unwrap();
        let restored: SourcePosition = serde_json::from_str(&json).unwrap();
        assert_eq!(pos, restored);
    }

    #[test]
    fn test_mysql_cdc_position_serde_roundtrip() {
        let pos = SourcePosition::MysqlCdc(MysqlCdcPosition {
            gtid_set: Some("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5".into()),
            binlog_file: Some("mysql-bin.000003".into()),
            binlog_position: Some(154),
        });
        let json = serde_json::to_string(&pos).unwrap();
        let restored: SourcePosition = serde_json::from_str(&json).unwrap();
        assert_eq!(pos, restored);
    }

    #[test]
    fn test_file_position_serde_roundtrip() {
        let pos = SourcePosition::File(FilePosition {
            path: "/data/events.csv".into(),
            byte_offset: 4096,
        });
        let json = serde_json::to_string(&pos).unwrap();
        let restored: SourcePosition = serde_json::from_str(&json).unwrap();
        assert_eq!(pos, restored);
    }

    #[test]
    fn test_generic_position_serde_roundtrip() {
        let pos = SourcePosition::Generic(GenericPosition {
            connector_type: "custom".into(),
            offsets: HashMap::from([("cursor".into(), "abc123".into())]),
        });
        let json = serde_json::to_string(&pos).unwrap();
        let restored: SourcePosition = serde_json::from_str(&json).unwrap();
        assert_eq!(pos, restored);
    }

    // --- V1 ConnectorCheckpoint conversion: field placement and
    // lossless to/from round-trips per variant ---

    #[test]
    fn test_kafka_to_connector_checkpoint() {
        let pos = SourcePosition::Kafka(KafkaPosition {
            group_id: "my-group".into(),
            partitions: vec![KafkaPartitionOffset {
                topic: "events".into(),
                partition: 0,
                offset: 1234,
            }],
        });
        let cp = pos.to_connector_checkpoint(5);
        assert_eq!(cp.epoch, 5);
        // Partition offsets live in offsets; identity lives in metadata.
        assert_eq!(cp.offsets.get("events-0"), Some(&"1234".to_string()));
        assert_eq!(
            cp.metadata.get("connector_type"),
            Some(&"kafka".to_string())
        );
        assert_eq!(cp.metadata.get("group_id"), Some(&"my-group".to_string()));
    }

    #[test]
    fn test_postgres_to_from_checkpoint() {
        let original = SourcePosition::PostgresCdc(PostgresCdcPosition {
            confirmed_flush_lsn: "0/ABCD".into(),
            write_lsn: Some("0/ABCE".into()),
            slot_name: "test_slot".into(),
        });
        let cp = original.to_connector_checkpoint(10);
        let restored = SourcePosition::from_connector_checkpoint(&cp, None).unwrap();
        assert_eq!(original, restored);
    }

    #[test]
    fn test_mysql_to_from_checkpoint() {
        let original = SourcePosition::MysqlCdc(MysqlCdcPosition {
            gtid_set: Some("uuid:1-5".into()),
            binlog_file: Some("mysql-bin.000003".into()),
            binlog_position: Some(154),
        });
        let cp = original.to_connector_checkpoint(3);
        let restored = SourcePosition::from_connector_checkpoint(&cp, None).unwrap();
        assert_eq!(original, restored);
    }

    #[test]
    fn test_file_to_from_checkpoint() {
        let original = SourcePosition::File(FilePosition {
            path: "/data/file.csv".into(),
            byte_offset: 4096,
        });
        let cp = original.to_connector_checkpoint(1);
        let restored = SourcePosition::from_connector_checkpoint(&cp, None).unwrap();
        assert_eq!(original, restored);
    }

    // An explicit type hint overrides the (absent) connector_type metadata.
    #[test]
    fn test_from_checkpoint_with_type_hint() {
        let mut cp = ConnectorCheckpoint::new(1);
        cp.offsets.insert("events-0".into(), "100".into());
        let pos = SourcePosition::from_connector_checkpoint(&cp, Some("kafka")).unwrap();
        match pos {
            SourcePosition::Kafka(k) => {
                assert_eq!(k.partitions.len(), 1);
                assert_eq!(k.partitions[0].offset, 100);
            }
            _ => panic!("Expected Kafka position"),
        }
    }

    // No hint and no connector_type metadata -> cannot reconstruct.
    #[test]
    fn test_from_checkpoint_no_type_returns_none() {
        let cp = ConnectorCheckpoint::new(1);
        assert!(SourcePosition::from_connector_checkpoint(&cp, None).is_none());
    }

    // Undecodable entries become warnings, not plan entries.
    #[test]
    fn test_recovery_plan_from_manifest() {
        let mut source_offsets = HashMap::new();
        let mut kafka_cp = ConnectorCheckpoint::new(5);
        kafka_cp
            .metadata
            .insert("connector_type".into(), "kafka".into());
        kafka_cp.metadata.insert("group_id".into(), "g1".into());
        kafka_cp.offsets.insert("topic-0".into(), "100".into());
        source_offsets.insert("kafka-src".into(), kafka_cp);
        let empty_cp = ConnectorCheckpoint::new(5);
        source_offsets.insert("unknown-src".into(), empty_cp);
        let plan = RecoveryPlan::from_manifest(&source_offsets, 5);
        assert_eq!(plan.resume_epoch, 5);
        assert_eq!(plan.source_positions.len(), 1);
        assert!(plan.source_positions.contains_key("kafka-src"));
        assert_eq!(plan.warnings.len(), 1);
        assert_eq!(plan.warnings[0].source_name, "unknown-src");
    }

    // --- SourceId basics ---

    #[test]
    fn test_source_id_display() {
        let id = SourceId::new("kafka-orders");
        assert_eq!(id.to_string(), "kafka-orders");
        assert_eq!(id.as_str(), "kafka-orders");
    }

    #[test]
    fn test_source_id_from_str() {
        let id: SourceId = "my-source".into();
        assert_eq!(id.0, "my-source");
    }

    #[test]
    fn test_source_offset_serde_roundtrip() {
        let offset = SourceOffset {
            source_id: SourceId::new("kafka-src"),
            position: SourcePosition::Kafka(KafkaPosition {
                group_id: "g1".into(),
                partitions: vec![KafkaPartitionOffset {
                    topic: "events".into(),
                    partition: 0,
                    offset: 100,
                }],
            }),
        };
        let json = serde_json::to_string(&offset).unwrap();
        let restored: SourceOffset = serde_json::from_str(&json).unwrap();
        assert_eq!(offset, restored);
    }

    // --- V2 SourceOffsetEntry conversion: V2 keeps group_id in the
    // offsets map (no metadata map exists) ---

    #[test]
    fn test_kafka_to_offset_entry() {
        let pos = SourcePosition::Kafka(KafkaPosition {
            group_id: "g1".into(),
            partitions: vec![KafkaPartitionOffset {
                topic: "events".into(),
                partition: 0,
                offset: 1234,
            }],
        });
        let entry = pos.to_offset_entry(5);
        assert_eq!(entry.source_type, "kafka");
        assert_eq!(entry.epoch, 5);
        assert_eq!(entry.offsets.get("group_id"), Some(&"g1".to_string()));
        assert_eq!(entry.offsets.get("events-0"), Some(&"1234".to_string()));
    }

    #[test]
    fn test_kafka_offset_entry_roundtrip() {
        let pos = SourcePosition::Kafka(KafkaPosition {
            group_id: "g1".into(),
            partitions: vec![KafkaPartitionOffset {
                topic: "events".into(),
                partition: 0,
                offset: 1234,
            }],
        });
        let entry = pos.to_offset_entry(5);
        let restored = SourcePosition::from_offset_entry(&entry).unwrap();
        assert_eq!(pos, restored);
    }

    #[test]
    fn test_postgres_offset_entry_roundtrip() {
        let pos = SourcePosition::PostgresCdc(PostgresCdcPosition {
            confirmed_flush_lsn: "0/ABCD".into(),
            write_lsn: Some("0/ABCE".into()),
            slot_name: "test_slot".into(),
        });
        let entry = pos.to_offset_entry(10);
        assert_eq!(entry.source_type, "postgres_cdc");
        let restored = SourcePosition::from_offset_entry(&entry).unwrap();
        assert_eq!(pos, restored);
    }

    #[test]
    fn test_mysql_offset_entry_roundtrip() {
        let pos = SourcePosition::MysqlCdc(MysqlCdcPosition {
            gtid_set: Some("uuid:1-5".into()),
            binlog_file: Some("mysql-bin.000003".into()),
            binlog_position: Some(154),
        });
        let entry = pos.to_offset_entry(3);
        assert_eq!(entry.source_type, "mysql_cdc");
        let restored = SourcePosition::from_offset_entry(&entry).unwrap();
        assert_eq!(pos, restored);
    }

    #[test]
    fn test_file_offset_entry_roundtrip() {
        let pos = SourcePosition::File(FilePosition {
            path: "/data/file.csv".into(),
            byte_offset: 4096,
        });
        let entry = pos.to_offset_entry(1);
        assert_eq!(entry.source_type, "file");
        let restored = SourcePosition::from_offset_entry(&entry).unwrap();
        assert_eq!(pos, restored);
    }

    #[test]
    fn test_generic_offset_entry_roundtrip() {
        let pos = SourcePosition::Generic(GenericPosition {
            connector_type: "custom".into(),
            offsets: HashMap::from([("cursor".into(), "abc123".into())]),
        });
        let entry = pos.to_offset_entry(1);
        assert_eq!(entry.source_type, "custom");
        let restored = SourcePosition::from_offset_entry(&entry).unwrap();
        assert_eq!(pos, restored);
    }

    // Unknown source_type ("redis") still yields a Generic plan entry,
    // so V2 recovery produces no warnings here.
    #[test]
    fn test_recovery_plan_from_manifest_v2() {
        use crate::checkpoint::layout::SourceOffsetEntry;
        let mut source_offsets = HashMap::new();
        source_offsets.insert(
            "kafka-src".into(),
            SourceOffsetEntry {
                source_type: "kafka".into(),
                offsets: HashMap::from([
                    ("group_id".into(), "g1".into()),
                    ("topic-0".into(), "100".into()),
                ]),
                epoch: 5,
            },
        );
        source_offsets.insert(
            "custom-src".into(),
            SourceOffsetEntry {
                source_type: "redis".into(),
                offsets: HashMap::from([("cursor".into(), "42".into())]),
                epoch: 5,
            },
        );
        let plan = RecoveryPlan::from_manifest_v2(&source_offsets, 5);
        assert_eq!(plan.resume_epoch, 5);
        assert_eq!(plan.source_positions.len(), 2);
        assert!(plan.source_positions.contains_key("kafka-src"));
        assert!(plan.source_positions.contains_key("custom-src"));
        assert!(plan.warnings.is_empty());
    }

    #[test]
    fn test_warning_severity_serde() {
        let warning = DeterminismWarning {
            source_name: "src".into(),
            message: "test".into(),
            severity: WarningSeverity::Warning,
        };
        let json = serde_json::to_string(&warning).unwrap();
        let restored: DeterminismWarning = serde_json::from_str(&json).unwrap();
        assert_eq!(warning, restored);
    }
}