use base64::Engine;
use faucet_core::FaucetError;
use serde_json::Value;
// PostgreSQL built-in type OIDs (as listed in the `pg_type` system catalog)
// for the column types that `text_to_json` converts explicitly.
const OID_BOOL: u32 = 16;
const OID_BYTEA: u32 = 17;
const OID_INT2: u32 = 21;
const OID_INT4: u32 = 23;
const OID_INT8: u32 = 20;
const OID_FLOAT4: u32 = 700;
const OID_FLOAT8: u32 = 701;
const OID_NUMERIC: u32 = 1700;
const OID_JSON: u32 = 114;
const OID_JSONB: u32 = 3802;

/// `(array type OID, element type OID)` pairs for the one-dimensional array
/// types whose elements `text_to_json` decodes one by one.
const ARRAY_ELEMENT_OIDS: &[(u32, u32)] = &[
    (1000, OID_BOOL),    // bool[]
    (1001, OID_BYTEA),   // bytea[]
    (1005, OID_INT2),    // int2[]
    (1007, OID_INT4),    // int4[]
    (1016, OID_INT8),    // int8[]
    (1021, OID_FLOAT4),  // float4[]
    (1022, OID_FLOAT8),  // float8[]
    (1231, OID_NUMERIC), // numeric[]
    (199, OID_JSON),     // json[]
    (3807, OID_JSONB),   // jsonb[]
    // The element types below have no dedicated decoder, so each element
    // ends up as a JSON string.
    (1009, 25),   // text[] -> text
    (1015, 1043), // varchar[] -> varchar
    (1014, 1042), // bpchar[] -> bpchar
    (2951, 2950), // uuid[] -> uuid
    (1115, 1114), // timestamp[] -> timestamp
    (1185, 1184), // timestamptz[] -> timestamptz
    (1182, 1082), // date[] -> date
    (1183, 1083), // time[] -> time
];

