use crate::schema::{ColumnId, ColumnType, TableSchema};
use smallvec::SmallVec;
use super::mysql_insert::{FkRef, PkValue};
/// Tuple of key values (used for both primary-key and foreign-key tuples), backed by a
/// `SmallVec` with an inline capacity of two elements.
pub type PkTuple = SmallVec<[PkValue; 2]>;
/// One data line produced by [`CopyParser`], together with the key values extracted from it.
#[derive(Debug, Clone)]
pub struct ParsedCopyRow {
/// Copy of the line bytes exactly as handed to the row parser (no newline, escapes not decoded).
pub raw: Vec<u8>,
/// Values of the primary-key columns found in the row; `None` when none were found or
/// when any of them is NULL.
pub pk: Option<PkTuple>,
/// Foreign-key value tuples extracted from the row, each paired with the `FkRef`
/// (table id + foreign-key index) identifying the constraint it belongs to.
pub fk_values: Vec<(FkRef, PkTuple)>,
/// Every field of the line converted to a `PkValue`, in field order; empty when the row
/// was parsed without a schema.
pub all_values: Vec<PkValue>,
/// Indexed by schema column ordinal: position of that column's value in `all_values`,
/// or `None` when the column does not appear in the row. Empty without a schema.
pub column_map: Vec<Option<usize>>,
}
impl ParsedCopyRow {
    /// Returns the value this row holds for the schema column at `schema_col_index`.
    ///
    /// Yields `None` when the index lies outside `column_map`, when the column has no
    /// mapped field in this row, or when the mapped position is not present in
    /// `all_values`.
    pub fn get_column_value(&self, schema_col_index: usize) -> Option<&PkValue> {
        // First `?`: index out of range. Second `?`: column not mapped to any field.
        let value_slot = (*self.column_map.get(schema_col_index)?)?;
        self.all_values.get(value_slot)
    }
}
/// Parser over a borrowed byte buffer of newline-delimited, tab-separated rows.
pub struct CopyParser<'a> {
/// Input bytes to parse.
data: &'a [u8],
/// Schema used to extract key values; when `None`, parsed rows only carry their raw bytes.
table_schema: Option<&'a TableSchema>,
/// For each field position in a line, the schema column it belongs to
/// (`None` when the column name could not be resolved).
column_order: Vec<Option<ColumnId>>,
}
impl<'a> CopyParser<'a> {
pub fn new(data: &'a [u8]) -> Self {
Self {
data,
table_schema: None,
column_order: Vec::new(),
}
}
pub fn with_schema(mut self, schema: &'a TableSchema) -> Self {
self.table_schema = Some(schema);
self
}
/// Records the order in which columns appear in each data line (builder style).
///
/// Every name is looked up with `schema.get_column_id`; the lookup result is stored
/// as-is, so a name the lookup does not resolve keeps its position as `None`.
///
/// This is a no-op when no schema has been attached yet: `columns` is dropped and the
/// column order is left untouched, so `with_schema` must be called first.
pub fn with_column_order(mut self, columns: Vec<String>) -> Self {
    if let Some(schema) = self.table_schema {
        let mut resolved = Vec::with_capacity(columns.len());
        for name in &columns {
            resolved.push(schema.get_column_id(name));
        }
        self.column_order = resolved;
    }
    self
}
/// Parses every data line of the buffer into a [`ParsedCopyRow`].
///
/// Lines are separated by `\n`; a single trailing `\r` is removed from each line so that
/// CRLF-terminated input is handled the same way as LF-terminated input (in COPY text
/// format a carriage return inside a value is always written as the escape `\r`, so a
/// raw CR can only be part of the line terminator). Empty lines and the end-of-data
/// marker `\.` are skipped.
///
/// If no column order was configured and a schema is attached, the column order
/// defaults to the schema's columns in declaration order.
///
/// # Errors
///
/// Propagates any error returned while parsing an individual row.
pub fn parse_rows(&mut self) -> anyhow::Result<Vec<ParsedCopyRow>> {
    // Without an explicit column list, fields are assumed to follow schema order.
    if self.column_order.is_empty() {
        if let Some(schema) = self.table_schema {
            self.column_order = schema.columns.iter().map(|c| Some(c.ordinal)).collect();
        }
    }

    // Copy the `&'a [u8]` out so the iteration does not hold a borrow of `self`.
    let data = self.data;
    let mut rows = Vec::new();
    for line in data.split(|&b| b == b'\n') {
        // Drop the CR of a CRLF terminator; otherwise it would end up glued to the last
        // field and `\.` followed by CR would not be recognised as the terminator.
        let line = match line.split_last() {
            Some((&b'\r', rest)) => rest,
            _ => line,
        };
        if line.is_empty() || line == b"\\." {
            continue;
        }
        if let Some(row) = self.parse_row(line)? {
            rows.push(row);
        }
    }
    Ok(rows)
}
/// Turns a single data line into a [`ParsedCopyRow`].
///
/// The line is always split into fields. With a schema attached, the key tuples, the
/// converted values and the column map are filled in; without one, the row only carries
/// its raw bytes. The current implementation always returns `Ok(Some(_))`.
fn parse_row(&self, line: &[u8]) -> anyhow::Result<Option<ParsedCopyRow>> {
    let fields: Vec<CopyValue> = self.split_and_parse_values(line);
    let row = match self.table_schema {
        Some(schema) => {
            let (pk, fk_values, all_values) = self.extract_pk_fk(&fields, schema);
            ParsedCopyRow {
                raw: line.to_vec(),
                pk,
                fk_values,
                all_values,
                column_map: self.build_column_map(schema),
            }
        }
        None => ParsedCopyRow {
            raw: line.to_vec(),
            pk: None,
            fk_values: Vec::new(),
            all_values: Vec::new(),
            column_map: Vec::new(),
        },
    };
    Ok(Some(row))
}
/// Builds the lookup table from schema column ordinal to field position in a line.
///
/// The result has one slot per schema column. Slots of columns that do not occur in
/// `column_order` stay `None`; ordinals beyond the schema's column count are ignored.
/// If a column occurs more than once, the last occurrence wins.
fn build_column_map(&self, schema: &TableSchema) -> Vec<Option<usize>> {
    let mut slots: Vec<Option<usize>> = vec![None; schema.columns.len()];
    let resolved = self
        .column_order
        .iter()
        .enumerate()
        .filter_map(|(field_idx, entry)| entry.as_ref().map(|id| (id.0 as usize, field_idx)));
    for (ordinal, field_idx) in resolved {
        if let Some(slot) = slots.get_mut(ordinal) {
            *slot = Some(field_idx);
        }
    }
    slots
}
/// Splits `line` on tab bytes and parses each field into a [`CopyValue`].
///
/// An empty line yields a single (empty) field, and a trailing tab yields a trailing
/// empty field, so the result always has exactly one more entry than there are tabs.
fn split_and_parse_values(&self, line: &[u8]) -> Vec<CopyValue> {
    line.split(|&byte| byte == b'\t')
        .map(|field| self.parse_copy_value(field))
        .collect()
}
/// Classifies a single raw field.
///
/// A field that is exactly `\N` is NULL. Otherwise the escapes are decoded and, if the
/// result is valid UTF-8, it is tried as an `i64` and then as an `i128`; anything else
/// is kept as the decoded bytes.
fn parse_copy_value(&self, value: &[u8]) -> CopyValue {
    if value == b"\\N" {
        return CopyValue::Null;
    }
    let bytes = self.decode_copy_escapes(value);
    let numeric = std::str::from_utf8(&bytes).ok().and_then(|text| {
        text.parse::<i64>()
            .map(CopyValue::Integer)
            .or_else(|_| text.parse::<i128>().map(CopyValue::BigInteger))
            .ok()
    });
    match numeric {
        Some(number) => number,
        None => CopyValue::Text(bytes),
    }
}
/// Decodes the backslash escapes of a COPY text-format field into raw bytes.
///
/// Recognised sequences are `\b` (backspace), `\f` (form feed), `\n`, `\r`, `\t`,
/// `\v` (vertical tab) and `\\`, i.e. the sequences PostgreSQL's `COPY TO` writes for
/// control characters and for the backslash itself.
///
/// Any other backslash sequence (including `\N` inside a longer value, and the
/// octal/hex forms, which `COPY TO` never emits) is copied through unchanged as the
/// backslash followed by the next byte. A lone backslash at the end of the value is kept
/// as-is.
pub fn decode_copy_escapes(&self, value: &[u8]) -> Vec<u8> {
    let mut decoded = Vec::with_capacity(value.len());
    let mut bytes = value.iter().copied();
    // `next()` is also called inside the loop to consume the escaped byte, hence the
    // explicit `while let` instead of a `for` loop.
    while let Some(byte) = bytes.next() {
        if byte != b'\\' {
            decoded.push(byte);
            continue;
        }
        match bytes.next() {
            Some(b'b') => decoded.push(0x08),
            Some(b'f') => decoded.push(0x0C),
            Some(b'n') => decoded.push(b'\n'),
            Some(b'r') => decoded.push(b'\r'),
            Some(b't') => decoded.push(b'\t'),
            Some(b'v') => decoded.push(0x0B),
            Some(b'\\') => decoded.push(b'\\'),
            // Unknown escape: keep both bytes untouched.
            Some(other) => {
                decoded.push(b'\\');
                decoded.push(other);
            }
            // Trailing backslash with nothing after it.
            None => decoded.push(b'\\'),
        }
    }
    decoded
}
/// Extracts the primary-key tuple, the foreign-key tuples and all converted values
/// from the parsed fields of one row.
///
/// Returns `(pk, fk_values, all_values)`:
///
/// * `all_values` holds every field converted with `value_to_pk`, using the column that
///   `column_order` assigns to the field position (if any).
/// * `pk` collects the values of the primary-key columns in `column_order` order. It is
///   `None` when no PK column is present, when any PK value is NULL, or when the row is
///   too short to contain a listed PK column.
/// * `fk_values` has one entry per foreign key that has a referenced table id and whose
///   columns are *all* present in the row and non-NULL. A foreign key with a column that
///   is missing from the column order or from a short row is skipped, so a truncated
///   tuple is never emitted for a composite key.
///
/// NOTE(review): a PK column that is absent from `column_order` altogether cannot be
/// detected here, so `pk` may still be shorter than the table's declared primary key in
/// that case.
fn extract_pk_fk(
    &self,
    values: &[CopyValue],
    schema: &TableSchema,
) -> (Option<PkTuple>, Vec<(FkRef, PkTuple)>, Vec<PkValue>) {
    let all_values: Vec<PkValue> = values
        .iter()
        .enumerate()
        .map(|(idx, v)| {
            let col = self
                .column_order
                .get(idx)
                .and_then(|c| *c)
                .and_then(|id| schema.column(id));
            self.value_to_pk(v, col)
        })
        .collect();

    // `all_values[idx]` was converted with the column at `column_order[idx]`, which is
    // exactly the column looked up below, so the converted values can be reused.
    let mut pk_values = PkTuple::new();
    let mut pk_complete = true;
    for (idx, col_id_opt) in self.column_order.iter().enumerate() {
        if let Some(col_id) = col_id_opt {
            if schema.is_pk_column(*col_id) {
                match all_values.get(idx) {
                    Some(value) => pk_values.push(value.clone()),
                    // Row has fewer fields than the column list: the key is incomplete.
                    None => pk_complete = false,
                }
            }
        }
    }

    let mut fk_values = Vec::new();
    for (fk_idx, fk) in schema.foreign_keys.iter().enumerate() {
        if fk.referenced_table_id.is_none() {
            continue;
        }
        let mut fk_tuple = PkTuple::new();
        let mut usable = true;
        for &col_id in &fk.columns {
            let component = self
                .column_order
                .iter()
                .position(|&c| c == Some(col_id))
                .and_then(|idx| all_values.get(idx));
            match component {
                Some(value) if !value.is_null() => fk_tuple.push(value.clone()),
                // NULL component, column not in the column list, or short row: the
                // reference cannot be represented, so the whole foreign key is skipped.
                _ => {
                    usable = false;
                    break;
                }
            }
        }
        if usable && !fk_tuple.is_empty() {
            fk_values.push((
                FkRef {
                    table_id: schema.id.0,
                    // NOTE(review): truncating cast; assumes a table has at most
                    // `u16::MAX` foreign keys.
                    fk_index: fk_idx as u16,
                },
                fk_tuple,
            ));
        }
    }

    let pk = if !pk_complete || pk_values.is_empty() || pk_values.iter().any(|v| v.is_null()) {
        None
    } else {
        Some(pk_values)
    };
    (pk, fk_values, all_values)
}
/// Converts a parsed field into a [`PkValue`].
///
/// NULL and numeric fields map directly to their `PkValue` counterparts. Text is
/// converted lossily to UTF-8; when `col` is an `Int` or `BigInt` column and the text
/// parses as that integer type, the numeric form is returned, otherwise the text itself.
fn value_to_pk(&self, value: &CopyValue, col: Option<&crate::schema::Column>) -> PkValue {
    let bytes = match value {
        CopyValue::Null => return PkValue::Null,
        CopyValue::Integer(n) => return PkValue::Int(*n),
        CopyValue::BigInteger(n) => return PkValue::BigInt(*n),
        CopyValue::Text(bytes) => bytes,
    };
    let text = String::from_utf8_lossy(bytes);
    // Let the declared column type decide whether the text should become a number.
    let typed = col.and_then(|column| match column.col_type {
        ColumnType::Int => text.parse::<i64>().ok().map(PkValue::Int),
        ColumnType::BigInt => text.parse::<i128>().ok().map(PkValue::BigInt),
        _ => None,
    });
    match typed {
        Some(number) => number,
        None => PkValue::Text(text.into_owned().into_boxed_str()),
    }
}
}
/// Intermediate classification of a single field before conversion to a `PkValue`.
#[derive(Debug, Clone)]
enum CopyValue {
/// The field was exactly `\N`.
Null,
/// The decoded field parsed as an `i64`.
Integer(i64),
/// The decoded field did not parse as an `i64` but did parse as an `i128`.
BigInteger(i128),
/// Any other field: the bytes after escape decoding (not necessarily valid UTF-8).
Text(Vec<u8>),
}
/// Extracts the column names from the parenthesised list of a `COPY ... (a, b) ...` header.
///
/// The list is the text between the first `(` and the first `)` that follows it. It is
/// split on commas, and each name is trimmed of surrounding whitespace and double quotes.
///
/// Returns an empty vector when the header has no `(` or no `)` after it. A `)` that
/// appears before the first `(` is ignored rather than producing an inverted slice range
/// (which would panic).
///
/// Quoted identifiers that themselves contain `,` or `)` are not supported.
pub fn parse_copy_columns(header: &str) -> Vec<String> {
    let list = header.find('(').and_then(|open| {
        // `(` is a single byte, so `open + 1` is always a valid char boundary.
        let rest = &header[open + 1..];
        rest.find(')').map(|close| &rest[..close])
    });
    match list {
        Some(cols) => cols
            .split(',')
            .map(|c| c.trim().trim_matches('"').to_string())
            .collect(),
        None => Vec::new(),
    }
}
/// Convenience wrapper: parses `data` with `schema` attached and the given column order.
///
/// Equivalent to building a [`CopyParser`] with `with_schema` followed by
/// `with_column_order` and calling `parse_rows` on it.
///
/// # Errors
///
/// Returns whatever error `CopyParser::parse_rows` reports.
pub fn parse_postgres_copy_rows(
    data: &[u8],
    schema: &TableSchema,
    column_order: Vec<String>,
) -> anyhow::Result<Vec<ParsedCopyRow>> {
    CopyParser::new(data)
        .with_schema(schema)
        .with_column_order(column_order)
        .parse_rows()
}