use std::fmt;
use std::str::FromStr;
use arrow_schema::{DataType, SchemaRef};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use crate::catalog::backend::BackendError;
use crate::tenant::TenantId;
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct MutableTableId(String);
impl MutableTableId {
pub fn new(name: impl Into<String>) -> Result<Self, MutableTableError> {
let s = name.into();
if s.is_empty() {
return Err(MutableTableError::InvalidId("empty".to_string()));
}
if s.len() > 63 {
return Err(MutableTableError::InvalidId(format!(
"length must be 1..=63 (got {})",
s.len()
)));
}
if !s
.chars()
.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
{
return Err(MutableTableError::InvalidId(format!(
"only lowercase ASCII, digits, and `_` allowed; got: {s}"
)));
}
Ok(MutableTableId(s))
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl fmt::Display for MutableTableId {
    /// Writes the bare id string (formatter width/fill flags are not applied).
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        out.write_str(self.as_str())
    }
}
impl FromStr for MutableTableId {
type Err = MutableTableError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Self::new(s)
}
}
/// Declaration of a secondary index on a mutable table.
///
/// Collected by `MutableTableDefinitionBuilder::index` and carried unchanged
/// into `MutableTableDefinition::indexes`.
#[derive(Debug, Clone)]
pub struct MutableIndexDef {
    /// Index name.
    pub name: String,
    /// Names of the indexed columns. NOTE(review): presumably in key order —
    /// confirm against the backend that materializes indexes.
    pub columns: Vec<String>,
    /// `true` declares a unique index; enforcement is not visible in this file.
    pub unique: bool,
}
/// Validated description of a mutable table.
///
/// Produced by `MutableTableDefinitionBuilder::build`, which checks the
/// primary key, reserved column names and the order column before constructing
/// this value.
#[derive(Debug, Clone)]
pub struct MutableTableDefinition {
    /// Table identifier.
    pub id: MutableTableId,
    /// Arrow schema of the user-visible columns.
    pub schema: SchemaRef,
    /// Primary-key column names; non-empty and present in `schema`.
    pub primary_key: Vec<String>,
    /// Owning tenant, if any.
    pub tenant: Option<TenantId>,
    /// Secondary index declarations.
    pub indexes: Vec<MutableIndexDef>,
    /// Caller-supplied JSON metadata (the builder defaults it to an empty object).
    pub user_metadata: serde_json::Value,
    /// Optional ordering column; when set, it is an `Int64` or `UInt64` column of `schema`.
    pub order_column: Option<String>,
    /// Chunk size taken from the builder (default 8192). NOTE(review): unit and
    /// consumer are not visible in this file — confirm (rows per batch?).
    pub chunk_size: usize,
}
/// Builder for [`MutableTableDefinition`]; validation happens in `build`.
#[derive(Debug)]
pub struct MutableTableDefinitionBuilder {
    /// Table identifier.
    id: MutableTableId,
    /// Arrow schema of the table's columns.
    schema: SchemaRef,
    /// Primary-key column names (empty until set; `build` rejects empty).
    primary_key: Vec<String>,
    /// Owning tenant, if any.
    tenant: Option<TenantId>,
    /// Accumulated index declarations.
    indexes: Vec<MutableIndexDef>,
    /// Caller-supplied JSON metadata.
    user_metadata: serde_json::Value,
    /// Optional ordering column name.
    order_column: Option<String>,
    /// Chunk size passed through to the definition.
    chunk_size: usize,
    /// When `true`, `build` accepts `_`-prefixed column names.
    allow_internal_columns: bool,
}
impl MutableTableDefinitionBuilder {
    /// Starts a builder for table `id` over the Arrow `schema`.
    ///
    /// Defaults: empty primary key (which [`build`](Self::build) rejects, so it
    /// must be set), no tenant, no indexes, an empty JSON object as user
    /// metadata, no order column, a chunk size of 8192, and `_`-prefixed
    /// columns disallowed.
    pub fn new(id: MutableTableId, schema: SchemaRef) -> Self {
        Self {
            id,
            schema,
            primary_key: vec![],
            tenant: None,
            indexes: vec![],
            user_metadata: serde_json::Value::Object(serde_json::Map::new()),
            order_column: None,
            chunk_size: 8192,
            allow_internal_columns: false,
        }
    }

