faucet_source_postgres_cdc/
state.rs1use faucet_core::FaucetError;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18
19#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
21pub struct Bookmark {
22 pub last_lsn: String,
24}
25
26impl Bookmark {
27 pub fn from_value(v: Value) -> Result<Self, FaucetError> {
29 let b: Self = serde_json::from_value(v)
30 .map_err(|e| FaucetError::State(format!("postgres-cdc bookmark parse: {e}")))?;
31 parse_lsn(&b.last_lsn)?;
32 Ok(b)
33 }
34
35 pub fn to_value(&self) -> Result<Value, FaucetError> {
37 serde_json::to_value(self)
38 .map_err(|e| FaucetError::State(format!("postgres-cdc bookmark serialize: {e}")))
39 }
40
41 pub fn from_u64(lsn: u64) -> Self {
43 Self {
44 last_lsn: format_lsn(lsn),
45 }
46 }
47
48 pub fn as_u64(&self) -> Result<u64, FaucetError> {
50 parse_lsn(&self.last_lsn)
51 }
52}
53
/// Renders a 64-bit LSN as `HI/LO` text.
///
/// `HI` is the upper 32 bits and `LO` the lower 32 bits, each written in
/// uppercase hexadecimal without zero padding (so `0` becomes `"0/0"`).
pub fn format_lsn(lsn: u64) -> String {
    let upper = lsn >> 32;
    let lower = lsn & 0xFFFF_FFFF;
    format!("{upper:X}/{lower:X}")
}
61
62pub fn parse_lsn(s: &str) -> Result<u64, FaucetError> {
65 let (hi, lo) = s.split_once('/').ok_or_else(|| {
66 FaucetError::State(format!("postgres-cdc invalid LSN '{s}': missing '/'"))
67 })?;
68 if hi.is_empty() || lo.is_empty() || lo.contains('/') {
69 return Err(FaucetError::State(format!(
70 "postgres-cdc invalid LSN '{s}'"
71 )));
72 }
73 let hi = u32::from_str_radix(hi, 16)
74 .map_err(|e| FaucetError::State(format!("postgres-cdc LSN high half '{hi}': {e}")))?;
75 let lo = u32::from_str_radix(lo, 16)
76 .map_err(|e| FaucetError::State(format!("postgres-cdc LSN low half '{lo}': {e}")))?;
77 Ok((u64::from(hi) << 32) | u64::from(lo))
78}
79
/// Builds the state key for a slot: the literal prefix `postgres-cdc:`
/// followed by `slot_name` unchanged.
pub fn state_key(slot_name: &str) -> String {
    const PREFIX: &str = "postgres-cdc:";
    let mut key = String::with_capacity(PREFIX.len() + slot_name.len());
    key.push_str(PREFIX);
    key.push_str(slot_name);
    key
}
88
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A bookmark written with `to_value` reads back with the same LSN text.
    #[test]
    fn round_trip_via_value() {
        let original = Bookmark {
            last_lsn: String::from("0/16A4F88"),
        };
        let restored = original
            .to_value()
            .and_then(Bookmark::from_value)
            .unwrap();
        assert_eq!(restored.last_lsn, "0/16A4F88");
    }

    /// Values that do not have the bookmark's JSON shape are rejected.
    #[test]
    fn from_value_rejects_garbage() {
        let cases = [json!({"last_lsn": 42}), json!({}), json!("bare string")];
        for garbage in cases {
            let shown = garbage.to_string();
            assert!(Bookmark::from_value(garbage).is_err(), "accepted {shown}");
        }
    }

    /// Well-shaped JSON whose `last_lsn` is not valid LSN text is rejected.
    #[test]
    fn from_value_rejects_malformed_lsn() {
        let cases = ["", "not-an-lsn", "0/", "/0", "0/16A4F88/extra"];
        for lsn in cases {
            let outcome = Bookmark::from_value(json!({ "last_lsn": lsn }));
            assert!(outcome.is_err(), "accepted {lsn:?}");
        }
    }

    /// A canonical LSN is accepted and its text is kept as given.
    #[test]
    fn from_value_accepts_canonical_lsn() {
        let accepted = Bookmark::from_value(json!({"last_lsn": "1A/BEEFCAFE"})).unwrap();
        assert_eq!(accepted.last_lsn, "1A/BEEFCAFE");
    }

    /// `from_u64` and `as_u64` are inverses for a sample value.
    #[test]
    fn lsn_u64_round_trip() {
        let raw: u64 = 0x1A_BEEF_CAFE;
        let bookmark = Bookmark::from_u64(raw);
        assert_eq!(bookmark.last_lsn, "1A/BEEFCAFE");
        assert_eq!(bookmark.as_u64().unwrap(), raw);
    }

    /// The state key is the fixed prefix followed by the slot name.
    #[test]
    fn state_key_for_slot() {
        let key = state_key("faucet_slot");
        assert_eq!(key, "postgres-cdc:faucet_slot");
    }

    /// Lowercase and uppercase hex digits parse to the same value.
    #[test]
    fn parse_lsn_is_case_insensitive() {
        let pairs = [("0/16a4f88", "0/16A4F88"), ("1a/beefcafe", "1A/BEEFCAFE")];
        for (lower, upper) in pairs {
            assert_eq!(parse_lsn(lower).unwrap(), parse_lsn(upper).unwrap());
        }
    }

    /// Formatting then parsing returns the input at the extremes and mid-range.
    #[test]
    fn format_parse_lsn_boundaries() {
        let samples: [u64; 3] = [0, u64::MAX, 0x1234_5678_9ABC_DEF0];
        for value in samples {
            let text = format_lsn(value);
            assert_eq!(parse_lsn(&text).unwrap(), value);
        }
    }
}