Skip to main content

faucet_source_postgres_cdc/
state.rs

//! Bookmark <-> JSON serialization for replication slot LSN progress.
//!
//! Bookmark shape (round-trips through `serde_json::Value`):
//!
//! ```json
//! { "last_lsn": "0/16A4F88" }
//! ```
//!
//! `last_lsn` is the **`end_lsn`** of the last committed transaction whose
//! change records have been written to the sink — the WAL position immediately
//! *after* that commit record, i.e. exactly where the next `START_REPLICATION`
//! resumes. (It is the commit's `end_lsn`, not its `commit_lsn`; see the
//! persist site in `stream.rs`.)
14
15use faucet_core::FaucetError;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18
19/// Durable bookmark for a `PostgresCdcSource`.
20#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
21pub struct Bookmark {
22    /// LSN in Postgres' canonical hex form: `XXXXXXXX/XXXXXXXX`.
23    pub last_lsn: String,
24}
25
26impl Bookmark {
27    /// Parse a `Value` previously emitted by `to_value`.
28    pub fn from_value(v: Value) -> Result<Self, FaucetError> {
29        let b: Self = serde_json::from_value(v)
30            .map_err(|e| FaucetError::State(format!("postgres-cdc bookmark parse: {e}")))?;
31        parse_lsn(&b.last_lsn)?;
32        Ok(b)
33    }
34
35    /// Serialize for the state store.
36    pub fn to_value(&self) -> Result<Value, FaucetError> {
37        serde_json::to_value(self)
38            .map_err(|e| FaucetError::State(format!("postgres-cdc bookmark serialize: {e}")))
39    }
40
41    /// Build a Bookmark from a raw 64-bit LSN value.
42    pub fn from_u64(lsn: u64) -> Self {
43        Self {
44            last_lsn: format_lsn(lsn),
45        }
46    }
47
48    /// Parse `last_lsn` into a `u64` for wire-protocol use.
49    pub fn as_u64(&self) -> Result<u64, FaucetError> {
50        parse_lsn(&self.last_lsn)
51    }
52}
53
/// Render a 64-bit LSN as `HI/LO`, where `HI` is the upper 32 bits and `LO`
/// the lower 32 bits, each in uppercase hexadecimal without zero padding
/// (so `0` becomes `0/0`).
pub fn format_lsn(lsn: u64) -> String {
    let upper = lsn >> 32;
    let lower = lsn & u64::from(u32::MAX);
    format!("{upper:X}/{lower:X}")
}
61
62/// Parse `XXXX/XXXX` (case-insensitive hex, no leading zeros required) into a
63/// `u64`. Returns `FaucetError::State` on any malformation.
64pub fn parse_lsn(s: &str) -> Result<u64, FaucetError> {
65    let (hi, lo) = s.split_once('/').ok_or_else(|| {
66        FaucetError::State(format!("postgres-cdc invalid LSN '{s}': missing '/'"))
67    })?;
68    if hi.is_empty() || lo.is_empty() || lo.contains('/') {
69        return Err(FaucetError::State(format!(
70            "postgres-cdc invalid LSN '{s}'"
71        )));
72    }
73    let hi = u32::from_str_radix(hi, 16)
74        .map_err(|e| FaucetError::State(format!("postgres-cdc LSN high half '{hi}': {e}")))?;
75    let lo = u32::from_str_radix(lo, 16)
76        .map_err(|e| FaucetError::State(format!("postgres-cdc LSN low half '{lo}': {e}")))?;
77    Ok((u64::from(hi) << 32) | u64::from(lo))
78}
79
/// Build the state-store key for a replication slot: the slot name prefixed
/// with `postgres-cdc:`.
///
/// `faucet_core::state::validate_state_key` allows `[A-Za-z0-9_:.-]`, and
/// Postgres restricts slot names to `[a-z0-9_]`, so the result is always a
/// valid key for a valid slot name.
pub fn state_key(slot_name: &str) -> String {
    const PREFIX: &str = "postgres-cdc:";
    let mut key = String::with_capacity(PREFIX.len() + slot_name.len());
    key.push_str(PREFIX);
    key.push_str(slot_name);
    key
}
88
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A bookmark survives `to_value` followed by `from_value`.
    #[test]
    fn round_trip_via_value() {
        let original = Bookmark {
            last_lsn: String::from("0/16A4F88"),
        };
        let encoded = original.to_value().unwrap();
        let decoded = Bookmark::from_value(encoded).unwrap();
        assert_eq!(decoded.last_lsn, "0/16A4F88");
    }

    /// Values that do not have the `{ "last_lsn": <string> }` shape are rejected.
    #[test]
    fn from_value_rejects_garbage() {
        let wrong_shapes = [json!({"last_lsn": 42}), json!({}), json!("bare string")];
        for value in wrong_shapes {
            assert!(Bookmark::from_value(value).is_err());
        }
    }

    /// A well-shaped value whose `last_lsn` text is not an LSN is rejected.
    #[test]
    fn from_value_rejects_malformed_lsn() {
        let bad_lsns = ["", "not-an-lsn", "0/", "/0", "0/16A4F88/extra"];
        for lsn in bad_lsns {
            assert!(Bookmark::from_value(json!({ "last_lsn": lsn })).is_err());
        }
    }

    /// A canonical LSN is accepted and stored verbatim.
    #[test]
    fn from_value_accepts_canonical_lsn() {
        let value = json!({"last_lsn": "1A/BEEFCAFE"});
        let bookmark = Bookmark::from_value(value).unwrap();
        assert_eq!(bookmark.last_lsn, "1A/BEEFCAFE");
    }

    /// `from_u64` and `as_u64` are inverses for a value using both halves.
    #[test]
    fn lsn_u64_round_trip() {
        // Upper 32 bits are 0x1A, lower 32 bits are 0xBEEF_CAFE.
        let raw: u64 = 0x1A_BEEF_CAFE;
        let bookmark = Bookmark::from_u64(raw);
        assert_eq!(bookmark.last_lsn, "1A/BEEFCAFE");
        assert_eq!(bookmark.as_u64().unwrap(), raw);
    }

    /// The key is the slot name behind the `postgres-cdc:` prefix.
    #[test]
    fn state_key_for_slot() {
        let key = state_key("faucet_slot");
        assert_eq!(key, "postgres-cdc:faucet_slot");
    }

    /// Lowercase and uppercase hex digits parse to the same value.
    #[test]
    fn parse_lsn_is_case_insensitive() {
        let pairs = [("0/16a4f88", "0/16A4F88"), ("1a/beefcafe", "1A/BEEFCAFE")];
        for (lower, upper) in pairs {
            assert_eq!(parse_lsn(lower).unwrap(), parse_lsn(upper).unwrap());
        }
    }

    /// `format_lsn` then `parse_lsn` is the identity at the extremes and for a
    /// value with both halves non-zero (exercising the `(hi << 32) | lo` join).
    #[test]
    fn format_parse_lsn_boundaries() {
        let samples: [u64; 3] = [0, u64::MAX, 0x1234_5678_9ABC_DEF0];
        for lsn in samples {
            let text = format_lsn(lsn);
            assert_eq!(parse_lsn(&text).unwrap(), lsn);
        }
    }
}