faucet_source_postgres_cdc/pgoutput/
values.rs1use base64::Engine;
8use faucet_core::FaucetError;
9use serde_json::Value;
10
// PostgreSQL built-in type OIDs that receive a dedicated JSON mapping,
// listed in ascending OID order.

/// `bool`
const OID_BOOL: u32 = 16;
/// `bytea`
const OID_BYTEA: u32 = 17;
/// `int8` (bigint)
const OID_INT8: u32 = 20;
/// `int2` (smallint)
const OID_INT2: u32 = 21;
/// `int4` (integer)
const OID_INT4: u32 = 23;
/// `json`
const OID_JSON: u32 = 114;
/// `float4` (real)
const OID_FLOAT4: u32 = 700;
/// `float8` (double precision)
const OID_FLOAT8: u32 = 701;
/// `numeric`
const OID_NUMERIC: u32 = 1700;
/// `jsonb`
const OID_JSONB: u32 = 3802;
22fn array_element_oid(array_oid: u32) -> Option<u32> {
33 Some(match array_oid {
34 1000 => OID_BOOL, 1001 => OID_BYTEA, 1005 => OID_INT2, 1007 => OID_INT4, 1016 => OID_INT8, 1021 => OID_FLOAT4, 1022 => OID_FLOAT8, 1231 => OID_NUMERIC, 199 => OID_JSON, 3807 => OID_JSONB, 1009 => 25, 1015 => 1043, 1014 => 1042, 2951 => 2950, 1115 => 1114, 1185 => 1184, 1182 => 1082, 1183 => 1083, _ => return None,
53 })
54}
55
56pub fn text_to_json(type_oid: u32, text: &str) -> Result<Value, FaucetError> {
64 Ok(match type_oid {
65 OID_BOOL => match text {
66 "t" => Value::Bool(true),
67 "f" => Value::Bool(false),
68 other => {
69 return Err(FaucetError::Source(format!(
70 "pgoutput: bool column has non-t/f text {other:?}"
71 )));
72 }
73 },
74 OID_INT2 | OID_INT4 | OID_INT8 => {
75 let n: i64 = text.parse().map_err(|e| {
76 FaucetError::Source(format!(
77 "pgoutput: int (oid={type_oid}) parse {text:?}: {e}"
78 ))
79 })?;
80 Value::from(n)
81 }
82 OID_FLOAT4 | OID_FLOAT8 => match text {
83 "NaN" => Value::String("NaN".into()),
87 "Infinity" => Value::String("Infinity".into()),
88 "-Infinity" => Value::String("-Infinity".into()),
89 other => {
90 let n: f64 = other.parse().map_err(|e| {
91 FaucetError::Source(format!("pgoutput: float parse {text:?}: {e}"))
92 })?;
93 serde_json::Number::from_f64(n)
94 .map(Value::Number)
95 .unwrap_or(Value::Null)
96 }
97 },
98 OID_NUMERIC => Value::String(text.into()),
99 OID_BYTEA => {
100 let stripped = text.strip_prefix("\\x").ok_or_else(|| {
101 FaucetError::Source(format!(
102 "pgoutput: bytea text {text:?} missing '\\x' prefix"
103 ))
104 })?;
105 let bytes = hex_decode(stripped)?;
106 Value::String(base64::engine::general_purpose::STANDARD.encode(bytes))
107 }
108 OID_JSON | OID_JSONB => serde_json::from_str(text).map_err(|e| {
109 FaucetError::Source(format!("pgoutput: json/jsonb parse {text:?}: {e}"))
110 })?,
111 other => {
112 if let Some(elem_oid) = array_element_oid(other)
117 && let Some(elements) = parse_pg_array(text)
118 {
119 let mut out = Vec::with_capacity(elements.len());
120 for elem in elements {
121 match elem {
122 Some(s) => out.push(text_to_json(elem_oid, &s)?),
123 None => out.push(Value::Null),
124 }
125 }
126 Value::Array(out)
127 } else {
128 Value::String(text.into())
129 }
130 }
131 })
132}
133
134fn parse_pg_array(text: &str) -> Option<Vec<Option<String>>> {
140 let bytes = text.as_bytes();
141 if bytes.first() != Some(&b'{') || bytes.last() != Some(&b'}') {
142 return None;
143 }
144 let inner = &text[1..text.len() - 1];
145 if inner.is_empty() {
146 return Some(Vec::new());
147 }
148
149 let mut out: Vec<Option<String>> = Vec::new();
150 let mut cur = String::new();
151 let mut in_quotes = false;
152 let mut quoted = false; let mut chars = inner.chars().peekable();
154
155 while let Some(c) = chars.next() {
156 if in_quotes {
157 match c {
158 '\\' => {
159 if let Some(next) = chars.next() {
161 cur.push(next);
162 }
163 }
164 '"' => in_quotes = false,
165 _ => cur.push(c),
166 }
167 continue;
168 }
169 match c {
170 '"' => {
171 in_quotes = true;
172 quoted = true;
173 }
174 '{' => return None,
176 ',' => {
177 out.push(finish_array_element(&cur, quoted));
178 cur.clear();
179 quoted = false;
180 }
181 _ => cur.push(c),
182 }
183 }
184 if in_quotes {
185 return None; }
187 out.push(finish_array_element(&cur, quoted));
188 Some(out)
189}
190
/// Turns one raw array token into an element value.
///
/// An unquoted `NULL` is a SQL NULL and yields `None`; anything else,
/// including a quoted `"NULL"`, yields the text as an owned string.
fn finish_array_element(raw: &str, quoted: bool) -> Option<String> {
    let is_sql_null = raw == "NULL" && !quoted;
    (!is_sql_null).then(|| raw.to_owned())
}
200
201fn hex_decode(s: &str) -> Result<Vec<u8>, FaucetError> {
202 if !s.len().is_multiple_of(2) {
203 return Err(FaucetError::Source(format!(
204 "pgoutput: bytea hex has odd length: {s:?}"
205 )));
206 }
207 let mut out = Vec::with_capacity(s.len() / 2);
208 for i in (0..s.len()).step_by(2) {
209 out.push(
210 u8::from_str_radix(&s[i..i + 2], 16)
211 .map_err(|e| FaucetError::Source(format!("pgoutput: bytea hex {s:?}: {e}")))?,
212 );
213 }
214 Ok(out)
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Asserts that `text` decodes successfully to exactly `expected`.
    #[track_caller]
    fn assert_decodes(oid: u32, text: &str, expected: serde_json::Value) {
        assert_eq!(text_to_json(oid, text).unwrap(), expected);
    }

    /// Asserts that decoding `text` fails.
    #[track_caller]
    fn assert_rejected(oid: u32, text: &str) {
        assert!(text_to_json(oid, text).is_err());
    }

    #[test]
    fn bool_t_and_f() {
        assert_decodes(OID_BOOL, "t", json!(true));
        assert_decodes(OID_BOOL, "f", json!(false));
        assert_rejected(OID_BOOL, "yes");
    }

    #[test]
    fn integer_types() {
        assert_decodes(OID_INT2, "32000", json!(32000));
        assert_decodes(OID_INT4, "-1", json!(-1));
        assert_decodes(
            OID_INT8,
            "9223372036854775807",
            json!(9223372036854775807_i64),
        );
        assert_rejected(OID_INT4, "abc");
    }

    #[test]
    fn floats() {
        assert_decodes(OID_FLOAT8, "3.5", json!(3.5));
        // Non-finite values have no JSON number form and stay as strings.
        assert_decodes(OID_FLOAT8, "NaN", json!("NaN"));
        assert_decodes(OID_FLOAT4, "Infinity", json!("Infinity"));
        assert_decodes(OID_FLOAT8, "-Infinity", json!("-Infinity"));
    }

    #[test]
    fn int_array_decodes_to_json_array() {
        assert_decodes(1007, "{1,2,3}", json!([1, 2, 3]));
        assert_decodes(1007, "{}", json!([]));
        assert_decodes(1016, "{-9,0,9}", json!([-9, 0, 9]));
    }

    #[test]
    fn text_array_handles_quotes_nulls_and_commas() {
        assert_decodes(
            1009,
            r#"{a,"b,c",NULL,"NULL"}"#,
            json!(["a", "b,c", null, "NULL"]),
        );
        // Backslash-escaped quotes inside a quoted element.
        assert_decodes(
            1009,
            r#"{"he said \"hi\""}"#,
            json!(["he said \"hi\""]),
        );
    }

    #[test]
    fn bool_array_decodes() {
        assert_decodes(1000, "{t,f,t}", json!([true, false, true]));
    }

    #[test]
    fn multidimensional_array_falls_back_to_string() {
        assert_decodes(1007, "{{1,2},{3,4}}", json!("{{1,2},{3,4}}"));
    }

    #[test]
    fn numeric_kept_as_string() {
        assert_decodes(OID_NUMERIC, "12345.67890", json!("12345.67890"));
    }

    #[test]
    fn bytea_base64() {
        assert_decodes(OID_BYTEA, "\\xDEADBEEF", json!("3q2+7w=="));
        // Missing the `\x` prefix.
        assert_rejected(OID_BYTEA, "deadbeef");
        // Prefix present, but the payload is not hex.
        assert_rejected(OID_BYTEA, "\\xZZ");
    }

    #[test]
    fn json_columns_parsed() {
        assert_decodes(OID_JSON, r#"{"a":1}"#, json!({"a": 1}));
        assert_decodes(OID_JSONB, r#"[1,2,3]"#, json!([1, 2, 3]));
    }

    #[test]
    fn unknown_oid_falls_back_to_string() {
        assert_decodes(
            99999,
            "2026-05-17 12:34:56+00",
            json!("2026-05-17 12:34:56+00"),
        );
    }
}