use core::fmt;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
/// Identifier of a table registered in a [`Catalog`].
///
/// A thin newtype over the raw `u32` id; the inner value is public so callers
/// can wrap and unwrap it directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TableId(pub u32);

impl fmt::Display for TableId {
    /// Writes the id as `table:<n>`, for example `table:7`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(raw) = *self;
        f.write_fmt(format_args!("table:{raw}"))
    }
}
/// Identifier of an edge definition registered in a [`Catalog`].
///
/// A thin newtype over the raw `u32` id; the inner value is public so callers
/// can wrap and unwrap it directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct EdgeId(pub u32);

impl fmt::Display for EdgeId {
    /// Writes the id as `edge:<n>`, for example `edge:3`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let raw = self.0;
        write!(f, "edge:{raw}")
    }
}
/// Packed node identifier: a [`TableId`] in the upper 32 bits and the row's
/// primary key in the lower 32 bits of a single `u64`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct NodeKey(pub u64);

impl NodeKey {
    /// Number of low bits reserved for the primary key.
    const PRIMARY_KEY_BITS: u32 = 32;
    /// Mask selecting the primary-key field of the packed value.
    const PRIMARY_KEY_MASK: u64 = 0xFFFF_FFFF;

    /// Packs `table` and `primary_key` into one key.
    ///
    /// Only the low 32 bits of `primary_key` are stored. Bits above that are
    /// discarded so an oversized key can never overwrite the table id; use
    /// [`validate_primary_key`] beforehand to reject such keys instead of
    /// truncating them.
    #[must_use]
    pub const fn registered(table: TableId, primary_key: u64) -> Self {
        // Lossless u32 -> u64 widening; the shift places the table id above the key field.
        let table_bits = (table.0 as u64) << Self::PRIMARY_KEY_BITS;
        // Mask first: without it, high key bits would be OR-ed into the table id.
        Self(table_bits | (primary_key & Self::PRIMARY_KEY_MASK))
    }

    /// Returns the table id stored in the upper 32 bits.
    #[must_use]
    pub const fn table_id(self) -> TableId {
        // After the shift at most 32 significant bits remain, so the cast cannot truncate.
        TableId((self.0 >> Self::PRIMARY_KEY_BITS) as u32)
    }

    /// Returns the primary key stored in the lower 32 bits.
    #[must_use]
    pub const fn primary_key(self) -> u64 {
        self.0 & Self::PRIMARY_KEY_MASK
    }
}

