use byteorder::{NetworkEndian as NE, WriteBytesExt};
use cast;
use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
use csv;
use geo_types::Geometry;
use hex;
use serde_json::Value;
use std::{
io::{self, prelude::*},
str,
};
use uuid::Uuid;
use crate::common::*;
use crate::drivers::postgres_shared::{
PgColumn, PgCreateTable, PgDataType, PgScalarDataType,
};
use crate::from_csv_cell::FromCsvCell;
use crate::from_json_value::FromJsonValue;
mod write_binary;
use self::write_binary::{GeometryWithSrid, RawJson, RawJsonb, WriteBinary};
/// Buffered output stream used for all PostgreSQL `COPY ... BINARY` writes.
pub(crate) type BufferedWriter = io::BufWriter<Box<dyn Write>>;
/// Transcode a CSV stream into PostgreSQL's binary `COPY` format.
///
/// The CSV header row must match `table`'s columns exactly (same names, same
/// order). The output written to `wtr` is a complete binary COPY file:
/// header, one length-prefixed tuple per CSV record, and the `-1` trailer.
///
/// Returns an error if the CSV cannot be read, if the header disagrees with
/// the schema, or if any cell cannot be converted to the column's type.
pub(crate) fn copy_csv_to_pg_binary(
    table: &PgCreateTable,
    rdr: Box<dyn Read>,
    wtr: Box<dyn Write>,
) -> Result<()> {
    let mut rdr = csv::Reader::from_reader(rdr);
    let mut wtr = io::BufWriter::with_capacity(BUFFER_SIZE, wtr);

    // Verify that the CSV has exactly the columns the schema expects, in
    // the same order, before writing anything.
    let headers = rdr.headers()?;
    if headers.len() != table.columns.len() {
        return Err(format_err!(
            "CSV file has {} columns, but schema has {}",
            headers.len(),
            table.columns.len(),
        ));
    }
    for (idx, (hdr, col)) in headers.iter().zip(table.columns.iter()).enumerate() {
        if hdr != col.name {
            return Err(format_err!(
                "CSV file has column {} at position {}, but schema has {}",
                hdr,
                idx,
                col.name,
            ));
        }
    }

    // Binary COPY header: 11-byte signature, then a 32-bit flags word and a
    // 32-bit header-extension length, both zero.
    wtr.write_all(b"PGCOPY\n")?;
    wtr.write_all(&[0o377])?;
    wtr.write_all(b"\r\n\0")?;
    wtr.write_u32::<NE>(0)?; // Flags.
    wtr.write_u32::<NE>(0)?; // Header extension area length.

    // Each tuple is a 16-bit field count followed by the serialized fields.
    for (row_idx, row) in rdr.records().enumerate() {
        let row = row?;
        wtr.write_i16::<NE>(cast::i16(row.len())?)?;
        for (cell, col) in row.iter().zip(table.columns.iter()) {
            cell_to_binary(&mut wtr, col, cell).with_context(|_| {
                format!(
                    "could not convert row {}, column {} ({:?})",
                    row_idx + 1,
                    col.name,
                    cell,
                )
            })?;
        }
    }

    // The binary COPY file format ends with a 16-bit trailer word of -1.
    wtr.write_i16::<NE>(-1)?;

    // Flush explicitly: `BufWriter`'s `Drop` flushes but silently discards
    // any I/O error, which would make a truncated upload look successful.
    wtr.flush()?;
    Ok(())
}
/// Serialize one CSV cell as a binary COPY field, honoring NULL-ability.
///
/// An empty cell in a nullable column becomes SQL `NULL`; otherwise the cell
/// text is parsed according to the column's declared type.
fn cell_to_binary(wtr: &mut BufferedWriter, col: &PgColumn, cell: &str) -> Result<()> {
    // NULL is encoded as a field length of -1 with no data bytes.
    if col.is_nullable && cell.is_empty() {
        wtr.write_i32::<NE>(-1)?;
        return Ok(());
    }
    match &col.data_type {
        PgDataType::Scalar(ty) => scalar_to_binary(wtr, ty, cell)?,
        PgDataType::Array {
            dimension_count,
            ty,
        } => array_to_binary(wtr, *dimension_count, ty, cell)?,
    }
    Ok(())
}
/// Serialize an array cell (stored in the CSV as a JSON array) into
/// PostgreSQL's binary array representation. Only 1-dimensional arrays are
/// supported.
fn array_to_binary(
    wtr: &mut BufferedWriter,
    dimension_count: i32,
    data_type: &PgScalarDataType,
    cell: &str,
) -> Result<()> {
    // We only know how to serialize one-dimensional arrays for now.
    if dimension_count != 1 {
        return Err(format_err!(
            "arrays with {} dimensions cannot yet be written to PostgreSQL",
            dimension_count,
        ));
    }
    // The cell text must parse as a JSON array; anything else is an error.
    let json = serde_json::from_str(cell).context("cannot parse JSON")?;
    let json_array = match json {
        Value::Array(json_array) => json_array,
        other => return Err(format_err!("expected JSON array, found {}", other)),
    };
    // Serialize into a scratch buffer first: the field's total byte length
    // must be written before the field data (see `WriteExt::write_value`).
    let mut buffer = vec![];
    wtr.write_value(&mut buffer, |wtr| {
        // Array header: dimensions, "has nulls" flag (always set, since
        // elements may be JSON null), element type OID, then per dimension
        // the element count and a lower bound of 1.
        wtr.write_i32::<NE>(dimension_count)?;
        wtr.write_i32::<NE>(1)?;
        wtr.write_i32::<NE>(data_type.oid()?)?;
        wtr.write_i32::<NE>(cast::i32(json_array.len())?)?;
        wtr.write_i32::<NE>(1)?;
        for elem in &json_array {
            match elem {
                Value::Null => {
                    // A NULL element is a length of -1 with no data.
                    wtr.write_i32::<NE>(-1)?;
                }
                other => {
                    json_to_binary(wtr, data_type, other)?;
                }
            }
        }
        Ok(())
    })?;
    Ok(())
}
fn json_to_binary<W: Write>(
wtr: &mut W,
data_type: &PgScalarDataType,
json: &Value,
) -> Result<()> {
match data_type {
PgScalarDataType::Boolean => write_json_as_binary::<bool, W>(wtr, json),
PgScalarDataType::Date => write_json_as_binary::<NaiveDate, W>(wtr, json),
PgScalarDataType::Numeric => Err(format_err!(
"cannot use `numeric` arrays with PostgreSQL yet",
)),
PgScalarDataType::Real => write_json_as_binary::<f32, W>(wtr, json),
PgScalarDataType::DoublePrecision => write_json_as_binary::<f64, W>(wtr, json),
PgScalarDataType::Geometry(srid) => {
let geometry = Geometry::<f64>::from_json_value(json)?;
let value = GeometryWithSrid {
geometry: &geometry,
srid: *srid,
};
value.write_binary(wtr)
}
PgScalarDataType::Smallint => write_json_as_binary::<i16, W>(wtr, json),
PgScalarDataType::Int => write_json_as_binary::<i32, W>(wtr, json),
PgScalarDataType::Bigint => write_json_as_binary::<i64, W>(wtr, json),
PgScalarDataType::Json => Err(format_err!(
"PostgreSQL arrays with json elements not supported (try jsonb)",
)),
PgScalarDataType::Jsonb => {
let serialized = serde_json::to_string(json)?;
RawJsonb(&serialized).write_binary(wtr)
}
PgScalarDataType::Text => match json {
Value::String(s) => s.as_str().write_binary(wtr),
_ => Err(format_err!("expected JSON string, found {}", json)),
},
PgScalarDataType::TimestampWithoutTimeZone => {
write_json_as_binary::<NaiveDateTime, W>(wtr, json)
}
PgScalarDataType::TimestampWithTimeZone => {
write_json_as_binary::<DateTime<Utc>, W>(wtr, json)
}
PgScalarDataType::Uuid => write_json_as_binary::<Uuid, W>(wtr, json),
}
}
/// Parse `json` as a `T`, then serialize the parsed value in binary form.
fn write_json_as_binary<T, W>(wtr: &mut W, json: &Value) -> Result<()>
where
    T: FromJsonValue + WriteBinary,
    W: Write,
{
    T::from_json_value(json)?.write_binary(wtr)
}
fn scalar_to_binary(
wtr: &mut BufferedWriter,
data_type: &PgScalarDataType,
cell: &str,
) -> Result<()> {
match data_type {
PgScalarDataType::Boolean => write_cell_as_binary::<bool>(wtr, cell),
PgScalarDataType::Date => write_cell_as_binary::<NaiveDate>(wtr, cell),
PgScalarDataType::Numeric => {
Err(format_err!(
"cannot use numeric columns with PostgreSQL yet",
))
}
PgScalarDataType::Real => write_cell_as_binary::<f32>(wtr, cell),
PgScalarDataType::DoublePrecision => write_cell_as_binary::<f64>(wtr, cell),
PgScalarDataType::Geometry(srid) => {
if !cell.is_empty() && cell.as_bytes()[0].is_ascii_hexdigit() {
let bytes = hex::decode(cell).context("not valid GeoJSON or EWKB")?;
(&bytes[..]).write_binary(wtr)
} else {
let geometry = Geometry::<f64>::from_csv_cell(cell)?;
let value = GeometryWithSrid {
geometry: &geometry,
srid: *srid,
};
value.write_binary(wtr)
}
}
PgScalarDataType::Smallint => write_cell_as_binary::<i16>(wtr, cell),
PgScalarDataType::Int => write_cell_as_binary::<i32>(wtr, cell),
PgScalarDataType::Bigint => write_cell_as_binary::<i64>(wtr, cell),
PgScalarDataType::Json => {
let value = RawJson(cell);
value.write_binary(wtr)
}
PgScalarDataType::Jsonb => {
let value = RawJsonb(cell);
value.write_binary(wtr)
}
PgScalarDataType::Text => cell.write_binary(wtr),
PgScalarDataType::TimestampWithoutTimeZone => {
write_cell_as_binary::<NaiveDateTime>(wtr, cell)
}
PgScalarDataType::TimestampWithTimeZone => {
write_cell_as_binary::<DateTime<Utc>>(wtr, cell)
}
PgScalarDataType::Uuid => write_cell_as_binary::<Uuid>(wtr, cell),
}
}
/// Hex-encoded EWKB geometry cells must be accepted and passed through
/// rather than rejected by the geometry parser.
#[test]
fn parse_ewkb_fallback() {
    use crate::schema::Srid;

    // A hex-encoded EWKB value (starts with a hex digit, so it takes the
    // pass-through branch in `scalar_to_binary`).
    let ewkb_cell = "0101000020E61000000000806A7CC351C093985E78E32E4540";
    let mut wtr = BufferedWriter::new(Box::new(vec![]));
    scalar_to_binary(
        &mut wtr,
        &PgScalarDataType::Geometry(Srid::wgs84()),
        ewkb_cell,
    )
    .unwrap();
}
/// Parse `cell` as a `T`, then serialize the parsed value in binary form.
fn write_cell_as_binary<T: FromCsvCell + WriteBinary>(
    wtr: &mut BufferedWriter,
    cell: &str,
) -> Result<()> {
    T::from_csv_cell(cell)?.write_binary(wtr)
}
/// Extension methods for writing length-prefixed values in PostgreSQL's
/// binary COPY format.
pub(crate) trait WriteExt {
    /// Write `len` as a network-order `i32`, failing if it does not fit.
    fn write_len(&mut self, len: usize) -> Result<()>;
    /// Serialize a value of unknown length: `f` writes the value's bytes
    /// into `buffer`, then the buffer's length and contents are written out.
    /// `buffer` must be empty on entry and is cleared before returning.
    fn write_value<F>(&mut self, buffer: &mut Vec<u8>, f: F) -> Result<()>
    where
        F: FnOnce(&mut Vec<u8>) -> Result<()>;
}
// Blanket implementation: any `Write` destination gets the COPY helpers.
// (The previous `impl<'a, W: Write + 'a>` declared a lifetime parameter that
// constrained nothing, so it has been dropped.)
impl<W: Write> WriteExt for W {
    /// Write `len` as a network-order `i32` length prefix, failing if `len`
    /// cannot be represented as an `i32`.
    fn write_len(&mut self, len: usize) -> Result<()> {
        self.write_i32::<NE>(cast::i32(len)?)?;
        Ok(())
    }

    /// Run `f` to serialize a value into `buffer`, then write the value's
    /// length followed by its bytes. On failure nothing is written, and the
    /// buffer is always cleared before returning so it can be reused.
    fn write_value<F>(&mut self, buffer: &mut Vec<u8>, f: F) -> Result<()>
    where
        F: FnOnce(&mut Vec<u8>) -> Result<()>,
    {
        // Callers are expected to hand us a clean scratch buffer.
        assert!(buffer.is_empty());
        let result = f(buffer);
        if result.is_ok() {
            self.write_len(buffer.len())?;
            self.write_all(buffer)?;
        }
        buffer.clear();
        result
    }
}