use polars::{
prelude::{DataType, ListChunked, NamedFrom, TimeUnit},
series::{IntoSeries, Series},
};
use postgres_protocol::message::backend::Field;
use crate::utils::text_array::parse_text_array;
/// A named, nullable column that is filled one row at a time.
#[derive(Debug, Clone)]
pub struct ColumnResult<T> {
    /// Column name as reported by the result-set field.
    pub name: String,
    /// Row values in insertion order; `None` marks a missing (null) value.
    pub data: Vec<Option<T>>,
}

impl<T> ColumnResult<T> {
    /// Slots reserved at construction and requested again whenever the buffer is full.
    const CHUNK: usize = 1024;

    /// Creates an empty column called `name` with `CHUNK` slots pre-reserved.
    pub fn new(name: String) -> Self {
        Self {
            name,
            data: Vec::with_capacity(Self::CHUNK),
        }
    }

    /// Appends a present value.
    pub fn push(&mut self, value: T) {
        self.append(Some(value));
    }

    /// Appends a null.
    pub fn push_null(&mut self) {
        self.append(None);
    }

    /// Stores one slot and tops up capacity if the buffer just filled.
    fn append(&mut self, slot: Option<T>) {
        self.data.push(slot);
        self.extend_vec();
    }

    /// Reserves another `CHUNK` slots once length has caught up with capacity.
    fn extend_vec(&mut self) {
        let is_full = self.data.capacity() == self.data.len();
        if is_full {
            self.data.reserve(Self::CHUNK);
        }
    }