    /// Permits schema columns whose names start with `_`.
    ///
    /// `tenant_id` stays reserved regardless of this flag.
    pub fn allow_internal_columns(mut self) -> Self {
        self.allow_internal_columns = true;
        self
    }

    /// Sets the primary-key columns, replacing any previously set.
    ///
    /// [`build`](Self::build) requires the list to be non-empty, free of
    /// duplicates, and to name only columns present in the schema.
    pub fn primary_key(mut self, columns: Vec<String>) -> Self {
        self.primary_key = columns;
        self
    }

    /// Sets (or clears, with `None`) the tenant recorded on the definition.
    pub fn tenant(mut self, tenant: Option<TenantId>) -> Self {
        self.tenant = tenant;
        self
    }

    /// Appends a secondary index declaration.
    ///
    /// [`build`](Self::build) requires every index to have a distinct name and
    /// at least one column, all of which must exist in the schema.
    pub fn index(mut self, idx: MutableIndexDef) -> Self {
        self.indexes.push(idx);
        self
    }

    /// Replaces the caller-supplied JSON metadata (default: empty object).
    pub fn user_metadata(mut self, value: serde_json::Value) -> Self {
        self.user_metadata = value;
        self
    }

    /// Declares the ordering column.
    ///
    /// [`build`](Self::build) requires it to exist in the schema with type
    /// `Int64` or `UInt64`.
    pub fn order_column(mut self, column: impl Into<String>) -> Self {
        self.order_column = Some(column.into());
        self
    }

    /// Sets the chunk size (default 8192); [`build`](Self::build) rejects 0.
    pub fn chunk_size(mut self, size: usize) -> Self {
        self.chunk_size = size;
        self
    }

    /// Validates the accumulated settings and produces the definition.
    ///
    /// Checks run in the order listed below; the first failure is returned.
    ///
    /// # Errors
    ///
    /// - [`MutableTableError::Schema`] if the primary key is empty.
    /// - [`MutableTableError::MissingPrimaryKey`] if a primary-key column is
    ///   not in the schema.
    /// - [`MutableTableError::ReservedColumn`] if the schema has a `tenant_id`
    ///   column, or a `_`-prefixed column without
    ///   [`allow_internal_columns`](Self::allow_internal_columns).
    /// - [`MutableTableError::Schema`] if the order column is absent from the
    ///   schema or not `Int64`/`UInt64`; if a primary-key column is listed
    ///   twice; if an index has a duplicate name, no columns, or a column not
    ///   in the schema; or if the chunk size is 0.
    pub fn build(self) -> Result<MutableTableDefinition, MutableTableError> {
        if self.primary_key.is_empty() {
            return Err(MutableTableError::Schema(
                "primary key must not be empty".to_string(),
            ));
        }

        let field_names: std::collections::HashSet<&str> = self
            .schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();

        if let Some(pk) = self
            .primary_key
            .iter()
            .find(|pk| !field_names.contains(pk.as_str()))
        {
            return Err(MutableTableError::MissingPrimaryKey(pk.clone()));
        }

        self.check_reserved_columns()?;
        self.check_order_column()?;
        self.check_primary_key_unique()?;
        self.check_indexes(&field_names)?;

        // A zero chunk size can never make progress when splitting data
        // (e.g. `slice::chunks(0)` panics), so reject it up front.
        if self.chunk_size == 0 {
            return Err(MutableTableError::Schema(
                "chunk_size must be greater than 0".to_string(),
            ));
        }

        Ok(MutableTableDefinition {
            id: self.id,
            schema: self.schema,
            primary_key: self.primary_key,
            tenant: self.tenant,
            indexes: self.indexes,
            user_metadata: self.user_metadata,
            order_column: self.order_column,
            chunk_size: self.chunk_size,
        })
    }

