use nodedb_sql::ddl_ast::statement::CopyFormat;
use sonic_rs;
use crate::control::server::pgwire::types::sqlstate_error;
use pgwire::error::PgWireResult;
pub(super) fn serialize_rows(
rows: &[serde_json::Value],
format: &CopyFormat,
delimiter: char,
header: bool,
) -> PgWireResult<Vec<u8>> {
match format {
CopyFormat::Ndjson => serialize_ndjson(rows),
CopyFormat::JsonArray => serialize_json_array(rows),
CopyFormat::Csv => serialize_csv(rows, delimiter, header),
}
}
fn serialize_ndjson(rows: &[serde_json::Value]) -> PgWireResult<Vec<u8>> {
let mut out = Vec::with_capacity(rows.len() * 64);
for row in rows {
let line = sonic_rs::to_vec(row).map_err(|e| {
sqlstate_error("XX000", &format!("COPY TO: JSON serialization error: {e}"))
})?;
out.extend_from_slice(&line);
out.push(b'\n');
}
Ok(out)
}
fn serialize_json_array(rows: &[serde_json::Value]) -> PgWireResult<Vec<u8>> {
let arr = serde_json::Value::Array(rows.to_vec());
let bytes = sonic_rs::to_vec(&arr).map_err(|e| {
sqlstate_error(
"XX000",
&format!("COPY TO: JSON array serialization error: {e}"),
)
})?;
Ok(bytes)
}
fn serialize_csv(
rows: &[serde_json::Value],
delimiter: char,
header: bool,
) -> PgWireResult<Vec<u8>> {
if rows.is_empty() {
return Ok(Vec::new());
}
let columns: Vec<String> = match &rows[0] {
serde_json::Value::Object(map) => {
let mut cols: Vec<String> = map.keys().cloned().collect();
cols.sort();
cols
}
_ => {
return Err(sqlstate_error(
"22P02",
"COPY TO CSV: expected JSON objects as rows",
));
}
};
let mut out = Vec::with_capacity(rows.len() * columns.len() * 16);
if header {
let hdr = csv_row(
&columns.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
delimiter,
);
out.extend_from_slice(hdr.as_bytes());
out.push(b'\n');
}
for (idx, row) in rows.iter().enumerate() {
let obj = match row.as_object() {
Some(m) => m,
None => {
return Err(sqlstate_error(
"22P02",
&format!("COPY TO CSV: row {idx} is not a JSON object"),
));
}
};
let values: Vec<String> = columns
.iter()
.map(|col| json_value_to_csv_field(obj.get(col).unwrap_or(&serde_json::Value::Null)))
.collect();
let line = csv_row(
&values.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
delimiter,
);
out.extend_from_slice(line.as_bytes());
out.push(b'\n');
}
Ok(out)
}
fn csv_row(fields: &[&str], delimiter: char) -> String {
let parts: Vec<String> = fields.iter().map(|f| csv_quote(f, delimiter)).collect();
parts.join(&delimiter.to_string())
}
fn csv_quote(s: &str, delimiter: char) -> String {
if s.contains(delimiter) || s.contains('"') || s.contains('\r') || s.contains('\n') {
format!("\"{}\"", s.replace('"', "\"\""))
} else {
s.to_string()
}
}
fn json_value_to_csv_field(val: &serde_json::Value) -> String {
match val {
serde_json::Value::Null => String::new(),
serde_json::Value::Bool(b) => b.to_string(),
serde_json::Value::Number(n) => n.to_string(),
serde_json::Value::String(s) => s.clone(),
serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
sonic_rs::to_string(val).unwrap_or_default()
}
}
}