#[cfg(feature = "arrow2_ih")]
extern crate arrow2_ih as arrow2;
use crate::{Error, Time, TimeZone};
use arrow2::array::{Array, Int64Array, Utf8Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow2::error::Error as ArrowError;
use arrow2::io::ipc::write::{StreamWriter, WriteOptions};
use chrono::{DateTime, Local, NaiveDateTime, SecondsFormat, Utc};
/// A single column: a boxed, dynamically-typed arrow2 array.
pub type Series = Box<(dyn Array + 'static)>;
/// A lightweight column-oriented table built on arrow2.
///
/// Holds two parallel vectors — column metadata (`fields`) and column
/// values (`data`) — plus the row count shared by every column.
#[derive(Default, Clone)]
pub struct DataFrame {
// Column metadata (name, data type, nullability), parallel to `data`.
fields: Vec<Field>,
// Column values; every array is expected to hold exactly `rows` elements.
data: Vec<Series>,
// Number of rows shared by all columns.
rows: usize,
}
impl DataFrame {
    /// Creates an empty frame expecting `rows` rows, with no column-count hint.
    #[inline]
    pub fn new0(rows: usize) -> Self {
        Self::new(rows, None)
    }
    /// Creates an empty frame expecting `rows` rows, pre-allocating capacity
    /// for `cols` columns when a hint is supplied.
    #[inline]
    pub fn new(rows: usize, cols: Option<usize>) -> Self {
        Self {
            data: Vec::with_capacity(cols.unwrap_or_default()),
            rows,
            fields: Vec::with_capacity(cols.unwrap_or_default()),
        }
    }
    /// Builds a frame whose first column, named "time", is an `Int64`
    /// timestamp series converted from floating-point seconds.
    ///
    /// Values are scaled to `time_unit` and the column's data type is tagged
    /// with `tz`. `cols` is a capacity hint for the number of *additional*
    /// columns (one slot is reserved for the time column itself).
    pub fn new_timeseries_from_float(
        time_series: Vec<f64>,
        cols: Option<usize>,
        tz: TimeZone,
        time_unit: TimeUnit,
    ) -> Self {
        // +1 reserves the slot taken by the "time" column.
        let mut df = Self::new(time_series.len(), cols.map(|c| c + 1));
        #[allow(clippy::cast_possible_truncation)]
        #[allow(clippy::cast_possible_wrap)]
        let ts = Int64Array::from(
            time_series
                .into_iter()
                .map(|v| {
                    Some(match time_unit {
                        // Seconds: drop the fractional part directly.
                        TimeUnit::Second => v.trunc() as i64,
                        // Sub-second units go through the project `Time`
                        // type so fractional seconds are preserved.
                        TimeUnit::Millisecond => Time::from_timestamp(v).timestamp_ms() as i64,
                        TimeUnit::Microsecond => Time::from_timestamp(v).timestamp_us() as i64,
                        TimeUnit::Nanosecond => Time::from_timestamp(v).timestamp_ns() as i64,
                    })
                })
                .collect::<Vec<Option<i64>>>(),
        )
        .boxed();
        // The array length equals `rows` by construction, so this cannot fail.
        df.add_series("time", ts, DataType::Timestamp(time_unit, tz.into()))
            .unwrap();
        df
    }
    /// Builds a frame whose first column, named "time", holds RFC 3339
    /// strings in the local timezone, converted from floating-point UNIX
    /// seconds. `cols` is a capacity hint for additional columns.
    pub fn new_timeseries_from_float_rfc3339(time_series: Vec<f64>, cols: Option<usize>) -> Self {
        let mut df = Self::new(time_series.len(), cols.map(|c| c + 1));
        let ts: Vec<Option<String>> = time_series
            .iter()
            .map(|v| {
                #[allow(clippy::cast_possible_truncation)]
                #[allow(clippy::cast_sign_loss)]
                let dt_utc = DateTime::<Utc>::from_utc(
                    // Split seconds into whole + fractional nanoseconds; an
                    // out-of-range value falls back to the epoch default.
                    NaiveDateTime::from_timestamp_opt(
                        v.trunc() as i64,
                        (v.fract() * 1_000_000_000.0) as u32,
                    )
                    .unwrap_or_default(),
                    Utc,
                );
                let dt: DateTime<Local> = DateTime::from(dt_utc);
                Some(dt.to_rfc3339_opts(SecondsFormat::Secs, true))
            })
            .collect();
        df.add_series0("time", Utf8Array::<i32>::from(ts).boxed())
            .unwrap();
        df
    }
    /// Assembles a frame from an arrow chunk and its schema.
    ///
    /// The row count is taken from the first array (0 when the chunk is
    /// empty); arrays are NOT cross-checked against each other here.
    pub fn from_chunk(chunk: Chunk<Box<dyn Array + 'static>>, schema: &Schema) -> Self {
        let data = chunk.into_arrays();
        let rows = data.first().map_or(0, |v| v.len());
        Self {
            fields: schema.fields.clone(),
            data,
            rows,
        }
    }
    /// Assembles a frame from parallel field/array vectors.
    ///
    /// # Errors
    /// Returns `Error::RowsNotMatch` when the arrays do not all share the
    /// same length.
    pub fn from_parts(fields: Vec<Field>, data: Vec<Series>) -> Result<Self, Error> {
        let rows = if let Some(x) = data.first() {
            let rows = x.len();
            for s in data.iter().skip(1) {
                if s.len() != rows {
                    return Err(Error::RowsNotMatch);
                }
            }
            rows
        } else {
            0
        };
        Ok(Self { fields, data, rows })
    }
    /// Consumes the frame, returning its fields and arrays.
    pub fn into_parts(self) -> (Vec<Field>, Vec<Series>) {
        (self.fields, self.data)
    }
    /// Returns `true` when the frame has no columns.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }
    /// Returns the column names, in column order.
    #[inline]
    pub fn names(&self) -> Vec<&str> {
        self.fields.iter().map(|col| col.name.as_str()).collect()
    }
    /// Returns the column metadata.
    #[inline]
    pub fn fields(&self) -> &[Field] {
        &self.fields
    }
    /// Returns the column arrays.
    #[inline]
    pub fn data(&self) -> &[Series] {
        &self.data
    }
    /// Appends a column with an explicit data type. The field is registered
    /// as nullable.
    ///
    /// # Errors
    /// Returns `Error::RowsNotMatch` when `series.len()` differs from the
    /// frame's row count.
    pub fn add_series(
        &mut self,
        name: &str,
        series: Series,
        data_type: DataType,
    ) -> Result<(), Error> {
        if series.len() == self.rows {
            self.fields.push(Field::new(name, data_type, true));
            self.data.push(series);
            Ok(())
        } else {
            Err(Error::RowsNotMatch)
        }
    }
    /// Appends a column, inferring the field's data type from the array.
    ///
    /// # Errors
    /// Returns `Error::RowsNotMatch` when the lengths differ.
    #[inline]
    pub fn add_series0(&mut self, name: &str, series: Series) -> Result<(), Error> {
        let dt = series.data_type().clone();
        self.add_series(name, series, dt)
    }
    /// Returns every column sliced to `[offset, offset + length)`.
    ///
    /// # Errors
    /// Returns `Error::OutOfBounds` when the window does not fit in `rows`.
    pub fn try_series_sliced(&self, offset: usize, length: usize) -> Result<Vec<Series>, Error> {
        // checked_add guards against usize overflow of `offset + length`,
        // which would otherwise wrap in release builds and falsely pass the
        // bounds check.
        match offset.checked_add(length) {
            Some(end) if end <= self.rows => {
                Ok(self.data.iter().map(|d| d.sliced(offset, length)).collect())
            }
            _ => Err(Error::OutOfBounds),
        }
    }
    /// Returns the sliced columns wrapped in a chunk.
    ///
    /// # Errors
    /// Returns `Error::OutOfBounds` when the window does not fit in `rows`.
    #[inline]
    pub fn try_chunk_sliced(
        &self,
        offset: usize,
        length: usize,
    ) -> Result<Chunk<Box<dyn Array>>, Error> {
        let series = self.try_series_sliced(offset, length)?;
        Ok(Chunk::new(series))
    }
    /// Returns a new frame containing rows `[offset, offset + length)` with
    /// the same schema.
    ///
    /// # Errors
    /// Returns `Error::OutOfBounds` when the window does not fit in `rows`.
    pub fn try_sliced(&self, offset: usize, length: usize) -> Result<Self, Error> {
        // Same overflow-safe bounds check as `try_series_sliced`.
        match offset.checked_add(length) {
            Some(end) if end <= self.rows => Ok(Self {
                data: self.data.iter().map(|d| d.sliced(offset, length)).collect(),
                rows: length,
                fields: self.fields.clone(),
            }),
            _ => Err(Error::OutOfBounds),
        }
    }
    /// Builds an arrow schema from the frame's fields.
    #[inline]
    pub fn schema(&self) -> Schema {
        Schema::from(self.fields.clone())
    }
    /// Returns the number of rows.
    #[inline]
    pub fn rows(&self) -> usize {
        self.rows
    }
    /// Returns a rough in-memory size in bytes, assuming fixed-width values.
    ///
    /// Variable-width and other unlisted types are estimated at 8 bytes per
    /// row; validity bitmaps and buffer overhead are not counted.
    pub fn size(&self) -> usize {
        let mut size = 0;
        for d in &self.data {
            let m = match d.data_type() {
                DataType::Boolean | DataType::Int8 | DataType::UInt8 => 1,
                DataType::Int16 | DataType::UInt16 => 2,
                DataType::Int32 | DataType::UInt32 | DataType::Float32 => 4,
                _ => 8,
            };
            size += d.len() * m;
        }
        size
    }
    /// Reorders columns so that the named columns appear first, in the given
    /// order. Names not present in the frame are silently skipped; remaining
    /// columns keep their relative positions only as far as swapping allows.
    pub fn set_ordering(&mut self, cols: &[&str]) {
        for (i, col) in cols.iter().enumerate() {
            // Re-search each time: earlier swaps may have moved columns.
            if let Some(pos) = self.fields.iter().position(|r| &r.name == col) {
                if pos != i {
                    self.fields.swap(i, pos);
                    self.data.swap(i, pos);
                }
            }
        }
    }
    /// Consumes the frame into the (schema, chunk) pair used by arrow IPC
    /// writers.
    pub fn into_ipc_parts(self) -> (Schema, Chunk<Box<dyn Array + 'static>>) {
        let schema = Schema::from(self.fields);
        let chunk = Chunk::new(self.data);
        (schema, chunk)
    }
    /// Serializes the frame into a single arrow IPC stream block.
    ///
    /// # Errors
    /// Propagates any `ArrowError` raised by the IPC writer.
    pub fn into_ipc_block(self) -> Result<Vec<u8>, ArrowError> {
        let mut buf = Vec::new();
        let schema = self.schema();
        let chunk = Chunk::new(self.data);
        let mut writer = StreamWriter::new(&mut buf, WriteOptions::default());
        writer.start(&schema, None)?;
        writer.write(&chunk, None)?;
        writer.finish()?;
        Ok(buf)
    }
}
impl From<DataFrame> for Chunk<Box<dyn Array>> {
#[inline]
fn from(df: DataFrame) -> Self {
Chunk::new(df.data)
}
}
impl TryFrom<DataFrame> for Vec<u8> {
type Error = ArrowError;
#[inline]
fn try_from(df: DataFrame) -> Result<Self, Self::Error> {
df.into_ipc_block()
}
}
#[cfg(feature = "polars")]
impl From<DataFrame> for polars::frame::DataFrame {
/// Converts this frame into a polars `DataFrame`, handing each arrow
/// array over as a single-chunk polars series (no data copy is visible
/// here; reuse of the buffers depends on polars internals).
fn from(df: DataFrame) -> polars::frame::DataFrame {
let (fields, data) = df.into_parts();
// SAFETY: each array is zipped with its own field, so the dtype passed
// to `from_chunks_and_dtype_unchecked` is derived from that array's
// declared field. This presumes `fields` and `data` are parallel and
// type-consistent, which `DataFrame::add_series` maintains — confirm if
// frames can be built through other paths.
let polars_series = unsafe {
data.into_iter()
.zip(fields)
.map(|(d, f)| {
polars::series::Series::from_chunks_and_dtype_unchecked(
&f.name,
vec![d],
&f.data_type().into(),
)
})
.collect::<Vec<polars::series::Series>>()
};
// `new_no_checks` skips length validation; all columns are expected to
// share the frame's row count.
polars::frame::DataFrame::new_no_checks(polars_series)
}
}