use std::{collections::{BTreeMap, HashMap}, marker::PhantomData, ops::Deref, path::PathBuf};
use derive_more::{Display, Error, From};
use serde::{Deserialize, Serialize};
use tempest_core::{
journal::{Journal, JournalError, JournalHandle, Replayable},
tempest_str::TempestStr,
};
use tempest_io::Io;
use tempest_rt::JoinHandle;
use crate::{
catalog::schema::{
DatabaseId, DatabaseSchema, EnumSchema, EnumVariantDef, FieldDef, FieldId, FlatField,
StructSchema, TableId, TableSchema, TypeExpr, TypeId, TypeSchema, VariantId,
},
config::CatalogConfig,
row::resolved::ResolvedTable,
};
/// Substitutes generic parameters in `ref_args` with the matching entries of `generic_args`.
///
/// Only top-level `TypeExpr::GenericParam` entries are replaced; every other expression is
/// cloned unchanged (arguments nested inside a `Ref` are not visited). A parameter whose
/// index has no entry in `generic_args` is kept as-is.
fn resolve_type_args(ref_args: &[TypeExpr], generic_args: &[TypeExpr]) -> Vec<TypeExpr> {
    let mut resolved = Vec::with_capacity(ref_args.len());
    for arg in ref_args {
        let substituted = if let TypeExpr::GenericParam(idx) = arg {
            // Fall back to the parameter itself when the index is out of range.
            generic_args.get(*idx as usize).unwrap_or(arg)
        } else {
            arg
        };
        resolved.push(substituted.clone());
    }
    resolved
}
/// Flattens a struct's field map into a list of leaf [`FlatField`]s.
///
/// Fields are visited in `FieldId` order (the iteration order of the `BTreeMap`). Primitive
/// fields and enum-typed fields produce one leaf each; struct-typed fields are expanded
/// recursively and their leaves are named `"<field>.<subfield>"`. `prefix` is prepended
/// verbatim to every produced name (nested calls pass a prefix that already ends in `.`).
///
/// `generic_args` supplies the concrete type for each `TypeExpr::GenericParam` index that
/// appears in `fields`.
///
/// # Errors
///
/// Returns [`CatalogError::TypeNotFound`] when a referenced type id is not known to `catalog`.
///
/// # Panics
///
/// Panics if a generic parameter index has no entry in `generic_args`, or if that entry is
/// itself still a generic parameter.
#[instrument(skip_all, level = "trace")]
pub(crate) fn flatten_schema(
    fields: &BTreeMap<FieldId, FieldDef>,
    generic_args: &[TypeExpr],
    catalog: &CatalogState,
    prefix: &str,
) -> Result<Vec<FlatField>, CatalogError> {
    let mut result = Vec::new();
    for def in fields.values() {
        let field_name: TempestStr<'static> = if prefix.is_empty() {
            def.name.clone()
        } else {
            TempestStr::from_owned(format!("{}{}", prefix, def.name))
                .expect("dotted field name does not contain null bytes")
        };
        match &def.ty {
            TypeExpr::Primitive(ty) => {
                trace!(name = %field_name, ?ty, "flat primitive field");
                result.push(FlatField { name: field_name, ty: *ty, type_args: vec![] });
            }
            TypeExpr::Ref(type_id, ref_args) => {
                let type_schema = catalog
                    .get_type(*type_id)
                    .ok_or(CatalogError::TypeNotFound(*type_id))?;
                match type_schema {
                    TypeSchema::Struct(struct_schema) => {
                        let sub_prefix = format!("{}{}.", prefix, def.name);
                        trace!(name = %def.name, "recursing into Ref field");
                        // The field's type arguments may name the enclosing struct's own
                        // generic parameters (e.g. `inner: Inner<T>`). Substitute them
                        // with the concrete arguments before descending, otherwise the
                        // nested struct would see an unresolved parameter.
                        let sub_args = resolve_type_args(ref_args, generic_args);
                        let sub = flatten_schema(
                            &struct_schema.fields,
                            &sub_args,
                            catalog,
                            &sub_prefix,
                        )?;
                        result.extend(sub);
                    }
                    TypeSchema::Enum(_) => {
                        trace!(name = %field_name, "enum leaf field");
                        let resolved_args = resolve_type_args(ref_args, generic_args);
                        result.push(FlatField {
                            name: field_name,
                            ty: crate::types::TempestType::Enum(**type_id),
                            type_args: resolved_args,
                        });
                    }
                }
            }
            TypeExpr::GenericParam(i) => match generic_args.get(*i as usize) {
                Some(TypeExpr::Primitive(ty)) => {
                    trace!(name = %field_name, ?ty, "flat generic-param primitive field");
                    result.push(FlatField { name: field_name, ty: *ty, type_args: vec![] });
                }
                Some(TypeExpr::Ref(type_id, ref_args)) => {
                    let type_schema = catalog
                        .get_type(*type_id)
                        .ok_or(CatalogError::TypeNotFound(*type_id))?;
                    match type_schema {
                        TypeSchema::Struct(struct_schema) => {
                            let sub_prefix = format!("{}{}.", prefix, def.name);
                            trace!(name = %def.name, "recursing into generic-param Ref field");
                            // These arguments come from the already-resolved argument
                            // list, so they are passed on unchanged.
                            let sub = flatten_schema(
                                &struct_schema.fields,
                                ref_args,
                                catalog,
                                &sub_prefix,
                            )?;
                            result.extend(sub);
                        }
                        TypeSchema::Enum(_) => {
                            trace!(name = %field_name, "generic-param enum leaf field");
                            let resolved_args = resolve_type_args(ref_args, generic_args);
                            result.push(FlatField {
                                name: field_name,
                                ty: crate::types::TempestType::Enum(**type_id),
                                type_args: resolved_args,
                            });
                        }
                    }
                }
                Some(TypeExpr::GenericParam(_)) => unreachable!("type args must be concrete"),
                None => unreachable!("generic param index out of range - catalog is corrupt"),
            },
        }
    }
    Ok(result)
}
pub(crate) fn pk_path_to_flat_idx(
path: &[FieldId],
fields: &std::collections::BTreeMap<FieldId, FieldDef>,
generic_args: &[TypeExpr],
catalog: &CatalogState,
flat_fields: &[FlatField],
) -> Option<usize> {
let mut name = String::new();
let mut current_fields = fields;
let mut current_args: std::borrow::Cow<[TypeExpr]> = std::borrow::Cow::Borrowed(generic_args);
for (i, fid) in path.iter().enumerate() {
let def = current_fields.get(fid)?;
if name.is_empty() {
name.push_str(&def.name);
} else {
name.push('.');
name.push_str(&def.name);
}
if i < path.len() - 1 {
let resolved = match &def.ty {
TypeExpr::GenericParam(idx) => current_args.get(*idx as usize)?.clone(),
other => other.clone(),
};
match resolved {
TypeExpr::Ref(type_id, ref_args) => {
let type_schema = catalog.get_type(type_id)?;
let struct_schema = type_schema.as_struct()?;
current_fields = &struct_schema.fields;
current_args = std::borrow::Cow::Owned(ref_args);
}
_ => return None,
}
}
}
flat_fields.iter().position(|ff| ff.name.as_ref() == name.as_str())
}
pub mod schema;
#[cfg(test)]
mod tests;
/// Version 1 of the edits that can be applied to a [`CatalogState`].
///
/// Every `Create*` variant carries the id assigned to the new object together with its
/// schema.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CatalogEditV1 {
/// Registers a database under the given id.
CreateDatabase((DatabaseId, DatabaseSchema)),
/// Registers a table under the given id.
CreateTable((TableId, TableSchema)),
/// Registers a struct type under the given id.
CreateType((TypeId, StructSchema)),
/// Registers an enum type under the given id.
CreateEnum((TypeId, EnumSchema)),
/// A batch of edits; applying it applies each contained edit in order.
Snapshot(Vec<CatalogEdit>),
}
/// Versioned wrapper around catalog edits; this is the `Replayable::Edit` type of
/// [`CatalogState`].
///
/// Only version 1 exists at the moment, with explicit discriminant `1`.
#[repr(u16)]
#[derive(derive_more::Debug, Clone, Serialize, Deserialize)]
pub enum CatalogEdit {
/// A version 1 edit. Its `Debug` output is that of the wrapped [`CatalogEditV1`].
#[debug("{:?}", _0)]
V1(CatalogEditV1) = 1,
}
/// Errors returned by catalog operations.
///
/// Only [`CatalogError::JournalError`] takes part in the derived `From` conversion; every
/// other variant is marked `#[from(skip)]` and has to be constructed explicitly.
#[derive(Debug, Display, Error, From)]
pub enum CatalogError {
/// An error reported by the journal.
#[display("journal error: {}", _0)]
JournalError(JournalError),
/// No database with the given id exists.
#[from(skip)]
#[display("database with ID {} was not found", _0)]
DatabaseNotFound(#[error(not(source))] DatabaseId),
/// A database with the given name already exists.
#[from(skip)]
#[display("database with name '{}' already exists", _0)]
DatabaseAlreadyExists(#[error(not(source))] TempestStr<'static>),
/// No table with the given id exists.
#[from(skip)]
#[display("table with ID {} was not found", _0)]
TableNotFound(#[error(not(source))] TableId),
/// A table with the given name already exists in the same scope.
#[from(skip)]
#[display("table with name '{}' already exists inside of this scope", _0)]
TableAlreadyExists(#[error(not(source))] TempestStr<'static>),
/// No type with the given id exists.
#[from(skip)]
#[display("type with ID {} was not found", _0)]
TypeNotFound(#[error(not(source))] TypeId),
/// A type with the given name already exists in the same scope.
#[from(skip)]
#[display("type with name '{}' already exists inside of this scope", _0)]
TypeAlreadyExists(#[error(not(source))] TempestStr<'static>),
}
/// In-memory catalog: the databases, tables and types known to the engine, plus the id
/// counters used when building new create edits.
#[derive(Debug, Clone)]
pub struct CatalogState {
/// Id handed to the next create-table edit; `apply` keeps it above every applied table id.
next_table_id: TableId,
/// Table schemas keyed by table id.
pub tables: HashMap<TableId, TableSchema>,
/// Id handed to the next create-database edit; `apply` keeps it above every applied
/// database id.
next_database_id: DatabaseId,
/// Database schemas keyed by database id.
pub databases: HashMap<DatabaseId, DatabaseSchema>,
/// Id handed to the next create-type or create-enum edit; `apply` keeps it above every
/// applied type id.
next_type_id: TypeId,
/// Struct and enum schemas created through edits, keyed by type id.
pub types: HashMap<TypeId, TypeSchema>,
/// Built-in types. `Default` seeds this with the `Option` enum; `get_type` consults it
/// after `types`. It is never modified by `apply` and is not part of `snapshot`.
pub global_types: HashMap<TypeId, TypeSchema>,
}
impl Default for CatalogState {
    /// Builds an empty catalog whose only content is the built-in generic `Option` enum
    /// (variants `None` and `Some(T)`), registered as a global type under `TypeId(u32::MAX)`.
    fn default() -> Self {
        let none_variant = EnumVariantDef { name: "None".into(), fields: vec![] };
        let some_variant = EnumVariantDef {
            name: "Some".into(),
            fields: vec![TypeExpr::GenericParam(0)],
        };
        let option_enum = EnumSchema {
            database_id: None,
            name: "Option".into(),
            generic_params: vec!["T".into()],
            variants: BTreeMap::from([
                (VariantId(0), none_variant),
                (VariantId(1), some_variant),
            ]),
        };
        Self {
            next_table_id: TableId::default(),
            tables: HashMap::new(),
            next_database_id: DatabaseId::default(),
            databases: HashMap::new(),
            next_type_id: TypeId::default(),
            types: HashMap::new(),
            global_types: HashMap::from([(TypeId(u32::MAX), TypeSchema::Enum(option_enum))]),
        }
    }
}
impl CatalogState {
/// Validates `schema` and builds the edit that creates the database.
///
/// The state is not modified; the returned edit still has to be applied. The id is read
/// from `next_database_id`.
///
/// # Errors
///
/// Returns [`CatalogError::DatabaseAlreadyExists`] when another database already uses the
/// same name.
fn create_database_edit(
    &self,
    schema: DatabaseSchema,
) -> Result<(DatabaseId, CatalogEdit), CatalogError> {
    let name_taken = self
        .databases
        .values()
        .any(|existing| existing.name == schema.name);
    if name_taken {
        return Err(CatalogError::DatabaseAlreadyExists(schema.name));
    }
    let id = self.next_database_id;
    trace!(?id, "assigned id to create database edit");
    let edit = CatalogEdit::V1(CatalogEditV1::CreateDatabase((id, schema)));
    Ok((id, edit))
}
/// Validates `schema` and builds the edit that creates the table.
///
/// The state is not modified; the returned edit still has to be applied. The id is read
/// from `next_table_id` and, like in the sibling `create_*_edit` methods, is only assigned
/// (and traced) once validation has passed.
///
/// # Errors
///
/// * [`CatalogError::DatabaseNotFound`] when `schema.database_id` is unknown.
/// * [`CatalogError::TableAlreadyExists`] when the database already has a table with the
///   same name.
fn create_table_edit(
    &self,
    schema: TableSchema,
) -> Result<(TableId, CatalogEdit), CatalogError> {
    let db = self
        .databases
        .get(&schema.database_id)
        .ok_or(CatalogError::DatabaseNotFound(schema.database_id))?;
    // Look only at the tables registered for this database. An id that has no schema in
    // `self.tables` cannot clash by name and is skipped instead of panicking.
    let name_taken = db
        .tables
        .iter()
        .filter_map(|table_id| self.tables.get(table_id))
        .any(|existing| {
            existing.database_id == schema.database_id && existing.name == schema.name
        });
    if name_taken {
        return Err(CatalogError::TableAlreadyExists(schema.name));
    }
    let id = self.next_table_id;
    trace!(?id, "assigned id to create table edit");
    Ok((
        id,
        CatalogEdit::V1(CatalogEditV1::CreateTable((id, schema))),
    ))
}
/// Validates `schema` and builds the edit that creates the struct type.
///
/// The state is not modified; the returned edit still has to be applied. The id is read
/// from `next_type_id`.
///
/// # Errors
///
/// * [`CatalogError::DatabaseNotFound`] when the schema names a database that does not
///   exist. Applying such an edit would panic, so it is rejected before it can be written
///   to the journal.
/// * [`CatalogError::TypeAlreadyExists`] when a type with the same name already exists in
///   the same scope (same `database_id`).
fn create_type_edit(
    &self,
    schema: StructSchema,
) -> Result<(TypeId, CatalogEdit), CatalogError> {
    if let Some(db_id) = schema.database_id {
        if !self.databases.contains_key(&db_id) {
            return Err(CatalogError::DatabaseNotFound(db_id));
        }
    }
    if self
        .types
        .values()
        .any(|t| t.database_id() == schema.database_id && t.name() == &schema.name)
    {
        return Err(CatalogError::TypeAlreadyExists(schema.name));
    }
    let id = self.next_type_id;
    trace!(?id, "assigned id to create type edit");
    Ok((id, CatalogEdit::V1(CatalogEditV1::CreateType((id, schema)))))
}
/// Validates `schema` and builds the edit that creates the enum type.
///
/// The state is not modified; the returned edit still has to be applied. The id is read
/// from `next_type_id`, which is shared with struct types.
///
/// # Errors
///
/// * [`CatalogError::DatabaseNotFound`] when the schema names a database that does not
///   exist. Applying such an edit would panic, so it is rejected before it can be written
///   to the journal.
/// * [`CatalogError::TypeAlreadyExists`] when a type with the same name already exists in
///   the same scope (same `database_id`).
fn create_enum_edit(
    &self,
    schema: EnumSchema,
) -> Result<(TypeId, CatalogEdit), CatalogError> {
    if let Some(db_id) = schema.database_id {
        if !self.databases.contains_key(&db_id) {
            return Err(CatalogError::DatabaseNotFound(db_id));
        }
    }
    if self
        .types
        .values()
        .any(|t| t.database_id() == schema.database_id && t.name() == &schema.name)
    {
        return Err(CatalogError::TypeAlreadyExists(schema.name));
    }
    let id = self.next_type_id;
    trace!(?id, "assigned id to create enum edit");
    Ok((id, CatalogEdit::V1(CatalogEditV1::CreateEnum((id, schema)))))
}
/// Looks up a database by name with a linear scan over all databases.
///
/// Returns the id and schema of the first database whose name equals `name`, or `None`.
pub(crate) fn get_database_by_name(
    &self,
    name: &TempestStr,
) -> Option<(DatabaseId, &DatabaseSchema)> {
    self.databases
        .iter()
        .find(|(_, schema)| schema.name == *name)
        .map(|(&id, schema)| (id, schema))
}
/// Looks up a table by name within one database, scanning all tables.
///
/// Returns the id and schema of the first table that belongs to `database_id` and is
/// called `name`, or `None`.
pub(crate) fn get_table_by_name(
    &self,
    database_id: DatabaseId,
    name: &TempestStr,
) -> Option<(TableId, &TableSchema)> {
    self.tables
        .iter()
        .find(|(_, schema)| schema.database_id == database_id && schema.name == *name)
        .map(|(&id, schema)| (id, schema))
}
/// Looks up a database-scoped type by name, scanning `types` (global types are not
/// searched).
///
/// Returns the id and schema of the first type whose owning database is `database_id` and
/// whose name equals `name`, or `None`.
pub(crate) fn get_type_by_name(
    &self,
    database_id: DatabaseId,
    name: &TempestStr,
) -> Option<(TypeId, &TypeSchema)> {
    self.types
        .iter()
        .find(|(_, schema)| {
            schema.database_id() == Some(database_id) && schema.name() == name
        })
        .map(|(&id, schema)| (id, schema))
}
/// Returns the schema for `id`, checking the created types first and the built-in global
/// types second.
pub fn get_type(&self, id: TypeId) -> Option<&TypeSchema> {
    match self.types.get(&id) {
        found @ Some(_) => found,
        None => self.global_types.get(&id),
    }
}
/// Looks up a built-in global type by name.
///
/// Returns the id and schema of the first global type called `name`, or `None`.
pub(crate) fn get_global_type_by_name(&self, name: &TempestStr) -> Option<(TypeId, &TypeSchema)> {
    for (&id, schema) in &self.global_types {
        if schema.name() == name {
            return Some((id, schema));
        }
    }
    None
}
pub fn pk_path_name(
&self,
path: &[FieldId],
table_schema: &TableSchema,
) -> String {
let struct_schema = self.get_type(table_schema.type_id)
.expect("type not found in catalog")
.as_struct()
.expect("table type must be a struct");
let mut name = String::new();
let mut current_fields = &struct_schema.fields;
let mut current_args: std::borrow::Cow<[TypeExpr]> =
std::borrow::Cow::Borrowed(&table_schema.generic_args);
for (i, fid) in path.iter().enumerate() {
let def = ¤t_fields[fid];
if name.is_empty() {
name.push_str(&def.name);
} else {
name.push('.');
name.push_str(&def.name);
}
if i < path.len() - 1 {
let resolved = match &def.ty {
TypeExpr::GenericParam(idx) => current_args[*idx as usize].clone(),
other => other.clone(),
};
if let TypeExpr::Ref(type_id, ref_args) = resolved {
if let Some(ts) = self.get_type(type_id) {
if let Some(s) = ts.as_struct() {
current_fields = &s.fields;
current_args = std::borrow::Cow::Owned(ref_args);
}
}
}
}
}
name
}
/// Iterates over the tables of the database called `database`, yielding each table id
/// with its schema.
///
/// The iterator is empty when no database has that name or when `database` cannot be
/// converted to a `TempestStr`.
///
/// # Panics
///
/// Iterating panics if a database lists a table id that has no entry in `tables`.
pub fn tables_in_database(
    &self,
    database: &str,
) -> impl Iterator<Item = (TableId, &TableSchema)> {
    // Convert the requested name once instead of once per database in the filter.
    let wanted = TempestStr::from_borrowed(database).ok();
    self.databases
        .iter()
        .filter(move |(_, db)| Some(&db.name) == wanted.as_ref())
        .flat_map(|(_, db)| db.tables.iter().map(|t| (*t, &self.tables[t])))
}
/// Iterates over the types of the database called `database`, yielding each type id with
/// its schema.
///
/// The iterator is empty when no database has that name or when `database` cannot be
/// converted to a `TempestStr`.
///
/// # Panics
///
/// Iterating panics if a database lists a type id that has no entry in `types`.
pub fn types_in_database(
    &self,
    database: &str,
) -> impl Iterator<Item = (TypeId, &TypeSchema)> {
    // Convert the requested name once instead of once per database in the filter.
    let wanted = TempestStr::from_borrowed(database).ok();
    self.databases
        .iter()
        .filter(move |(_, db)| Some(&db.name) == wanted.as_ref())
        .flat_map(|(_, db)| db.types.iter().map(|t| (*t, &self.types[t])))
}
/// Builds the [`ResolvedTable`] view for `table_id`: the table's struct fields, its generic
/// arguments, the flattened leaf fields, and the primary key expressed as indices into
/// those leaf fields.
///
/// # Panics
///
/// Panics when the catalog is inconsistent: the table or its type is missing, the type is
/// not a struct, flattening fails, or a primary-key path has no matching flat field.
pub(crate) fn resolved_table(&self, table_id: TableId) -> ResolvedTable<'_> {
    let table_schema = self
        .tables
        .get(&table_id)
        .expect("table not found in catalog");
    let generic_args = &table_schema.generic_args;
    let fields = &self
        .get_type(table_schema.type_id)
        .expect("type not found in catalog")
        .as_struct()
        .expect("table type must be a struct")
        .fields;
    let flat_fields = flatten_schema(fields, generic_args, self, "")
        .expect("flat schema build failed - catalog is inconsistent");
    let primary_key = table_schema
        .primary_key
        .iter()
        .map(|path| {
            pk_path_to_flat_idx(path, fields, generic_args, self, &flat_fields)
                .expect("pk path not found in flat fields - catalog is inconsistent")
        })
        .collect();
    ResolvedTable {
        id: table_id,
        fields,
        generic_args,
        primary_key,
        flat_fields,
    }
}
}
impl Replayable for CatalogState {
type Edit = CatalogEdit;
/// Applies one edit to the in-memory state.
///
/// Create edits insert the new object, register tables and database-scoped types with
/// their owning database, and keep the matching `next_*_id` counter above the applied id.
/// A snapshot edit applies each contained edit in order.
///
/// # Panics
///
/// Panics if the id of a create edit is already in use, or if a table or database-scoped
/// type names a database that does not exist.
#[instrument(skip_all, level = "debug")]
fn apply(&mut self, edit: Self::Edit) {
    // `CatalogEdit` has a single version, so this pattern is irrefutable.
    let CatalogEdit::V1(versioned) = edit;
    match versioned {
        CatalogEditV1::CreateDatabase((id, schema)) => {
            debug!(?id, ?schema, "applying create database edit");
            assert!(!self.databases.contains_key(&id));
            self.next_database_id = std::cmp::max(DatabaseId(*id + 1), self.next_database_id);
            self.databases.insert(id, schema);
        }
        CatalogEditV1::CreateTable((id, schema)) => {
            debug!(?id, ?schema, "applying create table edit");
            assert!(!self.tables.contains_key(&id));
            self.next_table_id = std::cmp::max(TableId(*id + 1), self.next_table_id);
            let owner = self
                .databases
                .get_mut(&schema.database_id)
                .expect("database must exist when applying CreateTable");
            owner.tables.insert(id);
            self.tables.insert(id, schema);
        }
        CatalogEditV1::CreateType((id, schema)) => {
            debug!(?id, ?schema, "applying create type edit");
            assert!(!self.types.contains_key(&id));
            self.next_type_id = std::cmp::max(TypeId(*id + 1), self.next_type_id);
            // Types without a database are not registered with any database.
            if let Some(db_id) = schema.database_id {
                let owner = self
                    .databases
                    .get_mut(&db_id)
                    .expect("database must exist when applying CreateType");
                owner.types.insert(id);
            }
            self.types.insert(id, TypeSchema::Struct(schema));
        }
        CatalogEditV1::CreateEnum((id, schema)) => {
            debug!(?id, ?schema, "applying create enum edit");
            assert!(!self.types.contains_key(&id));
            self.next_type_id = std::cmp::max(TypeId(*id + 1), self.next_type_id);
            // Enums without a database are not registered with any database.
            if let Some(db_id) = schema.database_id {
                let owner = self
                    .databases
                    .get_mut(&db_id)
                    .expect("database must exist when applying CreateEnum");
                owner.types.insert(id);
            }
            self.types.insert(id, TypeSchema::Enum(schema));
        }
        CatalogEditV1::Snapshot(edits) => {
            debug!(count = edits.len(), "applying snapshot edits");
            edits.into_iter().for_each(|e| self.apply(e));
        }
    }
}
/// Captures the whole state as a single snapshot edit.
///
/// The contained edits are ordered databases, then types, then tables, so that applying
/// them in order finds the owning database already present. Global types are not
/// included.
fn snapshot(&self) -> Self::Edit {
    let database_edits = self.databases.iter().map(|(id, schema)| {
        CatalogEdit::V1(CatalogEditV1::CreateDatabase((*id, schema.clone())))
    });
    let type_edits = self.types.iter().map(|(id, schema)| {
        let edit = match schema {
            TypeSchema::Struct(s) => CatalogEditV1::CreateType((*id, s.clone())),
            TypeSchema::Enum(e) => CatalogEditV1::CreateEnum((*id, e.clone())),
        };
        CatalogEdit::V1(edit)
    });
    let table_edits = self.tables.iter().map(|(id, schema)| {
        CatalogEdit::V1(CatalogEditV1::CreateTable((*id, schema.clone())))
    });
    let edits = database_edits.chain(type_edits).chain(table_edits).collect();
    CatalogEdit::V1(CatalogEditV1::Snapshot(edits))
}
/// Returns the `Replayable` filename prefix for catalog state, `"catalog"`.
///
/// NOTE(review): presumably the journal uses this to name its files — confirm against
/// `Journal`.
fn filename_prefix() -> &'static str {
"catalog"
}
fn initial() -> Self {
CatalogState::default()
}
}
/// Journal-backed catalog: keeps a [`CatalogState`] in memory and appends every edit to the
/// journal before applying it to that state.
pub(crate) struct Catalog<I: Io> {
/// In-memory catalog state; exposed read-only through `Deref`.
data: CatalogState,
/// Handle used to append edits to the journal.
journal: JournalHandle<CatalogState>,
/// Join handle returned by `Journal::new`; awaited in `shutdown`.
journal_handle: JoinHandle<()>,
/// Records the `Io` type the journal was opened with, without storing a value of it.
_marker: PhantomData<I>,
}
impl<I: Io> Catalog<I> {
/// Opens the catalog stored in `dir`.
///
/// Creates the journal with `config.journal` and starts from a clone of the state the
/// journal reports.
///
/// # Errors
///
/// Returns a [`CatalogError::JournalError`] when the journal cannot be created.
#[instrument(skip_all, level = "info")]
pub(crate) async fn open(dir: PathBuf, config: CatalogConfig) -> Result<Self, CatalogError> {
    info!("opening catalog at {:?}", dir);
    let journal_config = config.journal.clone();
    let (journal, journal_handle) = Journal::<CatalogState, I>::new(dir, journal_config).await?;
    let catalog = Self {
        data: journal.data().clone(),
        journal,
        journal_handle,
        _marker: PhantomData,
    };
    info!("finished opening catalog");
    Ok(catalog)
}
/// Creates a database and returns its id.
///
/// The edit is appended to the journal first and only then applied to the in-memory
/// state, so a failed append leaves the state unchanged.
///
/// # Errors
///
/// Returns [`CatalogError::DatabaseAlreadyExists`] when the name is taken, or a
/// [`CatalogError::JournalError`] when the append fails.
#[instrument(skip_all, level = "info")]
pub(crate) async fn create_database(
    &mut self,
    schema: DatabaseSchema,
) -> Result<DatabaseId, CatalogError> {
    let (id, edit) = self.create_database_edit(schema)?;
    debug!("persisting database schema to journal");
    self.journal.append(edit.clone()).await?;
    self.data.apply(edit);
    Ok(id)
}
/// Creates a table and returns its id.
///
/// The edit is appended to the journal first and only then applied to the in-memory
/// state, so a failed append leaves the state unchanged.
///
/// # Errors
///
/// Returns [`CatalogError::DatabaseNotFound`] or [`CatalogError::TableAlreadyExists`] when
/// validation fails, or a [`CatalogError::JournalError`] when the append fails.
#[instrument(skip_all, level = "info")]
pub(crate) async fn create_table(
    &mut self,
    schema: TableSchema,
) -> Result<TableId, CatalogError> {
    let (id, edit) = self.create_table_edit(schema)?;
    debug!("persisting table schema to journal");
    self.journal.append(edit.clone()).await?;
    self.data.apply(edit);
    Ok(id)
}
/// Creates a struct type and returns its id.
///
/// The edit is appended to the journal before it is applied to the in-memory state.
///
/// # Errors
///
/// Returns the validation error from building the edit, or a
/// [`CatalogError::JournalError`] when the append fails.
#[instrument(skip_all, level = "info")]
pub(crate) async fn create_type(
    &mut self,
    schema: StructSchema,
) -> Result<TypeId, CatalogError> {
    let (type_id, pending) = self.data.create_type_edit(schema)?;
    debug!("persisting type schema to journal");
    let journaled = pending.clone();
    self.journal.append(journaled).await?;
    self.data.apply(pending);
    Ok(type_id)
}
/// Creates an enum type and returns its id.
///
/// The edit is appended to the journal before it is applied to the in-memory state.
///
/// # Errors
///
/// Returns the validation error from building the edit, or a
/// [`CatalogError::JournalError`] when the append fails.
#[instrument(skip_all, level = "info")]
pub(crate) async fn create_enum(
    &mut self,
    schema: EnumSchema,
) -> Result<TypeId, CatalogError> {
    let (type_id, pending) = self.data.create_enum_edit(schema)?;
    debug!("persisting enum schema to journal");
    let journaled = pending.clone();
    self.journal.append(journaled).await?;
    self.data.apply(pending);
    Ok(type_id)
}
pub(crate) async fn shutdown(self) -> Result<(), CatalogError> {
drop(self.journal);
let _ = self.journal_handle.await;
Ok(())
}
}
/// Gives read-only access to the in-memory [`CatalogState`], so its methods can be called
/// directly on a `Catalog`.
impl<I: Io> Deref for Catalog<I> {
type Target = CatalogState;
/// Returns the in-memory catalog state.
fn deref(&self) -> &Self::Target {
&self.data
}
}
#[cfg(test)]
pub(crate) mod testing {
use std::collections::BTreeMap;
use crate::{
catalog::schema::{FieldDef, FieldId, TypeExpr},
types::TempestType,
};
use super::*;
/// Builds a small catalog state for tests: a database `main`, a struct type `User` with
/// fields `id` (`Int64`) and `name` (`String`), and a table `users` of that type keyed on
/// `id`.
pub(crate) fn create_catalog_state_for_testing() -> CatalogState {
    let mut state = CatalogState::initial();

    let (db_id, create_db) = state
        .create_database_edit(DatabaseSchema::new("main".into()))
        .unwrap();
    state.apply(create_db);

    let fields = BTreeMap::from([
        (
            FieldId(0),
            FieldDef {
                name: "id".into(),
                ty: TypeExpr::Primitive(TempestType::Int64),
            },
        ),
        (
            FieldId(1),
            FieldDef {
                name: "name".into(),
                ty: TypeExpr::Primitive(TempestType::String),
            },
        ),
    ]);
    let user_type = StructSchema {
        database_id: Some(db_id),
        name: "User".into(),
        generic_params: Vec::new(),
        fields,
    };
    let (type_id, create_type) = state.create_type_edit(user_type).unwrap();
    state.apply(create_type);

    let users_table = TableSchema {
        database_id: db_id,
        name: "users".into(),
        type_id,
        generic_args: Vec::new(),
        primary_key: vec![vec![FieldId(0)]],
    };
    let (_, create_table) = state.create_table_edit(users_table).unwrap();
    state.apply(create_table);

    state
}
}