Skip to main content

faucet_source_postgres_cdc/pgoutput/
values.rs

1//! Postgres type OID -> JSON Value mapping for text-mode tuple cells.
2//!
3//! Reference: <https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.dat>
//! Only a selected set of built-in OIDs is special-cased, plus one-dimensional
//! arrays of common built-in types; anything else falls back to a JSON string.
6
7use base64::Engine;
8use faucet_core::FaucetError;
9use serde_json::Value;
10
// Built-in pg_type OIDs that `text_to_json` decodes specially (values from
// `pg_type.dat`, linked in the module docs).

// Boolean and binary.
const OID_BOOL: u32 = 16;
const OID_BYTEA: u32 = 17;

// Integers — every width is decoded through `i64`.
const OID_INT8: u32 = 20;
const OID_INT2: u32 = 21;
const OID_INT4: u32 = 23;

// Floating point and arbitrary precision.
const OID_FLOAT4: u32 = 700;
const OID_FLOAT8: u32 = 701;
const OID_NUMERIC: u32 = 1700;

// JSON documents.
const OID_JSON: u32 = 114;
const OID_JSONB: u32 = 3802;

// Deliberately NOT special-cased: date, time, timestamp, timestamptz, uuid
// and interval. Their Postgres text form (under the default `DateStyle ISO`
// / `IntervalStyle` settings) is already ISO-8601-like, and downstream
// consumers don't agree on a single alternative encoding, so those values
// are passed through verbatim as strings rather than risk a lossy reformat.
27
28/// Map a built-in Postgres *array* type OID to its element type OID, so a
29/// `{...}` array literal can be decoded element-by-element instead of being
30/// emitted as one opaque string (e.g. `int4[]` → `"{1,2,3}"`) (#78/#45).
31/// Returns `None` for non-array OIDs.
32fn array_element_oid(array_oid: u32) -> Option<u32> {
33    Some(match array_oid {
34        1000 => OID_BOOL,    // _bool
35        1001 => OID_BYTEA,   // _bytea
36        1005 => OID_INT2,    // _int2
37        1007 => OID_INT4,    // _int4
38        1016 => OID_INT8,    // _int8
39        1021 => OID_FLOAT4,  // _float4
40        1022 => OID_FLOAT8,  // _float8
41        1231 => OID_NUMERIC, // _numeric
42        199 => OID_JSON,     // _json
43        3807 => OID_JSONB,   // _jsonb
44        1009 => 25,          // _text → text
45        1015 => 1043,        // _varchar → varchar
46        1014 => 1042,        // _bpchar → bpchar
47        2951 => 2950,        // _uuid → uuid
48        1115 => 1114,        // _timestamp
49        1185 => 1184,        // _timestamptz
50        1182 => 1082,        // _date
51        1183 => 1083,        // _time
52        _ => return None,
53    })
54}
55
56/// Decode a text-encoded value with the given column type OID into JSON.
57///
58/// Unknown OIDs and decode failures both fall back to wrapping the raw text
59/// in a JSON string — this is the safest default for a generic CDC
60/// connector. We never panic on bad data; the only way this returns `Err` is
61/// if a structurally promised invariant is violated (e.g. `OID_BYTEA` text
62/// that doesn't start with `\x`).
63pub fn text_to_json(type_oid: u32, text: &str) -> Result<Value, FaucetError> {
64    Ok(match type_oid {
65        OID_BOOL => match text {
66            "t" => Value::Bool(true),
67            "f" => Value::Bool(false),
68            other => {
69                return Err(FaucetError::Source(format!(
70                    "pgoutput: bool column has non-t/f text {other:?}"
71                )));
72            }
73        },
74        OID_INT2 | OID_INT4 | OID_INT8 => {
75            let n: i64 = text.parse().map_err(|e| {
76                FaucetError::Source(format!(
77                    "pgoutput: int (oid={type_oid}) parse {text:?}: {e}"
78                ))
79            })?;
80            Value::from(n)
81        }
82        OID_FLOAT4 | OID_FLOAT8 => match text {
83            // JSON has no NaN/Inf. Preserve them as strings so the
84            // non-finite value is distinguishable from a SQL NULL, which
85            // also maps to `Value::Null` (#78/#45).
86            "NaN" => Value::String("NaN".into()),
87            "Infinity" => Value::String("Infinity".into()),
88            "-Infinity" => Value::String("-Infinity".into()),
89            other => {
90                let n: f64 = other.parse().map_err(|e| {
91                    FaucetError::Source(format!("pgoutput: float parse {text:?}: {e}"))
92                })?;
93                serde_json::Number::from_f64(n)
94                    .map(Value::Number)
95                    .unwrap_or(Value::Null)
96            }
97        },
98        OID_NUMERIC => Value::String(text.into()),
99        OID_BYTEA => {
100            let stripped = text.strip_prefix("\\x").ok_or_else(|| {
101                FaucetError::Source(format!(
102                    "pgoutput: bytea text {text:?} missing '\\x' prefix"
103                ))
104            })?;
105            let bytes = hex_decode(stripped)?;
106            Value::String(base64::engine::general_purpose::STANDARD.encode(bytes))
107        }
108        OID_JSON | OID_JSONB => serde_json::from_str(text).map_err(|e| {
109            FaucetError::Source(format!("pgoutput: json/jsonb parse {text:?}: {e}"))
110        })?,
111        other => {
112            // One-dimensional arrays of a known scalar element type decode
113            // into a JSON array; everything else (incl. multi-dimensional
114            // arrays, ranges, composites, enums, and unmapped scalars) falls
115            // back to the raw Postgres text string (#78/#45).
116            if let Some(elem_oid) = array_element_oid(other)
117                && let Some(elements) = parse_pg_array(text)
118            {
119                let mut out = Vec::with_capacity(elements.len());
120                for elem in elements {
121                    match elem {
122                        Some(s) => out.push(text_to_json(elem_oid, &s)?),
123                        None => out.push(Value::Null),
124                    }
125                }
126                Value::Array(out)
127            } else {
128                Value::String(text.into())
129            }
130        }
131    })
132}
133
/// Split a one-dimensional Postgres array literal such as `{a,b,"c,d",NULL}`
/// into its element texts, using `None` for an unquoted `NULL`.
///
/// Returns `None` — telling the caller to keep the raw string — when the
/// input is not wrapped in `{`…`}`, contains a nested (multi-dimensional)
/// array, or ends inside an unterminated quoted element.
fn parse_pg_array(text: &str) -> Option<Vec<Option<String>>> {
    let body = text.strip_prefix('{')?.strip_suffix('}')?;
    if body.is_empty() {
        return Some(Vec::new());
    }

    let mut elements: Vec<Option<String>> = Vec::new();
    let mut buf = String::new();
    let mut inside_quotes = false;
    // Whether the element being built was quoted (a quoted "NULL" is text).
    let mut was_quoted = false;
    let mut iter = body.chars();

    while let Some(ch) = iter.next() {
        match (inside_quotes, ch) {
            // Inside quotes a backslash escapes the next char (\" or \\).
            (true, '\\') => {
                if let Some(escaped) = iter.next() {
                    buf.push(escaped);
                }
            }
            (true, '"') => inside_quotes = false,
            (true, other) => buf.push(other),
            (false, '"') => {
                inside_quotes = true;
                was_quoted = true;
            }
            // Nested array: not handled here, the caller keeps the raw text.
            (false, '{') => return None,
            (false, ',') => {
                elements.push(finish_array_element(&buf, was_quoted));
                buf.clear();
                was_quoted = false;
            }
            (false, other) => buf.push(other),
        }
    }

    // Unterminated quote: malformed literal, fall back to the raw string.
    if inside_quotes {
        return None;
    }
    elements.push(finish_array_element(&buf, was_quoted));
    Some(elements)
}

