pub use postgres_to_polars_derive::IntoDataFrame;
use futures::TryStreamExt;
use polars::chunked_array::builder::{ListBuilderTrait, ListStringChunkedBuilder};
use polars::prelude::{Column, DataFrame, IntoSeries, NamedFrom, PlSmallStr, Series};
/// Capacity that `to_dataframe_default` forwards to `to_dataframe`, which in
/// turn hands it to `HasDataFrameBuilder::dataframe_builder`.
const DEFAULT_CAPACITY: usize = 1024;
/// Conversion from an owned collection of values into a named polars [`Column`].
///
/// In this file the trait is implemented for `Vec<T>` of several element
/// types (primitives, strings, chrono dates/times and lists of strings).
pub trait VecToColumn {
/// Builds a [`Column`] called `name` out of `data`, consuming `data`.
fn to_column(name: &str, data: Self) -> Column;
}
macro_rules! impl_vec_to_column {
($t:ty) => {
impl VecToColumn for Vec<$t> {
fn to_column(name: &str, data: Self) -> Column {
Series::new(PlSmallStr::from(name), &data).into()
}
}
};
}
impl_vec_to_column!(i32);
impl_vec_to_column!(i64);
impl_vec_to_column!(f32);
impl_vec_to_column!(f64);
impl_vec_to_column!(bool);
impl_vec_to_column!(String);
impl_vec_to_column!(Option<i32>);
impl_vec_to_column!(Option<i64>);
impl_vec_to_column!(Option<f32>);
impl_vec_to_column!(Option<f64>);
impl_vec_to_column!(Option<bool>);
impl_vec_to_column!(Option<String>);
impl_vec_to_column!(chrono::NaiveDate);
impl_vec_to_column!(chrono::NaiveDateTime);
impl_vec_to_column!(chrono::NaiveTime);
impl_vec_to_column!(Option<chrono::NaiveDate>);
impl_vec_to_column!(Option<chrono::NaiveDateTime>);
impl_vec_to_column!(Option<chrono::NaiveTime>);
impl VecToColumn for Vec<chrono::DateTime<chrono::Utc>> {
fn to_column(name: &str, data: Self) -> Column {
let naive: Vec<chrono::NaiveDateTime> = data.into_iter().map(|dt| dt.naive_utc()).collect();
Series::new(PlSmallStr::from(name), &naive).into()
}
}
impl VecToColumn for Vec<Option<chrono::DateTime<chrono::Utc>>> {
fn to_column(name: &str, data: Self) -> Column {
let naive: Vec<Option<chrono::NaiveDateTime>> = data
.into_iter()
.map(|opt| opt.map(|dt| dt.naive_utc()))
.collect();
Series::new(PlSmallStr::from(name), &naive).into()
}
}
impl VecToColumn for Vec<Option<Vec<String>>> {
fn to_column(name: &str, data: Self) -> Column {
let name = PlSmallStr::from(name);
let values_cap = data.iter().map(|o| o.as_ref().map_or(0, |v| v.len())).sum();
let mut builder = ListStringChunkedBuilder::new(name, data.len(), values_cap);
for opt in &data {
match opt {
Some(v) => builder.append_values_iter(v.iter().map(|s| s.as_str())),
None => builder.append_null(),
}
}
builder.finish().into_series().into()
}
}
impl VecToColumn for Vec<Vec<String>> {
fn to_column(name: &str, data: Self) -> Column {
let name = PlSmallStr::from(name);
let values_cap = data.iter().map(|v| v.len()).sum();
let mut builder = ListStringChunkedBuilder::new(name, data.len(), values_cap);
for v in &data {
builder.append_values_iter(v.iter().map(|s| s.as_str()));
}
builder.finish().into_series().into()
}
}
/// Error type returned by the [`StreamToDataFrame`] methods.
///
/// Both variants carry `#[from]`, so the wrapped error types convert into
/// this enum with `?`.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// An error from sqlx; `to_dataframe` yields this when the row stream
/// produces an `Err` item.
#[error("sqlx error: {0}")]
Sqlx(#[from] sqlx::Error),
/// An error from polars; `to_dataframe` yields this when the builder's
/// `build` step fails.
#[error("polars error: {0}")]
Polars(#[from] polars::prelude::PolarsError),
}
/// Links a row type to the builder that collects such rows into a [`DataFrame`].
///
/// NOTE(review): presumably implemented by the `IntoDataFrame` derive
/// re-exported by this crate; the derive is not visible here, so confirm.
pub trait HasDataFrameBuilder: Sized {
/// Builder type that accepts rows of `Self`; it must be `Send`.
type Builder: DataFrameBuilder<Self> + Send;
/// Creates an empty builder.
///
/// `capacity` is presumably a row-count hint for preallocation; its exact
/// meaning is up to the implementor.
fn dataframe_builder(capacity: usize) -> Self::Builder;
}
/// Accumulates rows of type `T` and turns them into a [`DataFrame`].
pub trait DataFrameBuilder<T>: Sized {
/// Adds one row to the builder, taking ownership of it.
fn push(&mut self, row: T);
/// Consumes the builder and produces the [`DataFrame`], or the polars
/// error raised while assembling it.
fn build(self) -> Result<DataFrame, polars::prelude::PolarsError>;
}
/// Extension methods that collect a stream of rows into a [`DataFrame`].
///
/// Both methods are only callable when `Self` is an `Unpin + Send` stream
/// whose items are `Result<T, sqlx::Error>`; the returned futures are `Send`.
pub trait StreamToDataFrame: Sized {
/// Consumes the stream and collects its rows into a [`DataFrame`].
///
/// `capacity` is passed to `T::dataframe_builder` when the builder is created.
///
/// # Errors
///
/// The future resolves to [`Error`] when the stream yields an error or the
/// builder fails to build the frame.
fn to_dataframe<T: HasDataFrameBuilder<Builder: Send>>(
self,
capacity: usize,
) -> impl std::future::Future<Output = Result<DataFrame, Error>> + Send
where
Self: futures::Stream<Item = Result<T, sqlx::Error>> + Unpin + Send;
/// Same as [`StreamToDataFrame::to_dataframe`], but without an explicit
/// capacity argument.
fn to_dataframe_default<T: HasDataFrameBuilder<Builder: Send>>(
self,
) -> impl std::future::Future<Output = Result<DataFrame, Error>> + Send
where
Self: futures::Stream<Item = Result<T, sqlx::Error>> + Unpin + Send;
}
impl<S> StreamToDataFrame for S
where
S: Sized,
{
async fn to_dataframe<T: HasDataFrameBuilder<Builder: Send>>(
mut self,
capacity: usize,
) -> Result<DataFrame, Error>
where
Self: futures::Stream<Item = Result<T, sqlx::Error>> + Unpin + Send,
{
let mut builder = T::dataframe_builder(capacity);
while let Some(row) = self.try_next().await? {
builder.push(row);
}
Ok(builder.build()?)
}
async fn to_dataframe_default<T: HasDataFrameBuilder<Builder: Send>>(
self,
) -> Result<DataFrame, Error>
where
Self: futures::Stream<Item = Result<T, sqlx::Error>> + Unpin + Send,
{
self.to_dataframe(DEFAULT_CAPACITY).await
}
}