    /// Rejects `tenant_id` always, and `_`-prefixed names unless internal
    /// columns were allowed.
    fn check_reserved_columns(&self) -> Result<(), MutableTableError> {
        for f in self.schema.fields() {
            let name = f.name();
            // `tenant_id` is reserved unconditionally, even for internal callers.
            if name == "tenant_id" || (!self.allow_internal_columns && name.starts_with('_')) {
                return Err(MutableTableError::ReservedColumn(name.clone()));
            }
        }
        Ok(())
    }

    /// Ensures the order column, if declared, exists and is `Int64` or `UInt64`.
    fn check_order_column(&self) -> Result<(), MutableTableError> {
        if let Some(ref col) = self.order_column {
            let field = self.schema.field_with_name(col).map_err(|_| {
                MutableTableError::Schema(format!("order_column '{col}' not in schema"))
            })?;
            if !matches!(field.data_type(), DataType::Int64 | DataType::UInt64) {
                return Err(MutableTableError::Schema(format!(
                    "order_column '{col}' must be Int64 or UInt64, got {:?}",
                    field.data_type()
                )));
            }
        }
        Ok(())
    }

    /// Rejects a primary key that lists the same column more than once.
    fn check_primary_key_unique(&self) -> Result<(), MutableTableError> {
        let mut seen = std::collections::HashSet::with_capacity(self.primary_key.len());
        for pk in &self.primary_key {
            if !seen.insert(pk.as_str()) {
                return Err(MutableTableError::Schema(format!(
                    "primary key column '{pk}' listed more than once"
                )));
            }
        }
        Ok(())
    }

