pub mod error;
mod store;
mod store_mut;
use {
error::{CsvStorageError, ResultExt},
gluesql_core::{
ast::{ColumnDef, ColumnUniqueOption, DataType},
data::{Key, Schema, Value},
error::Result,
parse_sql::parse_data_type,
store::{
AlterTable, CustomFunction, CustomFunctionMut, DataRow, Index, IndexMut, Metadata,
Transaction,
},
translate::translate_data_type,
},
std::{
collections::HashMap,
fs::{self, File},
io::Read,
path::{Path, PathBuf},
},
};
/// Boxed iterator over the rows of one table; each item is either a
/// `(Key, DataRow)` pair or the error hit while producing that row.
type RowIter = Box<dyn Iterator<Item = Result<(Key, DataRow)>>>;
/// Storage handle rooted at a directory on disk.
///
/// Every per-table file path used by the methods in this file is built by
/// joining the table name onto `path` and setting a file extension.
pub struct CsvStorage {
/// Root directory of the storage; `CsvStorage::new` creates it (and any
/// missing parents) before returning.
pub path: PathBuf,
}
impl CsvStorage {
pub fn new<T: AsRef<Path>>(path: T) -> Result<Self> {
let path = path.as_ref();
fs::create_dir_all(path).map_storage_err()?;
let path = PathBuf::from(path);
Ok(Self { path })
}
/// Resolves the schema of `table_name`.
///
/// The boolean in the returned pair tells where the schema came from:
/// `false` means it was parsed from the table's `.sql` file, `true` means
/// no `.sql` file exists and the schema was generated here.
///
/// Generated schemas come in two shapes:
/// * a types file exists: `column_defs` is `None`;
/// * only the data file exists: one nullable `Text` column per header of
///   the data file.
///
/// Returns `Ok(None)` when none of the schema, types and data files exist.
///
/// # Errors
///
/// Fails when a file cannot be opened or read, when the DDL cannot be
/// parsed, or when the table name inside the DDL differs from
/// `table_name`.
fn fetch_schema(&self, table_name: &str) -> Result<Option<(Schema, bool)>> {
    let schema_path = self.schema_path(table_name);

    // An explicit schema file always wins over a generated schema.
    if schema_path.exists() {
        let mut ddl = String::new();
        let mut schema_file = File::open(&schema_path).map_storage_err()?;
        schema_file.read_to_string(&mut ddl).map_storage_err()?;

        let schema = Schema::from_ddl(&ddl)?;
        return if schema.table_name == table_name {
            Ok(Some((schema, false)))
        } else {
            Err(CsvStorageError::TableNameDoesNotMatchWithFile.into())
        };
    }

    let data_path = self.data_path(table_name);
    let has_types_file = self.types_path(table_name).exists();
    let has_data_file = data_path.exists();

    let column_defs = if has_types_file {
        // Per-row types live in the types file, so no fixed columns.
        None
    } else if has_data_file {
        // Only the data file is present: derive columns from its header.
        let mut reader = csv::Reader::from_path(data_path).map_storage_err()?;
        let headers = reader.headers().map_storage_err()?;
        let text_columns = headers
            .into_iter()
            .map(|name| ColumnDef {
                name: name.to_string(),
                data_type: DataType::Text,
                unique: None,
                default: None,
                nullable: true,
                comment: None,
            })
            .collect::<Vec<_>>();

        Some(text_columns)
    } else {
        return Ok(None);
    };

    let generated_schema = Schema {
        table_name: table_name.to_owned(),
        column_defs,
        indexes: Vec::new(),
        engine: None,
        foreign_keys: Vec::new(),
        comment: None,
    };

    Ok(Some((generated_schema, true)))
}
/// Builds the path of a per-table file: the storage directory joined with
/// `table_name`, with the file extension set to `extension`.
///
/// NOTE: the extension is *set*, not appended. If `table_name` itself
/// contains a `.`, its last dotted segment is treated as an extension and
/// replaced, so such names do not round-trip into the file name.
fn path_by(&self, table_name: &str, extension: &str) -> PathBuf {
    self.path.join(table_name).with_extension(extension)
}
/// Path of the table's schema file (extension `sql`); `fetch_schema`
/// reads its contents as DDL.
fn schema_path(&self, table_name: &str) -> PathBuf {
self.path_by(table_name, "sql")
}
/// Path of the table's data file (extension `csv`).
fn data_path(&self, table_name: &str) -> PathBuf {
self.path_by(table_name, "csv")
}
/// Path with extension `tmp.csv` for the table.
// NOTE(review): not used in this file; presumably a scratch file written
// before replacing the data file — confirm against the callers.
fn tmp_data_path(&self, table_name: &str) -> PathBuf {
self.path_by(table_name, "tmp.csv")
}
/// Path of the table's types file (extension `types.csv`); `scan_data`
/// reads it record-by-record alongside the data file to get a data type
/// name for each cell.
fn types_path(&self, table_name: &str) -> PathBuf {
self.path_by(table_name, "types.csv")
}
/// Path with extension `types.tmp.csv` for the table.
// NOTE(review): not used in this file; presumably a scratch file written
// before replacing the types file — confirm against the callers.
fn tmp_types_path(&self, table_name: &str) -> PathBuf {
self.path_by(table_name, "types.tmp.csv")
}
/// Opens the data file of `table_name` and returns an optional column
/// list together with a lazy iterator over the table's rows.
///
/// Returns `(None, empty iterator)` when `fetch_schema` yields no schema
/// or when the data file does not exist. Otherwise one of three modes is
/// used:
///
/// 1. The schema has column definitions: rows are `DataRow::Vec`, values
///    are cast to each column's data type, and the schema's column names
///    are returned.
/// 2. No column definitions but a types file exists: rows are
///    `DataRow::Map`, each cell is typed by the matching cell of the types
///    file, and no column list is returned.
/// 3. Neither: rows are `DataRow::Map` of header name to string value; the
///    header names are returned only when the schema was generated.
///
/// # Errors
///
/// Opening the readers or reading the header fails eagerly. Per-record
/// read, parse and cast failures surface as `Err` items of the iterator.
fn scan_data(&self, table_name: &str) -> Result<(Option<Vec<String>>, RowIter)> {
let data_path = self.data_path(table_name);
// `generated` is the flag returned by `fetch_schema`: true when the
// schema did not come from a `.sql` file.
let (schema, generated) = match (self.fetch_schema(table_name)?, data_path.exists()) {
(None, _) | (_, false) => return Ok((None, Box::new(std::iter::empty()))),
(Some(v), true) => v,
};
let mut data_rdr = csv::Reader::from_path(data_path).map_storage_err()?;
// Reads the header names of the data file. It borrows `data_rdr`
// mutably, so it is only called before `data_rdr` is consumed by
// `into_records()` in the branches below.
let mut fetch_data_header_columns = || -> Result<Vec<String>> {
Ok(data_rdr
.headers()
.map_storage_err()?
.into_iter()
.map(|header| header.to_string())
.collect::<Vec<_>>())
};
// Mode 1: the schema declares its columns.
if let Schema {
column_defs: Some(column_defs),
..
} = schema
{
let columns = column_defs
.iter()
.map(|column_def| column_def.name.to_owned())
.collect::<Vec<_>>();
let rows = data_rdr
.into_records()
.enumerate()
.map(move |(index, record)| {
// Set when a primary-key column is seen in this record.
let mut key: Option<Key> = None;
// `zip` pairs fields with column definitions positionally and
// stops at the shorter of the two.
let values = record
.map_storage_err()?
.into_iter()
.zip(column_defs.iter())
.map(|(value, column_def)| {
// The literal text `NULL` is read back as a null value.
let value = match value {
"NULL" => Value::Null,
_ => Value::Str(value.to_owned()),
};
// Text columns keep the string as-is; others are cast.
let value = match &column_def.data_type {
DataType::Text => value,
data_type => value.cast(data_type)?,
};
if column_def.unique == Some(ColumnUniqueOption { is_primary: true }) {
key = Key::try_from(&value).map(Some)?;
}
Ok(value)
})
.collect::<Result<Vec<Value>>>()?;
// Without a primary key, the zero-based position of the record in
// the iterator is used as the key.
let key = key.unwrap_or(Key::U64(index as u64));
let row = DataRow::Vec(values);
Ok((key, row))
});
Ok((Some(columns), Box::new(rows)))
// Mode 2: no declared columns, but a types file exists.
} else if self.types_path(table_name).exists() {
let types_path = self.types_path(table_name);
let types_rdr = csv::Reader::from_path(types_path)
.map_storage_err()?
.into_records();
let columns = fetch_data_header_columns()?;
// Data records and types records are paired positionally; iteration
// ends when either reader runs out.
let rows = data_rdr.into_records().zip(types_rdr).enumerate().map(
move |(index, (record, types))| {
let key = Key::U64(index as u64);
let record = record.map_storage_err()?;
let types = types.map_storage_err()?;
record
.into_iter()
.zip(columns.iter())
.zip(&types)
.filter_map(|((value, column), data_type)| {
// An empty type cell leaves the column out of this row's map.
if data_type.is_empty() {
return None;
}
let value = if data_type == "NULL" {
Ok(Value::Null)
} else {
// The type cell is parsed as a SQL data type and translated
// before the raw string is cast to it.
parse_data_type(data_type).and_then(|data_type| {
let data_type = translate_data_type(&data_type)?;
let value = Value::Str(value.to_owned());
match data_type {
DataType::Text => Ok(value),
data_type => value.cast(&data_type),
}
})
};
Some(value.map(|value| (column.clone(), value)))
})
.collect::<Result<HashMap<String, Value>>>()
.map(DataRow::Map)
.map(|row| (key, row))
},
);
Ok((None, Box::new(rows)))
// Mode 3: no declared columns and no types file; every value stays a
// string keyed by its header name.
} else {
let columns = fetch_data_header_columns()?;
let rows = {
// The closure takes ownership of its own copy; the original
// `columns` is still needed for the return value.
let columns = columns.clone();
data_rdr
.into_records()
.enumerate()
.map(move |(index, record)| {
let key = Key::U64(index as u64);
let row = record
.map_storage_err()?
.into_iter()
.zip(columns.iter())
.map(|(value, column)| (column.clone(), Value::Str(value.to_owned())))
.collect::<HashMap<String, Value>>();
Ok((key, DataRow::Map(row)))
})
};
// The header names are reported only for a generated schema.
Ok((generated.then_some(columns), Box::new(rows)))
}
}
}
// The impls below have empty bodies, so `CsvStorage` overrides nothing and
// uses whatever provided (default) methods each trait defines.
// NOTE(review): presumably those defaults report the feature as
// unsupported — confirm in `gluesql_core::store`.
impl AlterTable for CsvStorage {}
impl CustomFunction for CsvStorage {}
impl CustomFunctionMut for CsvStorage {}
impl Index for CsvStorage {}
impl IndexMut for CsvStorage {}
impl Transaction for CsvStorage {}
impl Metadata for CsvStorage {}