use polars::{
prelude::{Column, DataType, IntoColumn, ListChunked, NamedFrom, TimeUnit},
series::{IntoSeries, Series},
};
use postgres_protocol::message::backend::Field;
use tracing::warn;
use crate::{PgToPlError, PgToPlResult, utils::text_array::parse_text_array};
/// Growable buffer holding the values of one named column.
///
/// Each pushed row occupies exactly one slot in `data`; a SQL `NULL` is
/// stored as `None`.
#[derive(Debug, Clone)]
pub struct ColumnResult<T> {
    /// Name of the column.
    pub name: String,
    /// Values in insertion order; `None` marks a null row.
    pub data: Vec<Option<T>>,
}

impl<T> ColumnResult<T> {
    /// Slots reserved up front, and again every time the buffer becomes full.
    const CHUNK: usize = 1024;

    /// Creates an empty column called `name` with room for 1024 rows.
    pub fn new(name: String) -> Self {
        Self {
            name,
            data: Vec::with_capacity(Self::CHUNK),
        }
    }

    /// Appends a non-null value.
    pub fn push(&mut self, value: T) {
        self.data.push(Some(value));
        self.extend_vec();
    }

    /// Appends a null row.
    pub fn push_null(&mut self) {
        self.data.push(None);
        self.extend_vec();
    }

    /// Reserves another chunk of slots as soon as the buffer has no spare
    /// capacity left, so the next push never has to grow the vector itself.
    fn extend_vec(&mut self) {
        let spare = self.data.capacity() - self.data.len();
        if spare == 0 {
            self.data.reserve(Self::CHUNK);
        }
    }

