use arrow::datatypes::{Field as ArrowField, Schema as ArrowSchema};
use evolution_builder::builder::{Builder, ColumnBuilderRef};
use evolution_common::datatype::DataType;
use evolution_common::error::Result;
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::PathBuf;
use crate::column::FixedColumn;
/// Marker trait for schema types.
///
/// It declares no methods; within this file it is implemented by `FixedSchema`.
pub trait Schema {}
/// Owned, heap-allocated [`Schema`] trait object.
pub type SchemaRef = Box<dyn Schema>;
/// A named, versioned, ordered collection of [`FixedColumn`]s.
///
/// The type derives `Deserialize` and `Serialize`; `FixedSchema::from_path`
/// uses the former to load a schema from a JSON file.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct FixedSchema {
/// Name of the schema.
name: String,
/// Version of the schema.
version: usize,
/// Columns of the schema, in the order they were supplied.
columns: Vec<FixedColumn>,
}
impl FixedSchema {
    /// Creates a schema from a name, a version and an ordered list of columns.
    pub fn new(name: String, version: usize, columns: Vec<FixedColumn>) -> Self {
        Self {
            name,
            version,
            columns,
        }
    }

    /// Reads the file at `path` and deserializes its contents as JSON into a schema.
    ///
    /// # Errors
    /// Propagates the error if the file cannot be read, or if its contents
    /// cannot be deserialized into a [`FixedSchema`].
    pub fn from_path(path: PathBuf) -> Result<Self> {
        let bytes = fs::read(path)?;
        let schema: Self = serde_json::from_slice(&bytes)?;
        Ok(schema)
    }

    /// Returns the name of the schema.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns the version of the schema.
    pub fn version(&self) -> usize {
        self.version
    }

    /// Returns the columns of the schema, in order.
    pub fn columns(&self) -> &Vec<FixedColumn> {
        &self.columns
    }

    /// Returns the number of columns in the schema.
    pub fn num_columns(&self) -> usize {
        self.columns.len()
    }

    /// Returns the sum of the lengths of all columns.
    pub fn row_length(&self) -> usize {
        self.columns.iter().map(|c| c.length()).sum()
    }

    /// Returns the name of every column, in column order.
    pub fn column_names(&self) -> Vec<&String> {
        self.columns.iter().map(|c| c.name()).collect()
    }

    /// Returns the offset of every column, in column order.
    pub fn column_offsets(&self) -> Vec<usize> {
        self.columns.iter().map(|c| c.offset()).collect()
    }

    /// Returns the length of every column, in column order.
    pub fn column_lengths(&self) -> Vec<usize> {
        self.columns.iter().map(|c| c.length()).collect()
    }

    /// Returns references to the columns whose `is_nullable()` is `true`.
    pub fn nullable_columns(&self) -> Vec<&FixedColumn> {
        self.columns.iter().filter(|c| c.is_nullable()).collect()
    }

    /// Returns references to the columns whose `is_nullable()` is `false`.
    pub fn not_nullable_columns(&self) -> Vec<&FixedColumn> {
        self.columns.iter().filter(|c| !c.is_nullable()).collect()
    }

    /// Returns the data type of every column, in column order.
    pub fn dtypes(&self) -> Vec<DataType> {
        self.columns.iter().map(|c| c.dtype()).collect()
    }

    /// Returns an iterator that borrows the schema and yields its columns in order.
    ///
    /// The `'_` makes it visible in the signature that the iterator borrows
    /// from `self`.
    pub fn iter(&self) -> FixedSchemaIterator<'_> {
        FixedSchemaIterator {
            columns: &self.columns,
            index: 0,
        }
    }

    /// Consumes the schema and builds an [`ArrowSchema`] with one field per column.
    ///
    /// Each field is created from the column's name, its `as_arrow_dtype()`
    /// and its nullability.
    pub fn into_arrow_schema(self) -> ArrowSchema {
        let fields: Vec<ArrowField> = self
            .columns
            .iter()
            .map(|c| ArrowField::new(c.name(), c.as_arrow_dtype(), c.is_nullable()))
            .collect();
        ArrowSchema::new(fields)
    }

    /// Consumes the schema and builds a `T` from the column builders of its columns.
    ///
    /// One builder is obtained per column through `as_column_builder()`; the
    /// resulting list is handed to `T::from`.
    pub fn into_builder<T>(self) -> T
    where
        T: Builder,
    {
        let column_builders: Vec<ColumnBuilderRef> = self
            .columns
            .iter()
            .map(|c| c.as_column_builder())
            .collect();
        T::from(column_builders)
    }
}
// `FixedSchema` implements the method-less `Schema` trait, so it can be used
// wherever a `Schema` implementor is required.
impl Schema for FixedSchema {}
/// Borrowing iterator that yields the columns of a schema in order.
///
/// Created by `FixedSchema::iter`.
pub struct FixedSchemaIterator<'a> {
    /// The columns being iterated over.
    columns: &'a [FixedColumn],
    /// Position of the next column to yield.
    index: usize,
}

