use crate::column_info::ColumnBaseInfo;
use crate::to_json::PgRowParse;
use base64::{engine::general_purpose, Engine};
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
use rust_decimal::Decimal;
use serde_json::{json, Value as JsonValue};
use sqlx::postgres::{PgRow, PgTypeInfo};
use sqlx::types::uuid;
use sqlx::{Column, Row, TypeInfo};
use crate::decode::decode_auto;
use super::f64_to_json_safe;
// Hooks `PgRow` into the crate's shared row-to-JSON machinery, with
// `PgRowParse` (the per-column plan returned by `determine_parsing_methods`)
// as the parse state.
// NOTE(review): the exact items generated depend on the `impl_to_json!`
// macro definition, which is not visible here — confirm there.
crate::impl_to_json!(PgRow, PgRowParse);
/// Builds the per-column decoding plan for rows shaped like `row`.
///
/// Each column's PostgreSQL type is classified with [`detect_pg_type`]. The
/// decoder is taken from the global customizer when it supplies one for that
/// classification, and from [`default_pg_method`] otherwise. The column's
/// name, classification label and ordinal are recorded next to its decoder,
/// in column order.
///
/// The body never produces an `Err`; the `Result` is part of the signature only.
pub fn determine_parsing_methods(row: &PgRow) -> anyhow::Result<PgRowParse> {
    let customizer = super::get_customizer();
    let (methods, columns): (Vec<_>, Vec<_>) = row
        .columns()
        .iter()
        .map(|col| {
            let kind = detect_pg_type(col.type_info());
            let decoder: fn(&PgRow, usize) -> JsonValue = customizer
                .customize_pg(kind)
                .unwrap_or_else(|| default_pg_method(kind));
            let info = ColumnBaseInfo {
                name: col.name().to_string(),
                r#type: kind.to_string(),
                index: col.ordinal() as u64,
            };
            (decoder, info)
        })
        .unzip();
    Ok(PgRowParse { methods, columns })
}
/// Returns the built-in decoder for a classification label produced by
/// [`detect_pg_type`].
///
/// Labels without a dedicated decoder fall back to [`parse_text_value`]. These
/// include `TEXT`, `GEOMETRY`, `GEOGRAPHY`, `HSTORE`, `ENUM`, `COMPOSITE`,
/// `PSEUDO` and any unknown label.
#[inline]
fn default_pg_method(field_type: &str) -> fn(&PgRow, usize) -> JsonValue {
    match field_type {
        "INT" => parse_integer_value,
        "FLOAT" => parse_float_value,
        "NUMERIC" => parse_decimal_value,
        "BOOL" => parse_bool_value,
        "DATE" => parse_date_value,
        "TIME" => parse_time_value,
        "TIMESTAMP" => parse_datetime_value,
        "TIMESTAMPTZ" => parse_utc_value,
        "JSON" => parse_json_value,
        "BYTEA" => parse_bytea_value,
        "UUID" => parse_uuid_value,
        "ARRAY" => parse_array,
        "RANGE" => parse_range_value,
        "VECTOR" => parse_vector_value,
        "SPARSEVEC" => parse_sparsevec_value,
        "BIT" => parse_bit_value,
        _ => parse_text_value,
    }
}
/// Classifies a PostgreSQL column type into a coarse label (`"INT"`, `"TEXT"`,
/// `"ARRAY"`, …) that selects a decoder.
///
/// Domains are classified by their underlying type. For simple types the name
/// is lower-cased before matching, so the comparison is case-insensitive, and
/// any unrecognised simple type is labelled `"TEXT"`.
pub fn detect_pg_type(type_info: &PgTypeInfo) -> &'static str {
    use sqlx::postgres::PgTypeKind;

    /// Maps a lower-cased simple type name to its classification label.
    fn classify_simple(name: &str) -> &'static str {
        match name {
            "int2" | "smallint" | "smallserial" | "serial2" | "int4" | "integer" | "serial"
            | "serial4" | "int8" | "bigint" | "bigserial" | "serial8" => "INT",
            "float4" | "real" | "float8" | "double precision" => "FLOAT",
            "numeric" | "decimal" => "NUMERIC",
            "bool" | "boolean" => "BOOL",
            "date" => "DATE",
            "timestamp" | "timestamp without time zone" => "TIMESTAMP",
            "timestamptz" | "timestamp with time zone" => "TIMESTAMPTZ",
            "time" | "timetz" | "time without time zone" => "TIME",
            "jsonb" | "json" => "JSON",
            "bytea" => "BYTEA",
            "uuid" => "UUID",
            "geometry" | "geography" => "GEOMETRY",
            "hstore" => "HSTORE",
            "vector" | "halfvec" => "VECTOR",
            "sparsevec" => "SPARSEVEC",
            "bit" | "varbit" | "bit varying" => "BIT",
            // text, varchar, char, bpchar, citext, name, interval, money, inet,
            // cidr, macaddr, xml and every unrecognised name.
            _ => "TEXT",
        }
    }

    match type_info.kind() {
        PgTypeKind::Simple => classify_simple(&type_info.name().to_lowercase()),
        PgTypeKind::Array(_) => "ARRAY",
        PgTypeKind::Enum(_) => "ENUM",
        PgTypeKind::Composite(_) => "COMPOSITE",
        PgTypeKind::Range(_) => "RANGE",
        PgTypeKind::Pseudo => "PSEUDO",
        PgTypeKind::Domain(base) => detect_pg_type(base),
    }
}
/// Decodes the column as text, returning `null` for SQL `NULL` or when the
/// value cannot be read as a `String`.
///
/// NOTE(review): this is the fallback decoder for many non-text types (enums,
/// interval, inet, …). sqlx's `try_get` checks type compatibility before
/// decoding, so types that `String` does not accept will yield `null` here —
/// confirm which types actually reach this path.
fn parse_text_value(row: &PgRow, col_index: usize) -> JsonValue {
    match row.try_get::<Option<String>, _>(col_index) {
        Ok(Some(text)) => JsonValue::String(text),
        _ => JsonValue::Null,
    }
}
/// Decodes a `uuid` column into its hyphenated string form; `null` on SQL
/// `NULL` or decode failure.
fn parse_uuid_value(row: &PgRow, col_index: usize) -> JsonValue {
    row.try_get::<Option<uuid::Uuid>, _>(col_index)
        .ok()
        .flatten()
        .map_or(JsonValue::Null, |id| JsonValue::String(id.to_string()))
}
/// Decodes an integer column as a JSON number.
///
/// Decoding is tried as `i64`, then `i32`, then `i16`, with the first
/// successful non-`NULL` value widened to `i64`. If none succeeds the result
/// is `null`.
fn parse_integer_value(row: &PgRow, col_index: usize) -> JsonValue {
    let widened = row
        .try_get::<Option<i64>, _>(col_index)
        .ok()
        .flatten()
        .or_else(|| {
            row.try_get::<Option<i32>, _>(col_index)
                .ok()
                .flatten()
                .map(i64::from)
        })
        .or_else(|| {
            row.try_get::<Option<i16>, _>(col_index)
                .ok()
                .flatten()
                .map(i64::from)
        });
    widened.map_or(JsonValue::Null, |n| json!(n))
}
/// Decodes a floating-point column (`real` / `double precision`).
///
/// The value is first read as `f64`. If that decode fails, it is read as
/// `f32`. This covers a `real`/FLOAT4 column in sqlx versions where `f64` is
/// only type-compatible with FLOAT8; previously such columns came out as
/// `null`.
///
/// An `f32` is widened through its shortest decimal representation, so
/// `0.1::real` becomes `0.1` rather than `0.10000000149011612`. Every
/// resulting `f64` is passed through `f64_to_json_safe`. SQL `NULL` or an
/// undecodable value gives `null`.
fn parse_float_value(row: &PgRow, col_index: usize) -> JsonValue {
    match row.try_get::<Option<f64>, _>(col_index) {
        Ok(Some(f)) => return f64_to_json_safe(f),
        Ok(None) => return JsonValue::Null,
        Err(_) => {}
    }
    match row.try_get::<Option<f32>, _>(col_index) {
        Ok(Some(f)) => {
            // Round-trip through Display so the JSON shows the value the user
            // stored, not the binary widening artefacts of `f32 as f64`.
            let widened = f
                .to_string()
                .parse::<f64>()
                .unwrap_or_else(|_| f64::from(f));
            f64_to_json_safe(widened)
        }
        _ => JsonValue::Null,
    }
}
/// Decodes a `bool` column as a JSON boolean; `null` on SQL `NULL` or decode
/// failure.
fn parse_bool_value(row: &PgRow, col_index: usize) -> JsonValue {
    if let Ok(Some(flag)) = row.try_get::<Option<bool>, _>(col_index) {
        JsonValue::Bool(flag)
    } else {
        JsonValue::Null
    }
}
/// Decodes a `date` column as a `YYYY-MM-DD` string; `null` on SQL `NULL` or
/// decode failure.
fn parse_date_value(row: &PgRow, col_index: usize) -> JsonValue {
    row.try_get::<Option<NaiveDate>, _>(col_index)
        .ok()
        .flatten()
        .map_or(JsonValue::Null, |date| {
            JsonValue::String(date.format("%Y-%m-%d").to_string())
        })
}
/// Decodes a `timestamp` (without time zone) column as
/// `YYYY-MM-DD HH:MM:SS[.fraction]`. SQL `NULL` or a decode failure gives `null`.
///
/// chrono's `%.f` prints the fraction with 3, 6 or 9 digits, and prints
/// nothing when the fraction is zero. Whole-second values therefore render
/// exactly as before, while sub-second precision is no longer silently
/// truncated. This is consistent with the TIMESTAMPTZ path, which keeps
/// fractions via RFC 3339.
fn parse_datetime_value(row: &PgRow, col_index: usize) -> JsonValue {
    match row.try_get::<Option<NaiveDateTime>, _>(col_index) {
        Ok(Some(dt)) => JsonValue::String(dt.format("%Y-%m-%d %H:%M:%S%.f").to_string()),
        _ => JsonValue::Null,
    }
}
/// Decodes a `timestamptz` column as an RFC 3339 string in UTC. Both SQL
/// `NULL` and a decode error map to `null`.
fn parse_utc_value(row: &PgRow, col_index: usize) -> JsonValue {
    if let Ok(Some(moment)) = row.try_get::<Option<DateTime<Utc>>, _>(col_index) {
        JsonValue::String(moment.to_rfc3339())
    } else {
        JsonValue::Null
    }
}
fn parse_time_value(row: &PgRow, col_index: usize) -> JsonValue {
if let Ok(Some(t)) = row.try_get::<Option<NaiveTime>, _>(col_index) {
return JsonValue::String(t.format("%H:%M:%S").to_string());
}
if let Ok(Some(t)) = row.try_get::<Option<String>, _>(col_index) {
return JsonValue::String(t);
}
JsonValue::Null
}
/// Decodes a `json`/`jsonb` column into the corresponding JSON value. SQL
/// `NULL` or a decode failure yields `null`.
fn parse_json_value(row: &PgRow, col_index: usize) -> JsonValue {
    row.try_get::<Option<JsonValue>, _>(col_index)
        .ok()
        .flatten()
        .unwrap_or(JsonValue::Null)
}
/// Decodes a `numeric` column as a JSON *string* (the `Decimal`'s textual
/// form), which avoids losing precision in a JSON number. SQL `NULL` or a
/// decode failure yields `null`.
fn parse_decimal_value(row: &PgRow, col_index: usize) -> JsonValue {
    if let Ok(Some(amount)) = row.try_get::<Option<Decimal>, _>(col_index) {
        JsonValue::String(amount.to_string())
    } else {
        JsonValue::Null
    }
}
/// Decodes a `bytea` column to a JSON string.
///
/// If `super::blob_is_text` judges the bytes to be text, they are converted
/// with `decode_auto`. Otherwise they are base64-encoded with the standard
/// alphabet. SQL `NULL` or a decode failure yields `null`.
fn parse_bytea_value(row: &PgRow, col_index: usize) -> JsonValue {
    let bytes = match row.try_get::<Option<Vec<u8>>, _>(col_index) {
        Ok(Some(bytes)) => bytes,
        _ => return JsonValue::Null,
    };
    let rendered = if super::blob_is_text(&bytes) {
        decode_auto(&bytes)
    } else {
        general_purpose::STANDARD.encode(&bytes)
    };
    JsonValue::String(rendered)
}
/// Decodes an array column by reading its text form and passing it to
/// [`parse_postgres_array`]. SQL `NULL` or an unreadable value yields `null`.
///
/// NOTE(review): sqlx's `try_get` checks type compatibility before decoding,
/// and `String` is normally accepted only for text-like types. If that also
/// rejects array columns, this always returns `null` — confirm against the
/// sqlx version and query path in use.
fn parse_array(row: &PgRow, col_index: usize) -> JsonValue {
    row.try_get::<Option<String>, _>(col_index)
        .ok()
        .flatten()
        .map_or(JsonValue::Null, |literal| parse_postgres_array(&literal))
}
/// Parses PostgreSQL's text output for an array value into a JSON array.
///
/// * Empty input, `null` (any case) or `{}` → empty JSON array.
/// * An optional dimension decoration such as `[0:2]={1,2,3}` is skipped.
///   PostgreSQL prints this decoration for arrays whose lower bound is not 1.
/// * Multidimensional literals such as `{{1,2},{3,4}}` → nested JSON arrays.
/// * Double-quoted elements → JSON strings with backslash escapes resolved.
///   Quoting is what distinguishes the string `"NULL"` or `""` from an SQL
///   NULL, so quoted elements are never turned into `null` or numbers, and
///   whitespace inside the quotes is preserved.
/// * Unquoted elements → `null` for `NULL`; otherwise number or string, as
///   decided by the element helpers.
///
/// Text that is not a well-formed array literal is returned (trimmed) as a
/// JSON string.
fn parse_postgres_array(input: &str) -> JsonValue {
    type CharStream<'a> = std::iter::Peekable<std::str::Chars<'a>>;

    /// Consumes any whitespace at the front of the stream.
    fn skip_whitespace(chars: &mut CharStream<'_>) {
        while chars.next_if(|c| c.is_whitespace()).is_some() {}
    }

    /// Reads the body of a double-quoted element (the opening quote has
    /// already been consumed). A backslash escapes the following character.
    /// Returns `None` if the closing quote is missing.
    fn read_quoted(chars: &mut CharStream<'_>) -> Option<String> {
        let mut text = String::new();
        loop {
            match chars.next()? {
                '\\' => text.push(chars.next()?),
                '"' => return Some(text),
                c => text.push(c),
            }
        }
    }

    /// Reads an unquoted element up to, but not including, the next `,` or `}`.
    fn read_unquoted(chars: &mut CharStream<'_>) -> JsonValue {
        let mut raw = String::new();
        while let Some(c) = chars.next_if(|&c| c != ',' && c != '}') {
            raw.push(c);
        }
        let trimmed = raw.trim();
        if trimmed.eq_ignore_ascii_case("null") {
            JsonValue::Null
        } else {
            parse_array_element_owned(trimmed)
        }
    }

    /// Parses one brace level. The opening `{` has already been consumed; on
    /// success the matching `}` is consumed too. Returns `None` on malformed
    /// input.
    fn parse_level(chars: &mut CharStream<'_>) -> Option<Vec<JsonValue>> {
        let mut items = Vec::new();
        skip_whitespace(chars);
        if chars.next_if_eq(&'}').is_some() {
            return Some(items);
        }
        loop {
            skip_whitespace(chars);
            let next = *chars.peek()?;
            let item = match next {
                '{' => {
                    chars.next();
                    JsonValue::Array(parse_level(chars)?)
                }
                '"' => {
                    chars.next();
                    JsonValue::String(read_quoted(chars)?)
                }
                _ => read_unquoted(chars),
            };
            items.push(item);
            skip_whitespace(chars);
            match chars.next()? {
                ',' => {}
                '}' => return Some(items),
                _ => return None,
            }
        }
    }

    let s = input.trim();
    if s.is_empty() || s.eq_ignore_ascii_case("null") {
        return JsonValue::Array(Vec::new());
    }
    // Arrays with a non-default lower bound are printed as e.g. `[0:2]={1,2,3}`;
    // the literal itself follows the `=`.
    let literal = if s.starts_with('[') {
        s.split_once('=').map_or(s, |(_, rest)| rest.trim_start())
    } else {
        s
    };
    if literal == "{}" {
        return JsonValue::Array(Vec::new());
    }
    if !literal.starts_with('{') || !literal.ends_with('}') {
        return JsonValue::String(s.to_owned());
    }
    // Both braces are single-byte ASCII, so this slice is on char boundaries.
    let content = &literal[1..literal.len() - 1];
    // Fast path: a flat array with no quoted elements can simply be split on
    // commas.
    if !content.contains(|c: char| c == '"' || c == '{') {
        return JsonValue::Array(
            content
                .split(',')
                .map(|item| parse_array_element_fast(item.trim()))
                .collect(),
        );
    }
    let mut chars = literal.chars().peekable();
    chars.next(); // the opening `{`, verified above
    let parsed = parse_level(&mut chars);
    // The literal must end exactly at the brace closing the outermost level.
    let fully_consumed = chars.next().is_none();
    match parsed {
        Some(items) if fully_consumed => JsonValue::Array(items),
        _ => JsonValue::String(s.to_owned()),
    }
}
/// Converts one already-trimmed, unquoted array element to JSON. Used for
/// flat arrays that contain no quoted elements.
///
/// * `""`, `NULL` and `null` → `null`.
/// * An `i64` literal → integer.
/// * A float that `serde_json::Number::from_f64` accepts (i.e. finite) →
///   number.
/// * Anything else, including non-finite floats → string.
#[inline]
fn parse_array_element_fast(trimmed: &str) -> JsonValue {
    if matches!(trimmed, "" | "NULL" | "null") {
        return JsonValue::Null;
    }
    if let Ok(int) = trimmed.parse::<i64>() {
        return JsonValue::from(int);
    }
    trimmed
        .parse::<f64>()
        .ok()
        .and_then(serde_json::Number::from_f64)
        .map_or_else(|| JsonValue::String(trimmed.to_owned()), JsonValue::Number)
}
/// Converts one unquoted array element to JSON, trimming it first.
///
/// * `""`, `NULL` and `null` → `null`.
/// * An `i64` literal → integer.
/// * Any other float literal → the result of [`f64_to_json_safe`].
/// * Everything else → string.
#[inline]
fn parse_array_element_owned(s: &str) -> JsonValue {
    let token = s.trim();
    if matches!(token, "" | "NULL" | "null") {
        return JsonValue::Null;
    }
    if let Ok(int) = token.parse::<i64>() {
        return JsonValue::Number(int.into());
    }
    match token.parse::<f64>() {
        Ok(float) => f64_to_json_safe(float),
        Err(_) => JsonValue::String(token.to_owned()),
    }
}
/// Decodes a range column from its text form into a JSON object.
///
/// * `empty` (or blank text) → `{"empty": true}`.
/// * A bracketed literal such as `[1,10)` →
///   `{"lower", "upper", "lower_inc", "upper_inc"}`. An omitted bound (an
///   unbounded side) is `null`.
/// * Double-quoted bounds are unquoted and unescaped, and kept as strings.
///   PostgreSQL quotes bounds that contain e.g. whitespace or commas, such as
///   `tsrange` timestamps. Inside quotes, `\x` → `x` and `""` → `"`.
///   Previously the quote characters leaked into the output, and a comma
///   inside quotes split the bounds wrongly.
/// * Other bounds go through [`parse_range_element`] (integer, float or
///   string).
/// * Text that is not a bracketed literal is returned verbatim as a string.
///   SQL `NULL` or an undecodable value yields `null`.
///
/// NOTE(review): relies on sqlx accepting `String` for the range column; its
/// `try_get` type-compatibility check may reject it, giving `null` — confirm.
fn parse_range_value(row: &PgRow, col_index: usize) -> JsonValue {
    /// Byte offset of the `,` separating the two bounds. Commas inside double
    /// quotes or directly after a backslash are skipped.
    fn find_bound_separator(inner: &str) -> Option<usize> {
        let mut in_quotes = false;
        let mut escaped = false;
        for (pos, c) in inner.char_indices() {
            if escaped {
                escaped = false;
                continue;
            }
            match c {
                '\\' => escaped = true,
                '"' => in_quotes = !in_quotes,
                ',' if !in_quotes => return Some(pos),
                _ => {}
            }
        }
        None
    }

    /// Converts one raw bound token to JSON. An empty token → `null`
    /// (unbounded). A quoted or escaped token → unescaped string. Anything
    /// else → [`parse_range_element`].
    fn bound_to_json(raw: &str) -> JsonValue {
        let raw = raw.trim();
        if raw.is_empty() {
            return JsonValue::Null;
        }
        if !raw.contains(|c: char| c == '"' || c == '\\') {
            return parse_range_element(raw);
        }
        let mut text = String::with_capacity(raw.len());
        let mut in_quotes = false;
        let mut chars = raw.chars().peekable();
        while let Some(c) = chars.next() {
            match c {
                '\\' => text.extend(chars.next()),
                '"' if in_quotes && chars.peek() == Some(&'"') => {
                    chars.next();
                    text.push('"');
                }
                '"' => in_quotes = !in_quotes,
                _ => text.push(c),
            }
        }
        JsonValue::String(text)
    }

    let s = match row.try_get::<Option<String>, _>(col_index) {
        Ok(Some(s)) => s,
        _ => return JsonValue::Null,
    };
    let trimmed = s.trim();
    if trimmed == "empty" || trimmed.is_empty() {
        return json!({"empty": true});
    }
    let lower_inc = trimmed.starts_with('[');
    let upper_inc = trimmed.ends_with(']');
    // Requiring ASCII brackets at both ends keeps the byte slicing below on
    // char boundaries and rejects text that is not a range literal.
    let bracketed =
        (lower_inc || trimmed.starts_with('(')) && (upper_inc || trimmed.ends_with(')'));
    if bracketed && trimmed.len() >= 3 {
        let inner = &trimmed[1..trimmed.len() - 1];
        if let Some(comma) = find_bound_separator(inner) {
            let lower_val = bound_to_json(&inner[..comma]);
            let upper_val = bound_to_json(&inner[comma + 1..]);
            return json!({
                "lower": lower_val,
                "upper": upper_val,
                "lower_inc": lower_inc,
                "upper_inc": upper_inc
            });
        }
    }
    JsonValue::String(s)
}
/// Converts an unquoted range bound to JSON. It becomes an integer if it
/// parses as `i64`, otherwise the result of `f64_to_json_safe` if it parses
/// as `f64`, otherwise a string.
#[inline]
fn parse_range_element(s: &str) -> JsonValue {
    match s.parse::<i64>() {
        Ok(int) => json!(int),
        Err(_) => match s.parse::<f64>() {
            Ok(float) => super::f64_to_json_safe(float),
            Err(_) => JsonValue::String(s.to_string()),
        },
    }
}
/// Decodes a `vector`/`halfvec` column.
///
/// The text form is tried first and handed to `super::parse_vector_string`.
/// If that fails, the raw bytes go to `super::parse_vector_bytes`. If neither
/// yields a value, the result is `null`.
fn parse_vector_value(row: &PgRow, col_index: usize) -> JsonValue {
    match row.try_get::<Option<String>, _>(col_index) {
        Ok(Some(text)) => super::parse_vector_string(&text),
        _ => match row.try_get::<Option<Vec<u8>>, _>(col_index) {
            Ok(Some(raw)) => super::parse_vector_bytes(&raw),
            _ => JsonValue::Null,
        },
    }
}
/// Decodes a `sparsevec` column from its text form via
/// [`parse_sparsevec_string`]. SQL `NULL` or an unreadable value yields `null`.
fn parse_sparsevec_value(row: &PgRow, col_index: usize) -> JsonValue {
    row.try_get::<Option<String>, _>(col_index)
        .ok()
        .flatten()
        .map_or(JsonValue::Null, |text| parse_sparsevec_string(&text))
}
/// Parses sparse-vector text of the form `{idx:val,idx:val,...}/dimensions`
/// into `{"dimensions", "indices", "values"}`.
///
/// * The text after the last `/` must parse as `i64`; otherwise, or if
///   there is no `/`, the trimmed input is returned as a string.
/// * Surrounding braces on the entry list are optional.
/// * Entries without a `:` or with a non-integer index are skipped.
/// * Values that do not parse as `f64` are kept as strings.
fn parse_sparsevec_string(s: &str) -> JsonValue {
    let text = s.trim();
    let (entries, dims) = match text.rsplit_once('/') {
        Some(parts) => parts,
        None => return JsonValue::String(text.to_string()),
    };
    let dimensions = match dims.trim().parse::<i64>() {
        Ok(d) => d,
        Err(_) => return JsonValue::String(text.to_string()),
    };
    let entries = entries.trim();
    let body = entries
        .strip_prefix('{')
        .and_then(|rest| rest.strip_suffix('}'))
        .unwrap_or(entries);

    let mut indices = Vec::new();
    let mut values = Vec::new();
    for (idx_text, val_text) in body
        .split(',')
        .filter_map(|pair| pair.trim().split_once(':'))
    {
        let idx = match idx_text.trim().parse::<i64>() {
            Ok(i) => i,
            Err(_) => continue,
        };
        indices.push(json!(idx));
        let val_text = val_text.trim();
        values.push(match val_text.parse::<f64>() {
            Ok(v) => super::f64_to_json_safe(v),
            Err(_) => JsonValue::String(val_text.to_string()),
        });
    }

    json!({
        "dimensions": dimensions,
        "indices": indices,
        "values": values
    })
}
/// Decodes a `bit`/`varbit` column as a string of `0`/`1` characters.
///
/// The text form is used as-is when available. Otherwise raw bytes are
/// rendered as 8 binary digits each, most significant bit first. If neither
/// decode works, the result is `null`.
///
/// NOTE(review): sqlx's `try_get` checks type compatibility first, so for BIT
/// columns neither `String` nor `Vec<u8>` may be accepted. Also, a bit length
/// that is not a multiple of 8 is not reflected in the byte rendering —
/// confirm intended behaviour.
fn parse_bit_value(row: &PgRow, col_index: usize) -> JsonValue {
    if let Ok(Some(text)) = row.try_get::<Option<String>, _>(col_index) {
        return JsonValue::String(text);
    }
    match row.try_get::<Option<Vec<u8>>, _>(col_index) {
        Ok(Some(bytes)) => {
            let bits: String = bytes
                .iter()
                .flat_map(|&byte| {
                    (0..8u32)
                        .rev()
                        .map(move |shift| if ((byte >> shift) & 1) == 1 { '1' } else { '0' })
                })
                .collect();
            JsonValue::String(bits)
        }
        _ => JsonValue::Null,
    }
}