    /// Returns a new, empty column carrying the same name as `self`.
    fn clone_empty(&self) -> Self {
        Self::new(self.name.clone())
    }
}
/// Typed, per-column buffers that rows are decoded into.
///
/// The variant decides both how `push_column_value` decodes the raw bytes
/// and which polars dtype `column_to_series` produces.
#[derive(Debug, Clone)]
pub enum ColumnStorage {
    /// 32-bit integers.
    Ints(ColumnResult<i32>),
    /// UTF-8 text (decoded lossily from the raw bytes).
    Texts(ColumnResult<String>),
    /// Booleans, decoded from a single byte (non-zero is `true`).
    Bools(ColumnResult<bool>),
    /// Dates stored as days relative to the Unix epoch.
    Dates(ColumnResult<i32>),
    /// Arrays of nullable text values.
    TextArray(ColumnResult<Vec<Option<String>>>),
    /// Timestamps stored as microseconds relative to the Unix epoch.
    Timestamps(ColumnResult<i64>),
    /// 64-bit floating point numbers.
    Doubles(ColumnResult<f64>),
    /// Timestamps stored as microseconds relative to the Unix epoch; decoded
    /// and converted exactly like [`ColumnStorage::Timestamps`].
    TimestampsWtz(ColumnResult<i64>),
    /// Times of day stored as nanoseconds since midnight.
    Times(ColumnResult<i64>),
    /// Raw bytes; the fallback for column types that are not recognised.
    Bytes(ColumnResult<Vec<u8>>),
}
/// Chooses the [`ColumnStorage`] variant for a result-set field based on its
/// PostgreSQL type OID.
///
/// Unrecognised OIDs are logged with `warn!` and fall back to
/// [`ColumnStorage::Bytes`], so their values are kept as raw bytes.
///
/// NOTE(review): OID 1184 is `timestamptz` and OID 1114 is `timestamp`
/// (without time zone); confirm that mapping them to `Timestamps` and
/// `TimestampsWtz` respectively is the intended naming.
pub fn column_from_field(field: &Field) -> ColumnStorage {
    let column_name = field.name().to_owned();

    match field.type_oid() {
        // int4
        23 => ColumnStorage::Ints(ColumnResult::new(column_name)),
        // text, varchar, name
        25 | 1043 | 19 => ColumnStorage::Texts(ColumnResult::new(column_name)),
        // bool
        16 => ColumnStorage::Bools(ColumnResult::new(column_name)),
        // date
        1082 => ColumnStorage::Dates(ColumnResult::new(column_name)),
        // text[]
        1009 => ColumnStorage::TextArray(ColumnResult::new(column_name)),
        // timestamptz
        1184 => ColumnStorage::Timestamps(ColumnResult::new(column_name)),
        // float8
        701 => ColumnStorage::Doubles(ColumnResult::new(column_name)),
        // timestamp
        1114 => ColumnStorage::TimestampsWtz(ColumnResult::new(column_name)),
        // time
        1083 => ColumnStorage::Times(ColumnResult::new(column_name)),
        unknown_oid => {
            warn!(
                "⚠️ Unknown type column: name={}, type_oid={}, format={}",
                field.name(),
                unknown_oid,
                field.format()
            );
            ColumnStorage::Bytes(ColumnResult::new(column_name))
        }
    }
}
/// Decodes one binary-format value and appends it to `column`.
///
/// `value` is the raw cell payload, or `None` for SQL `NULL`. A null row is
/// pushed when `value` is `None`, when a fixed-width type receives a payload
/// of the wrong length, or when converting a date/time value to the target
/// representation would overflow.
///
/// Conversions performed:
/// * `Dates`: days are shifted by 10957 (2000-01-01 → 1970-01-01).
/// * `Timestamps` / `TimestampsWtz`: microseconds are shifted by
///   946684800 seconds' worth of microseconds (same epoch change).
/// * `Times`: microseconds since midnight are scaled to nanoseconds.
///
/// The shifts and the scaling use checked arithmetic. PostgreSQL transmits
/// `infinity` as the maximum integer value, which previously overflowed
/// (panic in debug builds, silent wrap-around in release builds); such
/// values are now stored as null.
///
/// # Errors
///
/// Returns `PgToPlError::ColumnTypeError` if a fixed-width payload cannot be
/// turned into an array, and propagates any error from `parse_text_array`.
pub fn push_column_value(column: &mut ColumnStorage, value: Option<&[u8]>) -> PgToPlResult<()> {
    /// Days between 1970-01-01 and 2000-01-01.
    const PG_EPOCH_OFFSET_DAYS: i32 = 10957;
    /// Microseconds between 1970-01-01 and 2000-01-01.
    const PG_EPOCH_OFFSET_MICROS: i64 = 946684800_000_000;
    /// Scale factor from microseconds to nanoseconds.
    const NANOS_PER_MICRO: i64 = 1000;

    match column {
        ColumnStorage::Ints(col) => match decode_be_bytes::<4>(value, "i32")? {
            Some(raw) => col.push(i32::from_be_bytes(raw)),
            None => col.push_null(),
        },
        ColumnStorage::Texts(col) => match value {
            Some(bytes) => col.push(String::from_utf8_lossy(bytes).to_string()),
            None => col.push_null(),
        },
        ColumnStorage::Bools(col) => match value {
            // Exactly one byte is expected; anything else is stored as null.
            Some(&[b]) => col.push(b != 0),
            _ => col.push_null(),
        },
        ColumnStorage::Bytes(col) => match value {
            Some(bytes) => col.push(bytes.to_vec()),
            None => col.push_null(),
        },
        ColumnStorage::Dates(col) => {
            let unix_days = decode_be_bytes::<4>(value, "i32")?
                .map(i32::from_be_bytes)
                .and_then(|pg_days| pg_days.checked_add(PG_EPOCH_OFFSET_DAYS));
            match unix_days {
                Some(days) => col.push(days),
                None => col.push_null(),
            }
        }
        ColumnStorage::TextArray(col) => match value {
            Some(bytes) => {
                let items = parse_text_array(bytes)?;
                col.push(items);
            }
            None => col.push_null(),
        },
        ColumnStorage::Timestamps(col) => {
            let unix_micros = decode_be_bytes::<8>(value, "i64")?
                .map(i64::from_be_bytes)
                .and_then(|pg_micros| pg_micros.checked_add(PG_EPOCH_OFFSET_MICROS));
            match unix_micros {
                Some(micros) => col.push(micros),
                None => col.push_null(),
            }
        }
        ColumnStorage::Doubles(col) => match decode_be_bytes::<8>(value, "f64")? {
            Some(raw) => col.push(f64::from_be_bytes(raw)),
            None => col.push_null(),
        },
        ColumnStorage::TimestampsWtz(col) => {
            let unix_micros = decode_be_bytes::<8>(value, "i64")?
                .map(i64::from_be_bytes)
                .and_then(|pg_micros| pg_micros.checked_add(PG_EPOCH_OFFSET_MICROS));
            match unix_micros {
                Some(micros) => col.push(micros),
                None => col.push_null(),
            }
        }
        ColumnStorage::Times(col) => {
            let nanos_since_midnight = decode_be_bytes::<8>(value, "i64")?
                .map(i64::from_be_bytes)
                .and_then(|micros| micros.checked_mul(NANOS_PER_MICRO));
            match nanos_since_midnight {
                Some(nanos) => col.push(nanos),
                None => col.push_null(),
            }
        }
    };
    Ok(())
}