impl fmt::Display for NodeKey {
    /// Writes `node:<table>:<primary key>`, where `<table>` is rendered by
    /// [`TableId`]'s own `Display` (so the output looks like `node:table:3:42`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "node:{}:{}", self.table_id(), self.primary_key())
    }
}
/// A column on a registered table, as stored in `Catalog::filter_columns`.
///
/// NOTE(review): presumably marks the column as usable for filtering; the
/// consumer is not visible in this file, so confirm against the caller.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct FilterColumn {
/// Id of the table that owns the column.
pub table: TableId,
/// Name of the column.
pub column: String,
}
/// A table known to the catalog.
///
/// `schema`, `name` and `primary_key_column` are interpolated as quoted SQL
/// identifiers by `RegisteredEdge::edge_scan_sql`, which validates them with
/// `validate_sql_ident` first.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct RegisteredTable {
/// Catalog-wide identifier of the table.
pub id: TableId,
/// Schema that contains the table.
pub schema: String,
/// Table name within `schema`.
pub name: String,
/// Name of the table's primary-key column.
pub primary_key_column: String,
}
/// An edge definition linking rows of a source table to rows of a target table.
///
/// `schema`.`name` is the relation scanned for edge rows; `source_column` and
/// `target_column` are the columns of that relation that are matched against
/// the endpoint tables (see `edge_scan_sql`).
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct RegisteredEdge {
/// Catalog-wide identifier of the edge definition.
pub id: EdgeId,
/// Table the edge starts from.
pub source_table: TableId,
/// Table the edge points to.
pub target_table: TableId,
/// Column of the edge relation that refers to the source row.
pub source_column: String,
/// Column of the edge relation that refers to the target row.
pub target_column: String,
/// Schema of the relation that stores the edge rows.
pub schema: String,
/// Name of the relation that stores the edge rows.
pub name: String,
}
/// Collection of registered tables, edges and filter columns.
///
/// The fields are public, so the uniqueness and referential checks performed
/// by `add_table`, `add_edge` and `add_filter_column` only hold for entries
/// inserted through those methods.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Catalog {
/// Registered tables, in insertion order.
pub tables: Vec<RegisteredTable>,
/// Registered edge definitions, in insertion order.
pub edges: Vec<RegisteredEdge>,
/// Registered filter columns, in insertion order.
pub filter_columns: Vec<FilterColumn>,
}
impl Catalog {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub const fn validate_for_build(&self) -> Result<(), CatalogError> {
if self.tables.is_empty() {
return Err(CatalogError::EmptyCatalog);
}
Ok(())
}
pub fn add_table(&mut self, table: RegisteredTable) -> Result<(), CatalogError> {
if self.tables.iter().any(|existing| existing.id == table.id) {
return Err(CatalogError::DuplicateTableId(table.id));
}
if self
.tables
.iter()
.any(|existing| existing.schema == table.schema && existing.name == table.name)
{
return Err(CatalogError::DuplicateTableName {
schema: table.schema,
name: table.name,
});
}
self.tables.push(table);
Ok(())
}
pub fn add_edge(&mut self, edge: RegisteredEdge) -> Result<(), CatalogError> {
if self.edges.iter().any(|existing| existing.id == edge.id) {
return Err(CatalogError::DuplicateEdgeId(edge.id));
}
if !self
.tables
.iter()
.any(|table| table.id == edge.source_table)
{
return Err(CatalogError::MissingTable(edge.source_table));
}
if !self
.tables
.iter()
.any(|table| table.id == edge.target_table)
{
return Err(CatalogError::MissingTable(edge.target_table));
}
self.edges.push(edge);
Ok(())
}
pub fn add_filter_column(&mut self, column: FilterColumn) -> Result<(), CatalogError> {
if !self.tables.iter().any(|table| table.id == column.table) {
return Err(CatalogError::MissingTable(column.table));
}
self.filter_columns.push(column);
Ok(())
}
#[must_use]
pub fn table(&self, id: TableId) -> Option<&RegisteredTable> {
self.tables.iter().find(|table| table.id == id)
}
pub fn from_registration_rows(
tables: impl IntoIterator<Item = RegisteredTable>,
edges: impl IntoIterator<Item = RegisteredEdge>,
filter_columns: impl IntoIterator<Item = FilterColumn>,
) -> Result<Self, CatalogError> {
let mut catalog = Self::new();
for table in tables {
catalog.add_table(table)?;
}
for edge in edges {
catalog.add_edge(edge)?;
}
for column in filter_columns {
catalog.add_filter_column(column)?;
}
Ok(catalog)
}
}
impl RegisteredEdge {
    /// Builds the SQL query that scans this edge's rows as
    /// `(source key, target key)` pairs, both cast to `bigint`.
    ///
    /// Two query shapes are produced:
    ///
    /// * Source and target are the same table: the source and target columns
    ///   are selected straight from the edge relation, with no joins.
    /// * Otherwise: the edge relation is joined to both endpoint tables on
    ///   their primary-key columns, and those primary keys are selected.
    ///
    /// NOTE(review): the self-referential shape skips the joins, so unlike the
    /// two-table shape it does not drop rows whose endpoints are missing or
    /// NULL. Confirm this asymmetry is intended.
    ///
    /// # Errors
    ///
    /// * [`CatalogError::InvalidSqlIdent`] if any identifier that would be
    ///   interpolated fails [`validate_sql_ident`]. The edge's own
    ///   identifiers are checked before the tables are looked up.
    /// * [`CatalogError::MissingTable`] if the source table (checked first)
    ///   or the target table is not in `catalog`.
    pub fn edge_scan_sql(&self, catalog: &Catalog) -> Result<alloc::string::String, CatalogError> {
        // Everything interpolated below is validated first; the quoting in the
        // templates relies on the identifiers containing no quote characters.
        for ident in [
            &self.schema,
            &self.name,
            &self.source_column,
            &self.target_column,
        ] {
            validate_sql_ident(ident)?;
        }

        let Some(source) = catalog.table(self.source_table) else {
            return Err(CatalogError::MissingTable(self.source_table));
        };
        let Some(target) = catalog.table(self.target_table) else {
            return Err(CatalogError::MissingTable(self.target_table));
        };

        // Endpoint identifiers are validated even for the self-referential
        // shape, which does not interpolate them.
        for ident in [
            &source.schema,
            &source.name,
            &target.schema,
            &target.name,
            &source.primary_key_column,
            &target.primary_key_column,
        ] {
            validate_sql_ident(ident)?;
        }

        let sql = if self.source_table == self.target_table {
            alloc::format!(
                "SELECT \"{}\"::bigint, \"{}\"::bigint FROM \"{}\".\"{}\"",
                self.source_column,
                self.target_column,
                self.schema,
                self.name
            )
        } else {
            alloc::format!(
                "SELECT src.\"{}\"::bigint, tgt.\"{}\"::bigint \
                 FROM \"{}\".\"{}\" e \
                 JOIN \"{}\".\"{}\" src ON e.\"{}\" = src.\"{}\" \
                 JOIN \"{}\".\"{}\" tgt ON e.\"{}\" = tgt.\"{}\"",
                source.primary_key_column,
                target.primary_key_column,
                self.schema,
                self.name,
                source.schema,
                source.name,
                self.source_column,
                source.primary_key_column,
                target.schema,
                target.name,
                self.target_column,
                target.primary_key_column,
            )
        };
        Ok(sql)
    }
}
/// Converts a signed primary key into the unsigned form used by [`NodeKey`].
///
/// # Errors
///
/// * [`CatalogError::InvalidPrimaryKey`] if `value` is negative.
/// * [`CatalogError::PrimaryKeyOutOfRange`] if `value` exceeds `u32::MAX`.
pub fn validate_primary_key(value: i64) -> Result<u64, CatalogError> {
    const MAX_ENCODABLE: u64 = u32::MAX as u64;

    // `u64::try_from` on an `i64` fails exactly when the value is negative.
    match u64::try_from(value) {
        Err(_) => Err(CatalogError::InvalidPrimaryKey),
        Ok(key) if key > MAX_ENCODABLE => Err(CatalogError::PrimaryKeyOutOfRange),
        Ok(key) => Ok(key),
    }
}
pub fn table_id_from_i32(value: i32) -> Result<TableId, CatalogError> {
if value.is_negative() {
return Err(CatalogError::InvalidTableId);
}
u32::try_from(value)
.map(TableId)
.map_err(|_| CatalogError::InvalidTableId)
}
pub fn edge_id_from_i32(value: i32) -> Result<EdgeId, CatalogError> {
if value.is_negative() {
return Err(CatalogError::InvalidEdgeId);
}
u32::try_from(value)
.map(EdgeId)
.map_err(|_| CatalogError::InvalidEdgeId)
}
/// Checks that `ident` is safe to interpolate as a quoted SQL identifier.
///
/// Accepted identifiers are non-empty, start with an ASCII letter or `_`, and
/// continue with ASCII letters, ASCII digits or `_` only.
///
/// # Errors
///
/// Returns [`CatalogError::InvalidSqlIdent`] for anything else, including the
/// empty string and any identifier containing non-ASCII characters.
pub fn validate_sql_ident(ident: &str) -> Result<(), CatalogError> {
    let is_start = |byte: u8| byte.is_ascii_alphabetic() || byte == b'_';
    let is_continue = |byte: u8| byte.is_ascii_alphanumeric() || byte == b'_';

    // Working on bytes is equivalent to working on chars here: every byte of a
    // non-ASCII character is >= 0x80 and therefore fails both predicates.
    match ident.as_bytes() {
        [first, rest @ ..] if is_start(*first) && rest.iter().all(|&byte| is_continue(byte)) => {
            Ok(())
        }
        _ => Err(CatalogError::InvalidSqlIdent),
    }
}
/// Errors reported while registering catalog entries, validating inputs, or
/// generating SQL from a catalog.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CatalogError {
    /// The catalog has no registered tables.
    EmptyCatalog,
    /// A table with this id is already registered.
    DuplicateTableId(TableId),
    /// An edge with this id is already registered.
    DuplicateEdgeId(EdgeId),
    /// A table with this schema and name is already registered.
    DuplicateTableName {
        /// Schema of the conflicting table.
        schema: String,
        /// Name of the conflicting table.
        name: String,
    },
    /// The referenced table id is not registered.
    MissingTable(TableId),
    /// An identifier is not acceptable for interpolation into SQL.
    InvalidSqlIdent,
    /// A primary key value is negative.
    InvalidPrimaryKey,
    /// A primary key value does not fit in 32 bits.
    PrimaryKeyOutOfRange,
    /// A raw table id could not be converted to a [`TableId`].
    InvalidTableId,
    /// A raw edge id could not be converted to an [`EdgeId`].
    InvalidEdgeId,
}

impl fmt::Display for CatalogError {
    /// Writes a short, lowercase, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Variants carrying data format and return immediately; the remaining
        // variants map to a fixed message that is written once at the end.
        let fixed = match self {
            Self::DuplicateTableId(id) => return write!(f, "duplicate table id {id}"),
            Self::DuplicateEdgeId(id) => return write!(f, "duplicate edge id {id}"),
            Self::DuplicateTableName { schema, name } => {
                return write!(f, "duplicate table name {schema}.{name}");
            }
            Self::MissingTable(id) => return write!(f, "missing catalog table {id}"),
            Self::EmptyCatalog => "catalog must register at least one table",
            Self::InvalidSqlIdent => "invalid SQL identifier",
            Self::InvalidPrimaryKey => "primary key must be non-negative",
            Self::PrimaryKeyOutOfRange => "primary key must fit in u32 for NodeKey encoding",
            Self::InvalidTableId => "invalid registered table id",
            Self::InvalidEdgeId => "invalid registered edge id",
        };
        f.write_str(fixed)
    }
}

impl core::error::Error for CatalogError {}