/// Looks up the element type OID for a supported array type OID.
///
/// Returns `None` when `array_oid` is not one of the array types listed in
/// `ARRAY_ELEMENT_OIDS`.
fn array_element_oid(array_oid: u32) -> Option<u32> {
    ARRAY_ELEMENT_OIDS
        .iter()
        .find_map(|&(array, element)| (array == array_oid).then_some(element))
}
/// Decodes a single pgoutput text-format column value into JSON.
///
/// `type_oid` selects the conversion:
/// - `bool`: `t`/`f` become a JSON boolean; any other text is an error.
/// - `int2`/`int4`/`int8`: parsed as `i64` into a JSON integer.
/// - `float4`/`float8`: `NaN`, `Infinity` and `-Infinity` are kept as strings
///   (JSON numbers cannot express them). Other values become JSON numbers, and
///   any parse result that is still non-finite becomes `null`.
/// - `numeric`: kept verbatim as a string, so no precision is lost.
/// - `bytea`: hex form (`\x...`) is re-encoded as a standard base64 string.
/// - `json`/`jsonb`: parsed into structured JSON.
/// - Supported one-dimensional arrays (see `array_element_oid`): a JSON array
///   whose elements are decoded with the element type. SQL `NULL` elements
///   become `null`.
/// - Anything else is returned verbatim as a JSON string. This includes array
///   literals that `parse_pg_array` cannot split, such as multi-dimensional ones.
///
/// # Errors
///
/// Returns `FaucetError::Source` when `text` does not match the format expected
/// for `type_oid`: a bad bool, int, float, bytea or JSON value, or any array
/// element that fails to decode.
pub fn text_to_json(type_oid: u32, text: &str) -> Result<Value, FaucetError> {
    let value = match type_oid {
        OID_BOOL => match text {
            "t" | "f" => Value::Bool(text == "t"),
            other => {
                return Err(FaucetError::Source(format!(
                    "pgoutput: bool column has non-t/f text {other:?}"
                )));
            }
        },
        OID_INT2 | OID_INT4 | OID_INT8 => text.parse::<i64>().map(Value::from).map_err(|e| {
            FaucetError::Source(format!(
                "pgoutput: int (oid={type_oid}) parse {text:?}: {e}"
            ))
        })?,
        // PostgreSQL's spellings of the IEEE specials; JSON has no number for them.
        OID_FLOAT4 | OID_FLOAT8 if matches!(text, "NaN" | "Infinity" | "-Infinity") => {
            Value::String(text.to_owned())
        }
        OID_FLOAT4 | OID_FLOAT8 => {
            let parsed = text.parse::<f64>().map_err(|e| {
                FaucetError::Source(format!("pgoutput: float parse {text:?}: {e}"))
            })?;
            // `from_f64` returns `None` only for non-finite values.
            serde_json::Number::from_f64(parsed).map_or(Value::Null, Value::Number)
        }
        OID_NUMERIC => Value::String(text.to_owned()),
        OID_BYTEA => {
            let hex = text.strip_prefix("\\x").ok_or_else(|| {
                FaucetError::Source(format!(
                    "pgoutput: bytea text {text:?} missing '\\x' prefix"
                ))
            })?;
            let raw = hex_decode(hex)?;
            Value::String(base64::engine::general_purpose::STANDARD.encode(raw))
        }
        OID_JSON | OID_JSONB => serde_json::from_str::<Value>(text).map_err(|e| {
            FaucetError::Source(format!("pgoutput: json/jsonb parse {text:?}: {e}"))
        })?,
        other => {
            // Only attempt to split the literal when the OID is a known array type.
            let array = array_element_oid(other)
                .and_then(|elem_oid| parse_pg_array(text).map(|items| (elem_oid, items)));
            match array {
                Some((elem_oid, items)) => Value::Array(
                    items
                        .into_iter()
                        .map(|item| match item {
                            Some(s) => text_to_json(elem_oid, &s),
                            None => Ok(Value::Null),
                        })
                        .collect::<Result<Vec<_>, _>>()?,
                ),
                None => Value::String(text.to_owned()),
            }
        }
    };
    Ok(value)
}
/// Splits a one-dimensional PostgreSQL array literal such as `{a,"b,c",NULL}`
/// into its raw element strings.
///
/// For quoted elements, the surrounding quotes are removed and backslash
/// escapes are resolved. An unquoted `NULL` becomes `None`, while a quoted
/// `"NULL"` stays the literal string.
///
/// Returns `None` when `text` is not wrapped in `{...}`, when a `{` appears
/// outside quotes (a multi-dimensional array, which is not supported), or when
/// the input ends inside an unterminated quoted element.
fn parse_pg_array(text: &str) -> Option<Vec<Option<String>>> {
    let inner = text.strip_prefix('{')?.strip_suffix('}')?;
    if inner.is_empty() {
        return Some(Vec::new());
    }

    let mut items = Vec::new();
    let mut field = String::new();
    // Scanner state: whether we are between a pair of quotes, and whether the
    // previous in-quote character was a backslash escaping this one.
    let mut inside_quotes = false;
    let mut escape_pending = false;
    // Whether the current field used quotes at all, so that `"NULL"` stays text.
    let mut field_was_quoted = false;

    for ch in inner.chars() {
        if inside_quotes {
            if escape_pending {
                field.push(ch);
                escape_pending = false;
            } else if ch == '\\' {
                escape_pending = true;
            } else if ch == '"' {
                inside_quotes = false;
            } else {
                field.push(ch);
            }
            continue;
        }
        match ch {
            '"' => {
                inside_quotes = true;
                field_was_quoted = true;
            }
            // A nested brace means a multi-dimensional array; not handled here.
            '{' => return None,
            ',' => {
                items.push(finish_array_element(&field, field_was_quoted));
                field.clear();
                field_was_quoted = false;
            }
            other => field.push(other),
        }
    }

    if inside_quotes {
        // The final quoted element was never closed.
        return None;
    }
    items.push(finish_array_element(&field, field_was_quoted));
    Some(items)
}

/// Turns one raw array field into an element.
///
/// An unquoted `NULL` means SQL NULL (`None`). Everything else, including a
/// quoted `"NULL"`, is kept as text.
fn finish_array_element(raw: &str, quoted: bool) -> Option<String> {
    (quoted || raw != "NULL").then(|| raw.to_owned())
}
/// Decodes the hex digits of a PostgreSQL `bytea` value (the text after the
/// `\x` prefix) into raw bytes.
///
/// Upper- and lower-case digits are both accepted. An empty string decodes to
/// an empty vector.
///
/// # Errors
///
/// Returns `FaucetError::Source` if `s` has an odd number of bytes, or if it
/// contains anything other than ASCII hex digits.
fn hex_decode(s: &str) -> Result<Vec<u8>, FaucetError> {
    // Work on raw bytes rather than `&str` sub-slices: slicing a `str` at fixed
    // byte offsets panics when a multi-byte UTF-8 character straddles them.
    let digits = s.as_bytes();
    if !digits.len().is_multiple_of(2) {
        return Err(FaucetError::Source(format!(
            "pgoutput: bytea hex has odd length: {s:?}"
        )));
    }
    let mut out = Vec::with_capacity(digits.len() / 2);
    for (pair_idx, pair) in digits.chunks_exact(2).enumerate() {
        // Decode each nibble explicitly. `u8::from_str_radix` would also accept
        // a leading `+` sign (e.g. "+f"), which is not valid bytea hex.
        let (Some(hi), Some(lo)) = (hex_nibble(pair[0]), hex_nibble(pair[1])) else {
            return Err(FaucetError::Source(format!(
                "pgoutput: bytea hex {s:?}: invalid hex digit pair at byte offset {}",
                pair_idx * 2
            )));
        };
        out.push((hi << 4) | lo);
    }
    Ok(out)
}