    /// Rejects indexes with a duplicate name, no columns, or a column that is
    /// not in the schema.
    fn check_indexes(
        &self,
        field_names: &std::collections::HashSet<&str>,
    ) -> Result<(), MutableTableError> {
        let mut names = std::collections::HashSet::with_capacity(self.indexes.len());
        for idx in &self.indexes {
            if !names.insert(idx.name.as_str()) {
                return Err(MutableTableError::Schema(format!(
                    "duplicate index name '{}'",
                    idx.name
                )));
            }
            if idx.columns.is_empty() {
                return Err(MutableTableError::Schema(format!(
                    "index '{}' must list at least one column",
                    idx.name
                )));
            }
            if let Some(col) = idx
                .columns
                .iter()
                .find(|c| !field_names.contains(c.as_str()))
            {
                return Err(MutableTableError::Schema(format!(
                    "index '{}' column '{col}' not in schema",
                    idx.name
                )));
            }
        }
        Ok(())
    }
}
/// Errors produced by mutable-table id validation, definition building, and
/// (per the variants below) table operations and the catalog backend.
#[derive(Debug, Error)]
pub enum MutableTableError {
    /// Id failed `MutableTableId::new` validation; carries the reason.
    #[error("invalid mutable table id: {0}")]
    InvalidId(String),
    /// Definition is structurally invalid (e.g. empty primary key, bad order
    /// column); carries a human-readable reason.
    #[error("schema validation: {0}")]
    Schema(String),
    /// A primary-key column name that does not appear in the schema.
    #[error("primary key column not present in schema: {0}")]
    MissingPrimaryKey(String),
    /// A schema column whose name is reserved (`tenant_id`, or `_`-prefixed
    /// unless internal columns are allowed).
    #[error("reserved column name: {0}")]
    ReservedColumn(String),
    /// No table with this id. NOTE(review): not produced in this file —
    /// presumably returned by catalog lookups elsewhere.
    #[error("mutable table not found: {0}")]
    NotFound(MutableTableId),
    /// A table with this id already exists. NOTE(review): not produced in
    /// this file — presumably returned by catalog creation elsewhere.
    #[error("mutable table already exists: {0}")]
    AlreadyExists(MutableTableId),
    /// The table declares no order column, which `scan_after` requires.
    #[error("table has no order_column declared; required by scan_after")]
    NoOrderColumn,
    /// Wrapped catalog backend error (convertible via `?` through `From`).
    #[error("backend: {0}")]
    Backend(#[from] BackendError),
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{Field, Schema};
    use std::sync::Arc;

    /// Builds a schema of non-nullable fields from `(name, type)` pairs.
    fn schema_with(cols: &[(&str, DataType)]) -> SchemaRef {
        let fields: Vec<Field> = cols
            .iter()
            .map(|(name, ty)| Field::new(*name, ty.clone(), false))
            .collect();
        Arc::new(Schema::new(fields))
    }

    #[test]
    fn id_accepts_lowercase_digits_underscore() {
        assert!(MutableTableId::new("ab1_c2").is_ok());
        assert!(MutableTableId::new("a").is_ok());
        assert!(MutableTableId::new("123").is_ok());
    }

    #[test]
    fn id_rejects_uppercase_or_special() {
        assert!(MutableTableId::new("Foo").is_err());
        assert!(MutableTableId::new("foo-bar").is_err());
        assert!(MutableTableId::new("foo bar").is_err());
        assert!(MutableTableId::new("").is_err());
        let too_long = "a".repeat(64);
        assert!(MutableTableId::new(too_long).is_err());
    }

    #[test]
    fn id_accepts_max_length() {
        assert!(MutableTableId::new("a".repeat(63)).is_ok());
    }

    #[test]
    fn id_round_trips_through_display_and_from_str() {
        let id: MutableTableId = "orders_2024".parse().unwrap();
        assert_eq!(id.as_str(), "orders_2024");
        assert_eq!(id.to_string(), "orders_2024");
        assert_eq!(MutableTableId::from_str(&id.to_string()).unwrap(), id);
    }

    #[test]
    fn builder_rejects_empty_primary_key() {
        let id = MutableTableId::new("t").unwrap();
        let s = schema_with(&[("a", DataType::Int64)]);
        let err = MutableTableDefinitionBuilder::new(id, s)
            .build()
            .unwrap_err();
        assert!(matches!(err, MutableTableError::Schema(_)));
    }

    #[test]
    fn builder_rejects_pk_not_in_schema() {
        let id = MutableTableId::new("t").unwrap();
        let s = schema_with(&[("a", DataType::Int64)]);
        let err = MutableTableDefinitionBuilder::new(id, s)
            .primary_key(vec!["missing".to_string()])
            .build()
            .unwrap_err();
        assert!(matches!(err, MutableTableError::MissingPrimaryKey(_)));
    }

    #[test]
    fn builder_rejects_reserved_tenant_id_column() {
        let id = MutableTableId::new("t").unwrap();
        let s = schema_with(&[("a", DataType::Int64), ("tenant_id", DataType::Utf8)]);
        let err = MutableTableDefinitionBuilder::new(id, s)
            .primary_key(vec!["a".to_string()])
            .build()
            .unwrap_err();
        assert!(matches!(err, MutableTableError::ReservedColumn(_)));
    }

    #[test]
    fn builder_rejects_tenant_id_even_when_internal_columns_allowed() {
        let id = MutableTableId::new("t").unwrap();
        let s = schema_with(&[("a", DataType::Int64), ("tenant_id", DataType::Utf8)]);
        let err = MutableTableDefinitionBuilder::new(id, s)
            .primary_key(vec!["a".to_string()])
            .allow_internal_columns()
            .build()
            .unwrap_err();
        assert!(matches!(err, MutableTableError::ReservedColumn(_)));
    }

    #[test]
    fn builder_rejects_underscore_prefixed_column() {
        let id = MutableTableId::new("t").unwrap();
        let s = schema_with(&[("a", DataType::Int64), ("_rowid", DataType::Int64)]);
        let err = MutableTableDefinitionBuilder::new(id, s)
            .primary_key(vec!["a".to_string()])
            .build()
            .unwrap_err();
        assert!(matches!(err, MutableTableError::ReservedColumn(_)));
    }

    #[test]
    fn builder_allows_underscore_column_when_opted_in() {
        let id = MutableTableId::new("t").unwrap();
        let s = schema_with(&[("a", DataType::Int64), ("_rowid", DataType::Int64)]);
        let def = MutableTableDefinitionBuilder::new(id, s)
            .primary_key(vec!["a".to_string()])
            .allow_internal_columns()
            .build()
            .unwrap();
        assert_eq!(def.schema.fields().len(), 2);
    }

    #[test]
    fn builder_rejects_order_column_with_wrong_type() {
        let id = MutableTableId::new("t").unwrap();
        let s = schema_with(&[("a", DataType::Int64), ("seq", DataType::Float32)]);
        let err = MutableTableDefinitionBuilder::new(id, s)
            .primary_key(vec!["a".to_string()])
            .order_column("seq")
            .build()
            .unwrap_err();
        assert!(matches!(err, MutableTableError::Schema(_)));
    }

    #[test]
    fn builder_rejects_order_column_not_in_schema() {
        let id = MutableTableId::new("t").unwrap();
        let s = schema_with(&[("a", DataType::Int64)]);
        let err = MutableTableDefinitionBuilder::new(id, s)
            .primary_key(vec!["a".to_string()])
            .order_column("seq")
            .build()
            .unwrap_err();
        assert!(matches!(err, MutableTableError::Schema(_)));
    }

    #[test]
    fn builder_accepts_uint64_order_column() {
        let id = MutableTableId::new("t").unwrap();
        let s = schema_with(&[("a", DataType::Int64), ("seq", DataType::UInt64)]);
        let def = MutableTableDefinitionBuilder::new(id, s)
            .primary_key(vec!["a".to_string()])
            .order_column("seq")
            .build()
            .unwrap();
        assert_eq!(def.order_column.as_deref(), Some("seq"));
    }

    #[test]
    fn builder_defaults() {
        let id = MutableTableId::new("t").unwrap();
        let s = schema_with(&[("a", DataType::Int64)]);
        let def = MutableTableDefinitionBuilder::new(id.clone(), s)
            .primary_key(vec!["a".to_string()])
            .build()
            .unwrap();
        assert_eq!(def.id, id);
        assert_eq!(def.chunk_size, 8192);
        assert!(def.tenant.is_none());
        assert!(def.indexes.is_empty());
        assert!(def.order_column.is_none());
        assert_eq!(
            def.user_metadata,
            serde_json::Value::Object(serde_json::Map::new())
        );
    }

    #[test]
    fn builder_happy_path_with_index_order_and_tenant() {
        let id = MutableTableId::new("dim").unwrap();
        let s = schema_with(&[
            ("k", DataType::Int64),
            ("v", DataType::Utf8),
            ("seq", DataType::Int64),
        ]);
        let t = TenantId::from_str("01906c83-d4c8-7e10-9c4f-3b6f7c5a8e9a").unwrap();
        let def = MutableTableDefinitionBuilder::new(id, s)
            .primary_key(vec!["k".to_string()])
            .tenant(Some(t))
            .index(MutableIndexDef {
                name: "idx_v".into(),
                columns: vec!["v".into()],
                unique: false,
            })
            .order_column("seq")
            .build()
            .unwrap();
        assert_eq!(def.primary_key, vec!["k".to_string()]);
        assert_eq!(def.indexes.len(), 1);
        assert_eq!(def.tenant, Some(t));
        assert_eq!(def.order_column.as_deref(), Some("seq"));
    }
}