/// Turn one raw array element into its final form: only an *unquoted*
/// `NULL` token is a SQL NULL (`None`); every other text, including a quoted
/// `"NULL"`, is returned as the literal string.
fn finish_array_element(raw: &str, quoted: bool) -> Option<String> {
    (quoted || raw != "NULL").then(|| raw.to_owned())
}
200
201fn hex_decode(s: &str) -> Result<Vec<u8>, FaucetError> {
202    if !s.len().is_multiple_of(2) {
203        return Err(FaucetError::Source(format!(
204            "pgoutput: bytea hex has odd length: {s:?}"
205        )));
206    }
207    let mut out = Vec::with_capacity(s.len() / 2);
208    for i in (0..s.len()).step_by(2) {
209        out.push(
210            u8::from_str_radix(&s[i..i + 2], 16)
211                .map_err(|e| FaucetError::Source(format!("pgoutput: bytea hex {s:?}: {e}")))?,
212        );
213    }
214    Ok(out)
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn bool_t_and_f() {
        assert_eq!(text_to_json(OID_BOOL, "t").unwrap(), json!(true));
        assert_eq!(text_to_json(OID_BOOL, "f").unwrap(), json!(false));
        assert!(text_to_json(OID_BOOL, "yes").is_err());
    }

    #[test]
    fn integer_types() {
        assert_eq!(text_to_json(OID_INT2, "32000").unwrap(), json!(32000));
        assert_eq!(text_to_json(OID_INT4, "-1").unwrap(), json!(-1));
        assert_eq!(
            text_to_json(OID_INT8, "9223372036854775807").unwrap(),
            json!(9223372036854775807_i64)
        );
        assert!(text_to_json(OID_INT4, "abc").is_err());
    }

    #[test]
    fn floats() {
        assert_eq!(text_to_json(OID_FLOAT8, "3.5").unwrap(), json!(3.5));
        // NaN/Inf are preserved as strings (distinct from a NULL → Value::Null).
        assert_eq!(text_to_json(OID_FLOAT8, "NaN").unwrap(), json!("NaN"));
        assert_eq!(
            text_to_json(OID_FLOAT4, "Infinity").unwrap(),
            json!("Infinity")
        );
        assert_eq!(
            text_to_json(OID_FLOAT8, "-Infinity").unwrap(),
            json!("-Infinity")
        );
    }

    #[test]
    fn float_parse_error() {
        assert!(text_to_json(OID_FLOAT8, "abc").is_err());
    }

    #[test]
    fn int_array_decodes_to_json_array() {
        assert_eq!(text_to_json(1007, "{1,2,3}").unwrap(), json!([1, 2, 3]));
        assert_eq!(text_to_json(1007, "{}").unwrap(), json!([]));
        assert_eq!(text_to_json(1016, "{-9,0,9}").unwrap(), json!([-9, 0, 9]));
    }

    #[test]
    fn int_array_with_null_elements() {
        assert_eq!(
            text_to_json(1007, "{1,NULL,3}").unwrap(),
            json!([1, null, 3])
        );
    }

    #[test]
    fn array_element_decode_error_propagates() {
        // A bad element fails the whole array rather than being stringified.
        assert!(text_to_json(1007, "{1,x}").is_err());
    }

    #[test]
    fn text_array_handles_quotes_nulls_and_commas() {
        assert_eq!(
            text_to_json(1009, r#"{a,"b,c",NULL,"NULL"}"#).unwrap(),
            json!(["a", "b,c", null, "NULL"])
        );
        // Escaped quote inside a quoted element.
        assert_eq!(
            text_to_json(1009, r#"{"he said \"hi\""}"#).unwrap(),
            json!(["he said \"hi\""])
        );
    }

    #[test]
    fn quoted_empty_string_element() {
        assert_eq!(
            text_to_json(1009, r#"{"",x}"#).unwrap(),
            json!(["", "x"])
        );
    }

    #[test]
    fn bool_array_decodes() {
        assert_eq!(
            text_to_json(1000, "{t,f,t}").unwrap(),
            json!([true, false, true])
        );
    }

    #[test]
    fn bytea_array_decodes_escaped_elements() {
        // Array output escapes the backslash inside quotes: "\\xdead".
        assert_eq!(
            text_to_json(1001, r#"{"\\xdead"}"#).unwrap(),
            json!(["3q0="])
        );
    }

    #[test]
    fn jsonb_array_decodes_elements() {
        assert_eq!(
            text_to_json(3807, r#"{"{\"a\":1}",NULL}"#).unwrap(),
            json!([{"a": 1}, null])
        );
    }

    #[test]
    fn multidimensional_array_falls_back_to_string() {
        assert_eq!(
            text_to_json(1007, "{{1,2},{3,4}}").unwrap(),
            json!("{{1,2},{3,4}}")
        );
    }

    #[test]
    fn malformed_arrays_fall_back_to_string() {
        // Unterminated quote.
        assert_eq!(
            text_to_json(1009, r#"{"abc}"#).unwrap(),
            json!("{\"abc}")
        );
        // Explicit lower-bound decoration is not handled by the simple parser.
        assert_eq!(
            text_to_json(1007, "[0:1]={1,2}").unwrap(),
            json!("[0:1]={1,2}")
        );
    }

    #[test]
    fn numeric_kept_as_string() {
        assert_eq!(
            text_to_json(OID_NUMERIC, "12345.67890").unwrap(),
            json!("12345.67890")
        );
    }

    #[test]
    fn bytea_base64() {
        // \xDEADBEEF -> base64 "3q2+7w=="
        assert_eq!(
            text_to_json(OID_BYTEA, "\\xDEADBEEF").unwrap(),
            json!("3q2+7w==")
        );
        assert!(text_to_json(OID_BYTEA, "deadbeef").is_err()); // missing \x
        assert!(text_to_json(OID_BYTEA, "\\xZZ").is_err()); // not hex
    }

    #[test]
    fn bytea_edge_cases() {
        // Empty bytea encodes to an empty base64 string.
        assert_eq!(text_to_json(OID_BYTEA, "\\x").unwrap(), json!(""));
        // Odd number of hex digits is rejected.
        assert!(text_to_json(OID_BYTEA, "\\xabc").is_err());
    }

    #[test]
    fn json_columns_parsed() {
        assert_eq!(
            text_to_json(OID_JSON, r#"{"a":1}"#).unwrap(),
            json!({"a": 1})
        );
        assert_eq!(
            text_to_json(OID_JSONB, r#"[1,2,3]"#).unwrap(),
            json!([1, 2, 3])
        );
    }

    #[test]
    fn unknown_oid_falls_back_to_string() {
        assert_eq!(
            text_to_json(99999, "2026-05-17 12:34:56+00").unwrap(),
            json!("2026-05-17 12:34:56+00")
        );
    }
}