use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use super::ExprDataType;
use crate::error::{Error, Result};
/// Metadata describing a single column of an expression schema.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnMeta {
/// Column name; `ExprSchema::add_column` uses it as the lookup key.
pub name: String,
/// Expression-level data type of the column.
pub data_type: ExprDataType,
/// Nullability flag; carried to and from Arrow field nullability when
/// converting schemas.
pub nullable: bool,
/// Optional description of the column.
pub description: Option<String>,
}
impl ColumnMeta {
    /// Builds column metadata from its four components.
    ///
    /// `name` accepts anything convertible into a `String`; the remaining
    /// arguments are stored unchanged.
    pub fn new(
        name: impl Into<String>,
        data_type: ExprDataType,
        nullable: bool,
        description: Option<String>,
    ) -> Self {
        // Convert up front so the struct literal can use field shorthand.
        let name: String = name.into();
        Self { name, data_type, nullable, description }
    }
}
/// A set of columns addressable by name.
///
/// Columns are stored in a `HashMap`, so the order in which they were added
/// is not retained.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExprSchema {
// Column metadata keyed by column name.
columns: HashMap<String, ColumnMeta>,
}
impl ExprSchema {
    /// Creates a schema with no columns.
    pub fn new() -> Self {
        Self {
            columns: HashMap::new(),
        }
    }

    /// Registers `meta` under its own name and returns `self` for chaining.
    ///
    /// A column previously registered under the same name is replaced.
    pub fn add_column(&mut self, meta: ColumnMeta) -> &mut Self {
        self.columns.insert(meta.name.clone(), meta);
        self
    }

    /// Returns the metadata of the column called `name`, if present.
    pub fn column(&self, name: &str) -> Option<&ColumnMeta> {
        self.columns.get(name)
    }

    /// Returns the names of all columns, in arbitrary order.
    pub fn column_names(&self) -> Vec<&str> {
        self.columns.keys().map(|s| s.as_str()).collect()
    }

    /// Returns `true` if a column called `name` exists.
    pub fn has_column(&self, name: &str) -> bool {
        self.columns.contains_key(name)
    }

    /// Returns the number of columns.
    pub fn len(&self) -> usize {
        self.columns.len()
    }

    /// Returns `true` if the schema has no columns.
    pub fn is_empty(&self) -> bool {
        self.columns.is_empty()
    }

    /// Returns the underlying name-to-metadata map.
    pub fn columns(&self) -> &HashMap<String, ColumnMeta> {
        &self.columns
    }

    /// Builds an expression schema from an Arrow schema.
    ///
    /// Every field becomes a column without a description. Because columns
    /// are keyed by name, fields sharing a name collapse into one column
    /// (the last field wins) and the Arrow field order is not retained.
    ///
    /// # Errors
    ///
    /// Returns the error from [`arrow_type_to_expr_type`] for the first field
    /// whose Arrow data type has no expression-type equivalent.
    #[cfg(feature = "distributed")]
    pub fn from_arrow_schema(schema: &arrow::datatypes::Schema) -> Result<Self> {
        let mut result = Self::new();
        for field in schema.fields() {
            let name = field.name().clone();
            let data_type = arrow_type_to_expr_type(field.data_type())?;
            let nullable = field.is_nullable();
            let meta = ColumnMeta::new(name, data_type, nullable, None);
            result.add_column(meta);
        }
        Ok(result)
    }

    /// Converts this schema into an Arrow schema.
    ///
    /// Fields are emitted sorted by column name. Hash-map iteration order is
    /// arbitrary and differs between map instances, so without sorting two
    /// schemas with identical columns could yield Arrow schemas whose fields
    /// are ordered differently.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`expr_type_to_arrow_type`].
    #[cfg(feature = "distributed")]
    pub fn to_arrow_schema(&self) -> Result<arrow::datatypes::Schema> {
        // Sort on the map key: keys are unique, which makes the order total.
        let mut entries: Vec<(&String, &ColumnMeta)> = self.columns.iter().collect();
        entries.sort_unstable_by(|a, b| a.0.cmp(b.0));

        let mut fields = Vec::with_capacity(entries.len());
        for (_, meta) in entries {
            let arrow_type = expr_type_to_arrow_type(&meta.data_type)?;
            let field = arrow::datatypes::Field::new(&meta.name, arrow_type, meta.nullable);
            fields.push(field);
        }
        Ok(arrow::datatypes::Schema::new(fields))
    }
}
/// Maps an Arrow data type onto the corresponding expression data type.
///
/// All signed and unsigned integer widths map to `Integer`, all float widths
/// to `Float`, both UTF-8 string types to `String`, both date types to `Date`,
/// and any timestamp (regardless of unit or time zone) to `Timestamp`.
///
/// # Errors
///
/// Returns `Error::NotImplemented` for every other Arrow data type.
#[cfg(feature = "distributed")]
pub fn arrow_type_to_expr_type(arrow_type: &arrow::datatypes::DataType) -> Result<ExprDataType> {
    use arrow::datatypes::DataType;

    let expr_type = match arrow_type {
        DataType::Boolean => ExprDataType::Boolean,
        DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64 => ExprDataType::Integer,
        DataType::Float16 | DataType::Float32 | DataType::Float64 => ExprDataType::Float,
        DataType::Utf8 | DataType::LargeUtf8 => ExprDataType::String,
        DataType::Date32 | DataType::Date64 => ExprDataType::Date,
        DataType::Timestamp(_, _) => ExprDataType::Timestamp,
        _ => {
            // No expression-level equivalent: report the offending type.
            return Err(Error::NotImplemented(format!(
                "Conversion of Arrow data type {:?} to expression type is not implemented",
                arrow_type
            )));
        }
    };
    Ok(expr_type)
}
/// Maps an expression data type onto a concrete Arrow data type.
///
/// The widest Arrow representation is chosen for numbers (`Int64`,
/// `Float64`); dates become `Date32` and timestamps become microsecond
/// timestamps without a time zone.
///
/// Every variant currently has a mapping, so this function always returns
/// `Ok`; the `Result` return type is part of its signature.
#[cfg(feature = "distributed")]
pub fn expr_type_to_arrow_type(expr_type: &ExprDataType) -> Result<arrow::datatypes::DataType> {
    use arrow::datatypes::{DataType, TimeUnit};

    let arrow_type = match expr_type {
        ExprDataType::Boolean => DataType::Boolean,
        ExprDataType::Integer => DataType::Int64,
        ExprDataType::Float => DataType::Float64,
        ExprDataType::String => DataType::Utf8,
        ExprDataType::Date => DataType::Date32,
        ExprDataType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
    };
    Ok(arrow_type)
}