use std::{path::PathBuf, sync::Arc};
use anyhow::{Context, Result, anyhow, bail, ensure};
use arrow_array::{
ArrayRef, BooleanArray, Float32Array, Int16Array, Int32Array, RecordBatch, StringArray,
UInt16Array, UInt32Array, UInt64Array,
builder::{
Float32Builder, Int16Builder, Int32Builder, ListBuilder, StringBuilder, UInt16Builder,
UInt32Builder, UInt64Builder,
},
};
use crate::{
dat::ivy_schema::{ColumnSchema, DatTableSchema, SchemaCollection},
file_parsers::{
FileParser,
dat::{DatParser, types::DatFile},
},
fs::{FS, FileSystem},
};
/// Decodes a 16-byte little-endian foreign-row key into a `u64`.
///
/// Only the low 64 bits (the first eight bytes) are kept; the remaining
/// eight bytes are ignored.
///
/// # Panics
/// Panics if `bytes` is not exactly 16 bytes long.
fn parse_foreignrow(bytes: &[u8]) -> u64 {
    let raw: [u8; 16] = bytes.try_into().unwrap();
    let mut low = [0u8; 8];
    low.copy_from_slice(&raw[..8]);
    u64::from_le_bytes(low)
}
/// Decodes a nullable 16-byte foreign-row key.
///
/// Sixteen `0xfe` bytes yield `None`; any other input is handed to
/// `parse_foreignrow`, which panics unless it is exactly 16 bytes long.
fn parse_maybe_foreignrow(bytes: &[u8]) -> Option<u64> {
    const NULL_KEY: [u8; 16] = [0xfe; 16];
    (bytes != NULL_KEY).then(|| parse_foreignrow(bytes))
}
/// Decodes a nullable 8-byte row key.
///
/// Eight `0xfe` bytes yield `None`; any other input is handed to
/// `parse_u64`, which panics unless it is exactly 8 bytes long.
fn parse_maybe_row(bytes: &[u8]) -> Option<u64> {
    const NULL_KEY: [u8; 8] = [0xfe; 8];
    (bytes != NULL_KEY).then(|| parse_u64(bytes))
}
/// Reads a little-endian `u64` from an 8-byte slice.
///
/// # Panics
/// Panics if `bytes` is not exactly 8 bytes long.
fn parse_u64(bytes: &[u8]) -> u64 {
    let raw: [u8; 8] = bytes.try_into().unwrap();
    u64::from_le_bytes(raw)
}
/// Reads a little-endian `u32` from a 4-byte slice.
///
/// # Panics
/// Panics if `bytes` is not exactly 4 bytes long.
fn parse_u32(bytes: &[u8]) -> u32 {
    let raw: [u8; 4] = bytes.try_into().unwrap();
    u32::from_le_bytes(raw)
}
/// Reads a little-endian `i32` from a 4-byte slice.
///
/// # Panics
/// Panics if `bytes` is not exactly 4 bytes long.
fn parse_i32(bytes: &[u8]) -> i32 {
    let raw: [u8; 4] = bytes.try_into().unwrap();
    i32::from_le_bytes(raw)
}
/// Reads a little-endian `f32` from a 4-byte slice.
///
/// # Panics
/// Panics if `bytes` is not exactly 4 bytes long.
fn parse_f32(bytes: &[u8]) -> f32 {
    let raw: [u8; 4] = bytes.try_into().unwrap();
    f32::from_le_bytes(raw)
}
/// Reads a little-endian `u16` from a 2-byte slice.
///
/// # Panics
/// Panics if `bytes` is not exactly 2 bytes long.
fn parse_u16(bytes: &[u8]) -> u16 {
    let raw: [u8; 2] = bytes.try_into().unwrap();
    u16::from_le_bytes(raw)
}
/// Reads a little-endian `i16` from a 2-byte slice.
///
/// # Panics
/// Panics if `bytes` is not exactly 2 bytes long.
fn parse_i16(bytes: &[u8]) -> i16 {
    let raw: [u8; 2] = bytes.try_into().unwrap();
    i16::from_le_bytes(raw)
}
fn parse_bool(bytes: &[u8]) -> Result<bool> {
assert!(bytes.len() == 1);
ensure!(bytes[0] < 2, "Invalid boolean value: {:?}", bytes[0]);
Ok(bytes[0] == 1)
}
fn parse_column(
table: &DatFile,
column: &ColumnSchema,
cur_offset: usize,
) -> Result<(usize, Result<ArrayRef>)> {
let (bytes_taken, series) = match (column.array, column.interval) {
(true, false) => {
let series = match column.column_type.as_str() {
"array" => Err(anyhow!("Unknown array type")),
"string" => table
.view_col_as_array_of_strings(cur_offset)?
.collect::<Result<Vec<_>>>()
.map(|s| {
let mut builder = ListBuilder::new(StringBuilder::new());
for row in s {
for val in row {
builder.values().append_option(val)
}
builder.append(true);
}
builder.finish()
}),
"foreignrow" => table
.view_col_as_array_of(cur_offset, 16, parse_foreignrow)?
.collect::<Result<Vec<_>>>()
.map(|s| {
let mut builder = ListBuilder::new(UInt64Builder::new());
for row in s {
for val in row {
builder.values().append_value(val)
}
builder.append(true);
}
builder.finish()
}),
"row" => table
.view_col_as_array_of(cur_offset, 8, parse_maybe_row)?
.collect::<Result<Vec<_>>>()
.map(|s| {
let mut builder = ListBuilder::new(UInt64Builder::new());
for row in s {
for val in row {
builder.values().append_option(val)
}
builder.append(true);
}
builder.finish()
}),
"enumrow" => table
.view_col_as_array_of(cur_offset, 4, parse_u32)?
.collect::<Result<Vec<_>>>()
.map(|s| {
let mut builder = ListBuilder::new(UInt32Builder::new());
for row in s {
for val in row {
builder.values().append_value(val)
}
builder.append(true);
}
builder.finish()
}),
"u32" => table
.view_col_as_array_of(cur_offset, 4, parse_u32)?
.collect::<Result<Vec<_>>>()
.map(|s| {
let mut builder = ListBuilder::new(UInt32Builder::new());
for row in s {
for val in row {
builder.values().append_value(val)
}
builder.append(true);
}
builder.finish()
}),
"f32" => table
.view_col_as_array_of(cur_offset, 4, parse_f32)?
.collect::<Result<Vec<_>>>()
.map(|s| {
let mut builder = ListBuilder::new(Float32Builder::new());
for row in s {
for val in row {
builder.values().append_value(val)
}
builder.append(true);
}
builder.finish()
}),
"i32" => table
.view_col_as_array_of(cur_offset, 4, parse_i32)?
.collect::<Result<Vec<_>>>()
.map(|s| {
let mut builder = ListBuilder::new(Int32Builder::new());
for row in s {
for val in row {
builder.values().append_value(val)
}
builder.append(true);
}
builder.finish()
}),
"i16" => table
.view_col_as_array_of(cur_offset, 2, parse_i16)?
.collect::<Result<Vec<_>>>()
.map(|s| {
let mut builder = ListBuilder::new(Int16Builder::new());
for row in s {
for val in row {
builder.values().append_value(val)
}
builder.append(true);
}
builder.finish()
}),
"u16" => table
.view_col_as_array_of(cur_offset, 2, parse_u16)?
.collect::<Result<Vec<_>>>()
.map(|s| {
let mut builder = ListBuilder::new(UInt16Builder::new());
for row in s {
for val in row {
builder.values().append_value(val)
}
builder.append(true);
}
builder.finish()
}),
_ => bail!("Unknown column type: {:?}", column),
}
.map(|s| Arc::new(s) as _);
(16, series)
}
(false, true) => match column.column_type.as_str() {
"i32" => {
let series = table.view_col(cur_offset, 8).map(|values| {
let mut builder = ListBuilder::new(Int32Builder::new());
values.for_each(|bytes| {
bytes
.chunks_exact(4)
.map(parse_i32)
.for_each(|val| builder.values().append_value(val));
builder.append(true);
});
Arc::new(builder.finish()) as _
});
(8, series)
}
_ => bail!("Unknown column type: {:?}", column),
},
(false, false) => match column.column_type.as_str() {
"string" => {
let series = table
.view_col_as_string(cur_offset)
.and_then(|strings| strings.collect::<Result<Vec<_>>>())
.map(|s| Arc::new(StringArray::from(s)) as _);
(8, series)
}
"foreignrow" => {
let series = table
.view_col(cur_offset, 16)
.map(|items| items.map(parse_maybe_foreignrow).collect::<Vec<_>>())
.map(|s| Arc::new(UInt64Array::from(s)) as _);
(16, series)
}
"row" => {
let series = table
.view_col(cur_offset, 8)
.map(|items| items.map(parse_maybe_row).collect::<Vec<_>>())
.map(|s| Arc::new(UInt64Array::from(s)) as _);
(8, series)
}
"enumrow" => {
let series = table
.view_col(cur_offset, 4)
.map(|items| items.map(parse_u32).collect::<Vec<_>>())
.map(|s| Arc::new(UInt32Array::from(s)) as _);
(4, series)
}
"u32" => {
let series = table
.view_col(cur_offset, 4)
.map(|items| items.map(parse_u32).collect::<Vec<_>>())
.map(|s| Arc::new(UInt32Array::from(s)) as _);
(4, series)
}
"f32" => {
let series = table
.view_col(cur_offset, 4)
.map(|items| items.map(parse_f32).collect::<Vec<_>>())
.map(|s| Arc::new(Float32Array::from(s)) as _);
(4, series)
}
"i32" => {
let series = table
.view_col(cur_offset, 4)
.map(|items| items.map(parse_i32).collect::<Vec<_>>())
.map(|s| Arc::new(Int32Array::from(s)) as _);
(4, series)
}
"i16" => {
let series = table
.view_col(cur_offset, 2)
.map(|items| items.map(parse_i16).collect::<Vec<_>>())
.map(|s| Arc::new(Int16Array::from(s)) as _);
(2, series)
}
"u16" => {
let series = table
.view_col(cur_offset, 2)
.map(|items| items.map(parse_u16).collect::<Vec<_>>())
.map(|s| Arc::new(UInt16Array::from(s)) as _);
(2, series)
}
"bool" => {
let series = table
.view_col(cur_offset, 1)
.and_then(|items| items.map(parse_bool).collect::<Result<Vec<_>>>())
.map(|s| Arc::new(BooleanArray::from(s)) as _);
(1, series)
}
_ => bail!("Unknown column type: {:?}", column),
},
_ => bail!("Can't be both array and interval"),
};
Ok((bytes_taken, series))
}
/// Converts a raw table into an Arrow [`RecordBatch`] laid out by `schema`.
///
/// Columns are decoded in schema order, each one starting at the byte offset
/// where the previous column ended. A column whose contents fail to decode is
/// logged and left out of the batch; every remaining column keeps the name the
/// schema assigns to it.
///
/// # Errors
/// Returns an error if `parse_column` cannot determine a column's layout, or
/// if the record batch cannot be built from the decoded columns.
pub fn parse_table(table: &DatFile, schema: &DatTableSchema) -> Result<RecordBatch> {
    let column_names = schema.column_names().collect::<Vec<_>>();
    // One slot per schema column (`None` for a skipped column) so that names
    // and arrays stay aligned. Pushing only the successes would shift every
    // later column onto an earlier column's name.
    let mut parsed_columns = Vec::with_capacity(column_names.len());
    let mut cur_offset = 0;
    for column in &schema.columns {
        let (bytes_taken, series) = parse_column(table, column, cur_offset)
            .with_context(|| format!("Failed to parse column: {:?}", column))?;
        match series {
            Ok(series) => {
                log::trace!(
                    "Successfully parsed column at bytes {}-{}: {:?}",
                    cur_offset,
                    cur_offset + bytes_taken,
                    column
                );
                parsed_columns.push(Some(series));
            }
            Err(e) => {
                log::error!("Failed to parse column {:?}, skipping: {e:?}", column.name);
                parsed_columns.push(None);
            }
        }
        // Advance past the column even when it was skipped.
        cur_offset += bytes_taken;
    }
    let named_columns = column_names
        .into_iter()
        .zip(parsed_columns)
        .filter_map(|(name, series)| series.map(|series| (name, series)));
    let df = RecordBatch::try_from_iter(named_columns).context("Failed to create df")?;
    Ok(df)
}
pub fn load_parsed_table(
fs: &mut FS,
schemas: &SchemaCollection,
filename: &str,
version: u32,
) -> Result<RecordBatch> {
let schema = schemas
.tables
.iter()
.filter(|t| t.valid_for == version || t.valid_for == 3)
.find(|t| *t.name.to_lowercase() == *PathBuf::from(&filename).file_stem().unwrap())
.with_context(|| format!("Couldn't find schema for {:?}", filename))?;
let bytes = fs.read(filename)?;
let table = DatParser
.parse(&bytes)
.as_anyhow()
.context("Failed to parse table data")?;
ensure!(!table.rows.is_empty(), "Empty table");
let df = parse_table(&table, schema).context("Failed to apply schema to table")?;
Ok(df)
}