crabka-connect-postgres 0.3.7

Postgres logical-decoding source connector for Crabka Connect
use std::fmt;
use std::str::FromStr;

use crabka_connect::{OffsetMap, OffsetValue, SourceOffset};

use crate::PostgresConnectError;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PgLsn(pub u64);

impl PgLsn {
    #[must_use]
    pub fn to_source_offset(self, database: &str, slot: &str) -> SourceOffset {
        let partition = OffsetMap::from([
            (
                "database".to_owned(),
                OffsetValue::String(database.to_owned()),
            ),
            ("slot".to_owned(), OffsetValue::String(slot.to_owned())),
        ]);
        let position = OffsetMap::from([("lsn".to_owned(), OffsetValue::String(self.to_string()))]);

        SourceOffset::new(partition, position)
    }

    pub fn from_source_offset(
        offset: &SourceOffset,
        expected_slot: &str,
    ) -> Result<PgLsn, PostgresConnectError> {
        match offset.partition.get("slot") {
            Some(OffsetValue::String(slot)) if slot == expected_slot => {}
            Some(OffsetValue::String(slot)) => {
                return Err(PostgresConnectError::Offset(format!(
                    "source offset slot {slot:?} does not match expected slot {expected_slot:?}"
                )));
            }
            _ => {
                return Err(PostgresConnectError::Offset(
                    "source offset missing string slot".to_owned(),
                ));
            }
        }

        match offset.position.get("lsn") {
            Some(OffsetValue::String(lsn)) => lsn.parse(),
            _ => Err(PostgresConnectError::Offset(
                "source offset missing string lsn".to_owned(),
            )),
        }
    }
}

impl fmt::Display for PgLsn {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let hi = self.0 >> 32;
        let lo = self.0 & 0xffff_ffff;

        write!(f, "{hi:X}/{lo:X}")
    }
}

impl FromStr for PgLsn {
    type Err = PostgresConnectError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let (hi, lo) = s
            .split_once('/')
            .ok_or_else(|| PostgresConnectError::Offset(format!("invalid Postgres LSN {s:?}")))?;

        let hi = parse_lsn_half(hi, s)?;
        let lo = parse_lsn_half(lo, s)?;

        Ok(PgLsn((hi << 32) | lo))
    }
}

fn parse_lsn_half(half: &str, lsn: &str) -> Result<u64, PostgresConnectError> {
    let value = u64::from_str_radix(half, 16)
        .map_err(|_| PostgresConnectError::Offset(format!("invalid Postgres LSN {lsn:?}")))?;

    if value > 0xffff_ffff {
        return Err(PostgresConnectError::Offset(format!(
            "invalid Postgres LSN {lsn:?}: component exceeds 32 bits"
        )));
    }

    Ok(value)
}

#[cfg(test)]
mod tests {
    use crabka_connect::{OffsetValue, SourceOffset};

    use super::PgLsn;
    use crate::PostgresConnectError;

    #[test]
    fn lsn_round_trips_postgres_text_form() {
        let lsn: PgLsn = "16/B374D848".parse().expect("valid LSN");

        assert_eq!(lsn, PgLsn(0x16_b374_d848));
        assert_eq!(lsn.to_string(), "16/B374D848");
        assert_eq!(
            "FFFFFFFF/FFFFFFFF".parse::<PgLsn>().expect("max LSN"),
            PgLsn(u64::MAX)
        );
    }

    #[test]
    fn source_offset_round_trips_and_checks_slot() {
        let lsn: PgLsn = "0/2A".parse().expect("valid LSN");
        let offset = lsn.to_source_offset("app", "slot_a");

        assert_eq!(
            PgLsn::from_source_offset(&offset, "slot_a").expect("matching slot"),
            lsn
        );
        match PgLsn::from_source_offset(&offset, "slot_b").expect_err("slot mismatch") {
            PostgresConnectError::Offset(message) => {
                assert!(message.contains("does not match expected slot"));
                assert!(message.contains("slot_a"));
                assert!(message.contains("slot_b"));
            }
            error => panic!("expected offset error, got {error:?}"),
        }
    }

    #[test]
    fn source_offset_rejects_missing_or_non_string_slot() {
        let missing = SourceOffset::default();
        let mut non_string = PgLsn(42).to_source_offset("app", "slot_a");
        non_string
            .partition
            .insert("slot".to_owned(), OffsetValue::Long(7));

        for offset in [missing, non_string] {
            match PgLsn::from_source_offset(&offset, "slot_a").expect_err("slot should fail") {
                PostgresConnectError::Offset(message) => {
                    assert_eq!(message, "source offset missing string slot");
                }
                error => panic!("expected offset error, got {error:?}"),
            }
        }
    }

    #[test]
    fn malformed_lsn_is_offset_error() {
        for input in ["not-a-lsn", "1/XYZ", "100000000/0"] {
            assert!(matches!(
                input.parse::<PgLsn>(),
                Err(PostgresConnectError::Offset(_))
            ));
        }
    }
}