impl<'a> Iterator for FixedSchemaIterator<'a> {
    type Item = &'a FixedColumn;

    /// Returns the next column, or `None` once every column has been yielded.
    fn next(&mut self) -> Option<Self::Item> {
        // `get` gives a checked lookup; the cursor only advances when a
        // column was actually found, so it never runs past the end.
        let column = self.columns.get(self.index)?;
        self.index += 1;
        Some(column)
    }

    /// Reports the exact number of columns left, so that `collect`/`extend`
    /// can reserve the right capacity up front.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.columns.len().saturating_sub(self.index);
        (remaining, Some(remaining))
    }
}

// `size_hint` is exact, so `len()` can be offered as well.
impl ExactSizeIterator for FixedSchemaIterator<'_> {}
#[cfg(test)]
mod tests_schema {
    use super::*;
    use padder::{Alignment, Symbol};

    /// Resolves `relative` against the directory containing the crate manifest.
    fn manifest_path(relative: &str) -> PathBuf {
        PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(relative)
    }

    /// Builds the two in-memory columns shared by the tests in this module.
    fn sample_columns() -> Vec<FixedColumn> {
        let id = FixedColumn::new(
            String::from("id"),
            0_usize,
            9_usize,
            DataType::Int32,
            Alignment::Right,
            Symbol::Whitespace,
            false,
        );
        let not_cool = FixedColumn::new(
            String::from("NotCoolColumn"),
            9_usize,
            149_usize,
            DataType::LargeUtf8,
            Alignment::Left,
            Symbol::Five,
            false,
        );
        vec![id, not_cool]
    }

    /// Compares a schema loaded from the JSON fixture with one built in memory.
    #[test]
    fn test_deserialize_schema_from_file() {
        let fixture = manifest_path("res/test_valid_schema.json");
        let in_memory_columns = sample_columns();

        let from_file = FixedSchema::from_path(fixture).unwrap();
        let in_memory =
            FixedSchema::new(String::from("ValidTestSchema"), 8914781578, in_memory_columns);

        // Schema-level metadata.
        assert_eq!(from_file.name(), in_memory.name());
        assert_ne!(from_file.version(), in_memory.version());

        let file_columns = from_file.columns();
        let memory_columns = in_memory.columns();

        // First column, attribute by attribute.
        let (file_first, memory_first) = (&file_columns[0], &memory_columns[0]);
        assert_eq!(file_first.name(), memory_first.name());
        assert_eq!(file_first.offset(), memory_first.offset());
        assert_eq!(file_first.length(), memory_first.length());
        assert_eq!(file_first.dtype(), memory_first.dtype());
        assert_eq!(file_first.alignment(), memory_first.alignment());
        assert_eq!(file_first.pad_symbol(), memory_first.pad_symbol());
        assert_eq!(file_first.is_nullable(), memory_first.is_nullable());

        // Second column, attribute by attribute.
        let (file_second, memory_second) = (&file_columns[1], &memory_columns[1]);
        assert_ne!(file_second.name(), memory_second.name());
        assert_eq!(file_second.offset(), memory_second.offset());
        assert_ne!(file_second.length(), memory_second.length());
        assert_ne!(file_second.dtype(), memory_second.dtype());
        assert_ne!(file_second.alignment(), memory_second.alignment());
        assert_ne!(file_second.pad_symbol(), memory_second.pad_symbol());
        assert_eq!(file_second.is_nullable(), memory_second.is_nullable());

        // Aggregate accessors.
        assert_ne!(from_file.num_columns(), in_memory.num_columns());
        assert_eq!(from_file.row_length(), in_memory.row_length());
        assert_ne!(from_file.column_names(), in_memory.column_names());
        assert_ne!(from_file.column_offsets(), in_memory.column_offsets());
        assert_ne!(from_file.column_lengths(), in_memory.column_lengths());
        assert_ne!(from_file.nullable_columns(), in_memory.nullable_columns());
        assert_ne!(
            from_file.not_nullable_columns(),
            in_memory.not_nullable_columns()
        );
        assert_ne!(from_file.dtypes(), in_memory.dtypes());
    }

    /// Loading the invalid fixture must fail, which makes `unwrap` panic.
    #[test]
    #[should_panic]
    fn test_deserialize_invalid_schema_from_file() {
        let fixture = manifest_path("res/test_invalid_schema.json");
        let _: FixedSchema = FixedSchema::from_path(fixture).unwrap();
    }

    /// Walks the iterator by hand and checks it ends after the last column.
    #[test]
    fn test_iterate_schema_columns() {
        let schema =
            FixedSchema::new(String::from("ValidTestSchema"), 8914781578, sample_columns());
        let mut remaining = schema.iter();

        let first = remaining.next().unwrap();
        assert_eq!("id", first.name());
        assert_eq!(9, first.length());

        let second = remaining.next().unwrap();
        assert_eq!("NotCoolColumn", second.name());
        assert_ne!(DataType::Boolean, second.dtype());

        assert_eq!(None, remaining.next());
    }
}