/// Extracts a fixed-width payload of exactly `N` bytes.
///
/// Returns `Ok(None)` when `value` is `None` or has a different length, so
/// callers store a null for missing or malformed cells. `type_name` is only
/// used to label the error should the slice-to-array conversion fail.
fn decode_be_bytes<const N: usize>(
    value: Option<&[u8]>,
    type_name: &str,
) -> PgToPlResult<Option<[u8; N]>> {
    match value {
        Some(bytes) if bytes.len() == N => {
            let array = <[u8; N]>::try_from(bytes).map_err(|e| PgToPlError::ColumnTypeError {
                bytes: bytes.to_vec(),
                type_name: type_name.to_string(),
                error: e,
            })?;
            Ok(Some(array))
        }
        _ => Ok(None),
    }
}
/// Converts a filled [`ColumnStorage`] into a polars `Column`.
///
/// The buffered values are first turned into a `Series` of their storage
/// type; date and time variants are then cast to the matching temporal
/// dtype (`Date`, `Datetime` in microseconds without time zone, or `Time`).
///
/// # Errors
///
/// Propagates the error if the cast to the temporal dtype fails.
pub fn column_to_series(column: ColumnStorage) -> PgToPlResult<Column> {
    // Build the raw series together with the dtype it still has to be cast
    // to (if any).
    let (raw, target_dtype) = match column {
        ColumnStorage::Ints(col) => (Series::new(col.name.into(), &col.data), None),
        ColumnStorage::Texts(col) => (Series::new(col.name.into(), &col.data), None),
        ColumnStorage::Bools(col) => (Series::new(col.name.into(), &col.data), None),
        ColumnStorage::Bytes(col) => (Series::new(col.name.into(), &col.data), None),
        ColumnStorage::Dates(col) => (
            Series::new(col.name.into(), &col.data),
            Some(DataType::Date),
        ),
        ColumnStorage::TextArray(col) => {
            (text_array_to_series(col.name.as_str(), col.data), None)
        }
        ColumnStorage::Timestamps(col) => (
            Series::new(col.name.into(), &col.data),
            Some(DataType::Datetime(TimeUnit::Microseconds, None)),
        ),
        ColumnStorage::Doubles(col) => (Series::new(col.name.into(), &col.data), None),
        ColumnStorage::TimestampsWtz(col) => (
            Series::new(col.name.into(), &col.data),
            Some(DataType::Datetime(TimeUnit::Microseconds, None)),
        ),
        ColumnStorage::Times(col) => (
            Series::new(col.name.into(), &col.data),
            Some(DataType::Time),
        ),
    };

    let series = match target_dtype {
        Some(dtype) => raw.cast(&dtype)?,
        None => raw,
    };
    Ok(series.into_column())
}
/// Builds a list-typed `Series` named `name` from rows of nullable text
/// arrays.
///
/// An empty input yields an empty series with dtype `List(String)`.
/// Otherwise each `Some` row becomes an inner series of its elements and
/// each `None` row becomes a null list entry.
pub fn text_array_to_series(name: &str, data: Vec<Option<Vec<Option<String>>>>) -> Series {
    if data.is_empty() {
        let list_of_strings = DataType::List(Box::new(DataType::String));
        return Series::new_empty(name.into(), &list_of_strings);
    }

    let rows = data
        .into_iter()
        .map(|row| row.map(|items| Series::new("".into(), items)));
    let lists: ListChunked = rows.collect();
    lists.into_series().with_name(name.into())
}
/// Produces a fresh set of storages with the same variants and column names
/// as `columns`, but without any of the buffered values.
pub fn clone_storages(columns: &Vec<ColumnStorage>) -> Vec<ColumnStorage> {
    columns
        .iter()
        .map(|storage| match storage {
            ColumnStorage::Ints(c) => ColumnStorage::Ints(c.clone_empty()),
            ColumnStorage::Texts(c) => ColumnStorage::Texts(c.clone_empty()),
            ColumnStorage::Bools(c) => ColumnStorage::Bools(c.clone_empty()),
            ColumnStorage::Bytes(c) => ColumnStorage::Bytes(c.clone_empty()),
            ColumnStorage::Dates(c) => ColumnStorage::Dates(c.clone_empty()),
            ColumnStorage::TextArray(c) => ColumnStorage::TextArray(c.clone_empty()),
            ColumnStorage::Timestamps(c) => ColumnStorage::Timestamps(c.clone_empty()),
            ColumnStorage::Doubles(c) => ColumnStorage::Doubles(c.clone_empty()),
            ColumnStorage::TimestampsWtz(c) => ColumnStorage::TimestampsWtz(c.clone_empty()),
            ColumnStorage::Times(c) => ColumnStorage::Times(c.clone_empty()),
        })
        .collect()
}