    /// Returns a column with the same name but no rows (capacity pre-reserved).
    fn clone_empty(&self) -> Self {
        Self::new(self.name.clone())
    }
}
/// Per-column buffer whose variant decides how raw binary field values are decoded.
///
/// The variant is selected from the field's type OID by `column_from_field`.
#[derive(Debug, Clone)]
pub enum ColumnStorage {
    /// OID 23 (`int4`) values.
    Ints(ColumnResult<i32>),
    /// OIDs 25 / 1043 (`text` / `varchar`) values.
    Texts(ColumnResult<String>),
    /// OID 16 (`bool`) values.
    Bools(ColumnResult<bool>),
    /// OID 1082 (`date`) values, stored as days since 1970-01-01.
    Dates(ColumnResult<i32>),
    /// OID 1009 (`text[]`) values; individual elements may be null.
    TextArray(ColumnResult<Vec<Option<String>>>),
    /// OID 1184 (`timestamptz`) values, stored as microseconds since 1970-01-01.
    Timestamps(ColumnResult<i64>),
    /// OID 701 (`float8`) values.
    Doubles(ColumnResult<f64>),
    /// OID 1114 (`timestamp`) values, stored as microseconds since 1970-01-01.
    ///
    /// NOTE(review): `Wtz` presumably means "without time zone"; note that it is
    /// `Timestamps`, not this variant, that receives `timestamptz` — confirm naming.
    TimestampsWtz(ColumnResult<i64>),
    /// OID 1083 (`time`) values, stored as nanoseconds since midnight.
    Times(ColumnResult<i64>),
    /// Raw bytes for any type OID without a dedicated variant.
    Bytes(ColumnResult<Vec<u8>>),
}
/// Chooses an empty [`ColumnStorage`] for `field` based on its Postgres type OID.
///
/// Unrecognised OIDs fall back to [`ColumnStorage::Bytes`] after a warning that
/// includes the field name, OID, and format code is printed to stdout.
pub fn column_from_field(field: &Field) -> ColumnStorage {
    // Built-in Postgres type OIDs that have a dedicated storage variant.
    const BOOL_OID: u32 = 16;
    const INT4_OID: u32 = 23;
    const TEXT_OID: u32 = 25;
    const FLOAT8_OID: u32 = 701;
    const TEXT_ARRAY_OID: u32 = 1009;
    const VARCHAR_OID: u32 = 1043;
    const DATE_OID: u32 = 1082;
    const TIME_OID: u32 = 1083;
    const TIMESTAMP_OID: u32 = 1114;
    const TIMESTAMPTZ_OID: u32 = 1184;

    let name = field.name().to_owned();
    match field.type_oid() {
        INT4_OID => ColumnStorage::Ints(ColumnResult::new(name)),
        TEXT_OID | VARCHAR_OID => ColumnStorage::Texts(ColumnResult::new(name)),
        BOOL_OID => ColumnStorage::Bools(ColumnResult::new(name)),
        DATE_OID => ColumnStorage::Dates(ColumnResult::new(name)),
        TEXT_ARRAY_OID => ColumnStorage::TextArray(ColumnResult::new(name)),
        TIMESTAMPTZ_OID => ColumnStorage::Timestamps(ColumnResult::new(name)),
        FLOAT8_OID => ColumnStorage::Doubles(ColumnResult::new(name)),
        TIMESTAMP_OID => ColumnStorage::TimestampsWtz(ColumnResult::new(name)),
        TIME_OID => ColumnStorage::Times(ColumnResult::new(name)),
        _ => {
            println!(
                "⚠️ Unknown type column: name={}, type_oid={}, format={}",
                field.name(),
                field.type_oid(),
                field.format()
            );
            ColumnStorage::Bytes(ColumnResult::new(name))
        }
    }
}
/// Decodes one binary-format field value and appends it to `column`.
///
/// `value` is the raw bytes of one field, or `None` for SQL NULL. Decoding
/// depends on the column variant:
///
/// * Fixed-width types (`Ints`, `Dates`, both timestamp variants, `Doubles`,
///   `Times`) are read big-endian. A value with an unexpected byte length is
///   stored as null. `Bools` likewise require exactly one byte.
/// * `Dates` are shifted from the Postgres epoch (2000-01-01) to days since
///   1970-01-01.
/// * `Timestamps` / `TimestampsWtz` are shifted to microseconds since 1970-01-01.
/// * `Times` are converted from microseconds to nanoseconds since midnight.
/// * `infinity` / `-infinity` dates and timestamps are stored as null. Postgres
///   encodes them as the integer type's MAX/MIN, and shifting those by the
///   epoch offset would overflow or yield a meaningless instant.
/// * Text is decoded as UTF-8. Invalid sequences become U+FFFD instead of
///   aborting the whole load.
///
/// # Panics
///
/// Panics if `parse_text_array` returns an error for a `TextArray` value.
pub fn push_column_value(column: &mut ColumnStorage, value: Option<&[u8]>) {
    match column {
        ColumnStorage::Ints(col) => match value.and_then(read_be_i32) {
            Some(v) => col.push(v),
            None => col.push_null(),
        },
        ColumnStorage::Texts(col) => match value {
            Some(bytes) => col.push(String::from_utf8_lossy(bytes).into_owned()),
            None => col.push_null(),
        },
        ColumnStorage::Bools(col) => match value {
            Some(&[b]) => col.push(b != 0),
            _ => col.push_null(),
        },
        ColumnStorage::Bytes(col) => match value {
            Some(bytes) => col.push(bytes.to_vec()),
            None => col.push_null(),
        },
        ColumnStorage::Dates(col) => {
            match value.and_then(read_be_i32).and_then(pg_days_to_unix_days) {
                Some(days) => col.push(days),
                None => col.push_null(),
            }
        }
        ColumnStorage::TextArray(col) => match value {
            Some(bytes) => col.push(parse_text_array(bytes).unwrap()),
            None => col.push_null(),
        },
        // Both timestamp flavours are transmitted as microseconds since the Postgres epoch.
        ColumnStorage::Timestamps(col) | ColumnStorage::TimestampsWtz(col) => {
            match value.and_then(read_be_i64).and_then(pg_micros_to_unix_micros) {
                Some(micros) => col.push(micros),
                None => col.push_null(),
            }
        }
        ColumnStorage::Doubles(col) => match value.and_then(read_be_f64) {
            Some(v) => col.push(v),
            None => col.push_null(),
        },
        ColumnStorage::Times(col) => match value.and_then(read_be_i64) {
            // Stored as nanoseconds since midnight for the later cast to Polars `Time`.
            Some(micros) => col.push(micros * 1000),
            None => col.push_null(),
        },
    }
}

/// Days from 1970-01-01 (Unix epoch) to 2000-01-01 (Postgres epoch).
const PG_EPOCH_OFFSET_DAYS: i32 = 10_957;

/// Microseconds from 1970-01-01 (Unix epoch) to 2000-01-01 (Postgres epoch).
const PG_EPOCH_OFFSET_MICROS: i64 = 946_684_800_000_000;

/// Reads a big-endian `i32`, or `None` if `bytes` is not exactly 4 bytes long.
fn read_be_i32(bytes: &[u8]) -> Option<i32> {
    <[u8; 4]>::try_from(bytes).ok().map(i32::from_be_bytes)
}

/// Reads a big-endian `i64`, or `None` if `bytes` is not exactly 8 bytes long.
fn read_be_i64(bytes: &[u8]) -> Option<i64> {
    <[u8; 8]>::try_from(bytes).ok().map(i64::from_be_bytes)
}

/// Reads a big-endian `f64`, or `None` if `bytes` is not exactly 8 bytes long.
fn read_be_f64(bytes: &[u8]) -> Option<f64> {
    <[u8; 8]>::try_from(bytes).ok().map(f64::from_be_bytes)
}