/// Returns the value (0..=15) of one ASCII hex digit, or `None` for any other byte.
fn hex_nibble(b: u8) -> Option<u8> {
    match b {
        b'0'..=b'9' => Some(b - b'0'),
        b'a'..=b'f' => Some(b - b'a' + 10),
        b'A'..=b'F' => Some(b - b'A' + 10),
        _ => None,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn bool_t_and_f() {
        assert_eq!(text_to_json(OID_BOOL, "t").unwrap(), json!(true));
        assert_eq!(text_to_json(OID_BOOL, "f").unwrap(), json!(false));
        assert!(text_to_json(OID_BOOL, "yes").is_err());
    }

    #[test]
    fn integer_types() {
        assert_eq!(text_to_json(OID_INT2, "32000").unwrap(), json!(32000));
        assert_eq!(text_to_json(OID_INT4, "-1").unwrap(), json!(-1));
        assert_eq!(
            text_to_json(OID_INT8, "9223372036854775807").unwrap(),
            json!(9223372036854775807_i64)
        );
        assert!(text_to_json(OID_INT4, "abc").is_err());
    }

    #[test]
    fn floats() {
        assert_eq!(text_to_json(OID_FLOAT8, "3.5").unwrap(), json!(3.5));
        assert_eq!(text_to_json(OID_FLOAT8, "NaN").unwrap(), json!("NaN"));
        assert_eq!(
            text_to_json(OID_FLOAT4, "Infinity").unwrap(),
            json!("Infinity")
        );
        assert_eq!(
            text_to_json(OID_FLOAT8, "-Infinity").unwrap(),
            json!("-Infinity")
        );
    }

    #[test]
    fn int_array_decodes_to_json_array() {
        assert_eq!(text_to_json(1007, "{1,2,3}").unwrap(), json!([1, 2, 3]));
        assert_eq!(text_to_json(1007, "{}").unwrap(), json!([]));
        assert_eq!(text_to_json(1016, "{-9,0,9}").unwrap(), json!([-9, 0, 9]));
    }

    #[test]
    fn text_array_handles_quotes_nulls_and_commas() {
        assert_eq!(
            text_to_json(1009, r#"{a,"b,c",NULL,"NULL"}"#).unwrap(),
            json!(["a", "b,c", null, "NULL"])
        );
        assert_eq!(
            text_to_json(1009, r#"{"he said \"hi\""}"#).unwrap(),
            json!(["he said \"hi\""])
        );
    }

    #[test]
    fn bool_array_decodes() {
        assert_eq!(
            text_to_json(1000, "{t,f,t}").unwrap(),
            json!([true, false, true])
        );
    }

    #[test]
    fn multidimensional_array_falls_back_to_string() {
        assert_eq!(
            text_to_json(1007, "{{1,2},{3,4}}").unwrap(),
            json!("{{1,2},{3,4}}")
        );
    }

    #[test]
    fn numeric_kept_as_string() {
        assert_eq!(
            text_to_json(OID_NUMERIC, "12345.67890").unwrap(),
            json!("12345.67890")
        );
    }

    #[test]
    fn bytea_base64() {
        assert_eq!(
            text_to_json(OID_BYTEA, "\\xDEADBEEF").unwrap(),
            json!("3q2+7w==")
        );
        assert_eq!(text_to_json(OID_BYTEA, "\\x00ff").unwrap(), json!("AP8="));
        assert_eq!(text_to_json(OID_BYTEA, "\\x").unwrap(), json!(""));
        assert!(text_to_json(OID_BYTEA, "deadbeef").is_err());
        assert!(text_to_json(OID_BYTEA, "\\xZZ").is_err());
        assert!(text_to_json(OID_BYTEA, "\\xabc").is_err());
    }

    #[test]
    fn bytea_rejects_signs_and_non_ascii_without_panicking() {
        // `u8::from_str_radix` accepted "+f"; a sign is never a valid hex digit.
        assert!(text_to_json(OID_BYTEA, "\\x+f").is_err());
        // "aé0" is 4 bytes with 'é' spanning bytes 1..3; this used to panic on
        // a non-char-boundary slice instead of returning an error.
        assert!(text_to_json(OID_BYTEA, "\\xaé0").is_err());
        assert!(text_to_json(OID_BYTEA, "\\xéé").is_err());
    }

    #[test]
    fn json_columns_parsed() {
        assert_eq!(
            text_to_json(OID_JSON, r#"{"a":1}"#).unwrap(),
            json!({"a": 1})
        );
        assert_eq!(
            text_to_json(OID_JSONB, r#"[1,2,3]"#).unwrap(),
            json!([1, 2, 3])
        );
    }

    #[test]
    fn unknown_oid_falls_back_to_string() {
        assert_eq!(
            text_to_json(99999, "2026-05-17 12:34:56+00").unwrap(),
            json!("2026-05-17 12:34:56+00")
        );
    }
}