/// Converts a Postgres date (days since 2000-01-01) to days since 1970-01-01.
///
/// Returns `None` for the `-infinity` / `infinity` sentinels (`i32::MIN` / `i32::MAX`).
fn pg_days_to_unix_days(pg_days: i32) -> Option<i32> {
    if pg_days == i32::MIN || pg_days == i32::MAX {
        return None;
    }
    pg_days.checked_add(PG_EPOCH_OFFSET_DAYS)
}

/// Converts Postgres timestamp microseconds (since 2000-01-01) to microseconds since 1970-01-01.
///
/// Returns `None` for the `-infinity` / `infinity` sentinels (`i64::MIN` / `i64::MAX`).
fn pg_micros_to_unix_micros(pg_micros: i64) -> Option<i64> {
    if pg_micros == i64::MIN || pg_micros == i64::MAX {
        return None;
    }
    pg_micros.checked_add(PG_EPOCH_OFFSET_MICROS)
}
/// Converts a finished column into a Polars [`Series`] named after the column.
///
/// Temporal columns are first built from their physical integers and then
/// cast to a logical dtype:
///
/// * `Dates` become `Date`.
/// * Both timestamp variants become a naive microsecond `Datetime`.
/// * `Times` become `Time`.
///
/// # Panics
///
/// Panics if one of those casts fails.
pub fn column_to_series(column: ColumnStorage) -> Series {
    let (physical, logical) = match column {
        // Variants whose physical values already form the final series.
        ColumnStorage::Ints(c) => return Series::new(c.name.into(), &c.data),
        ColumnStorage::Texts(c) => return Series::new(c.name.into(), &c.data),
        ColumnStorage::Bools(c) => return Series::new(c.name.into(), &c.data),
        ColumnStorage::Bytes(c) => return Series::new(c.name.into(), &c.data),
        ColumnStorage::Doubles(c) => return Series::new(c.name.into(), &c.data),
        ColumnStorage::TextArray(c) => return text_array_to_series(c.name.as_str(), c.data),
        // Variants that still need a cast to their temporal dtype.
        ColumnStorage::Dates(c) => (Series::new(c.name.into(), &c.data), DataType::Date),
        ColumnStorage::Timestamps(c) | ColumnStorage::TimestampsWtz(c) => (
            Series::new(c.name.into(), &c.data),
            DataType::Datetime(TimeUnit::Microseconds, None),
        ),
        ColumnStorage::Times(c) => (Series::new(c.name.into(), &c.data), DataType::Time),
    };
    physical.cast(&logical).unwrap()
}
/// Builds a list-typed [`Series`] called `name` from per-row text arrays.
///
/// Each `Some(array)` row becomes one list entry (an unnamed inner series of
/// the array's elements); each `None` row becomes a null list entry.
///
/// NOTE(review): the inner dtype is inferred from the rows. If every row is null,
/// it may not come out as String — confirm whether downstream code relies on that.
pub fn text_array_to_series(name: &str, data: Vec<Option<Vec<Option<String>>>>) -> Series {
    let to_inner = |items: Vec<Option<String>>| Series::new("".into(), items);
    let lists: ListChunked = data.into_iter().map(|row| row.map(to_inner)).collect();
    lists.into_series().with_name(name.into())
}
/// Returns an empty copy of every column in `columns`.
///
/// Each copy keeps the original's variant and name, holds no rows, and starts
/// with freshly reserved capacity. Input order is preserved.
///
/// Takes a slice so both `&Vec<ColumnStorage>` (via deref coercion) and other
/// slice sources are accepted.
pub fn clone_storages(columns: &[ColumnStorage]) -> Vec<ColumnStorage> {
    columns
        .iter()
        .map(|column| match column {
            ColumnStorage::Ints(col) => ColumnStorage::Ints(col.clone_empty()),
            ColumnStorage::Texts(col) => ColumnStorage::Texts(col.clone_empty()),
            ColumnStorage::Bools(col) => ColumnStorage::Bools(col.clone_empty()),
            ColumnStorage::Bytes(col) => ColumnStorage::Bytes(col.clone_empty()),
            ColumnStorage::Dates(col) => ColumnStorage::Dates(col.clone_empty()),
            ColumnStorage::TextArray(col) => ColumnStorage::TextArray(col.clone_empty()),
            ColumnStorage::Timestamps(col) => ColumnStorage::Timestamps(col.clone_empty()),
            ColumnStorage::Doubles(col) => ColumnStorage::Doubles(col.clone_empty()),
            ColumnStorage::TimestampsWtz(col) => {
                ColumnStorage::TimestampsWtz(col.clone_empty())
            }
            ColumnStorage::Times(col) => ColumnStorage::Times(col.clone_empty()),
        })
        .collect()
}