use std::sync::Arc;
use futures::stream;
use pgwire::api::results::{DataRowEncoder, QueryResponse, Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::security::audit::AuditEvent;
use crate::control::security::catalog::StoredCollection;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use super::super::types::{int8_field, sqlstate_error, text_field};
use super::schema_validation::{extract_vector_fields, parse_fields_clause};
pub use super::schema_validation::validate_document_schema;
/// Handles `CREATE COLLECTION <name> [FIELDS (...)] [STORAGE <mode>]`.
///
/// Determines the storage mode from the statement keywords (STRICT, COLUMNAR,
/// KV, otherwise plain document), rejects duplicate active collections,
/// persists the catalog entry, records the creator as owner, and writes an
/// audit event.
///
/// # Errors
/// - `42601` on malformed syntax or an invalid STRICT schema
/// - `42P07` when an active collection of the same name already exists
/// - `XX000` when the catalog or permission store cannot be updated
pub fn create_collection(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    parts: &[&str],
    sql: &str,
) -> PgWireResult<Vec<Response>> {
    if parts.len() < 3 {
        return Err(sqlstate_error(
            "42601",
            "syntax: CREATE COLLECTION <name> [FIELDS (<field> <type>, ...)]",
        ));
    }
    // Collection names are stored lowercase.
    let name_lower = parts[2].to_lowercase();
    let name = name_lower.as_str();
    let tenant_id = identity.tenant_id;
    // Duplicate check only considers *active* collections; a soft-deleted
    // entry with the same name is silently overwritten below.
    if let Some(catalog) = state.credentials.catalog()
        && let Ok(Some(existing)) = catalog.get_collection(tenant_id.as_u32(), name)
        && existing.is_active
    {
        return Err(sqlstate_error(
            "42P07",
            &format!("collection '{name}' already exists"),
        ));
    }
    // Creation timestamp in whole seconds since the Unix epoch (0 if the
    // clock is before the epoch).
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    // Keyword sniffing on the uppercased statement selects the storage engine.
    let upper = sql_upper_from_parts(parts);
    let collection_type = if upper.contains("STORAGE") && upper.contains("STRICT") {
        let schema = parse_typed_schema(sql).map_err(|e| sqlstate_error("42601", &e))?;
        nodedb_types::CollectionType::strict(schema)
    } else if upper.contains("STORAGE") && upper.contains("COLUMNAR") {
        nodedb_types::CollectionType::columnar()
    } else if super::kv::is_kv_storage_mode(&upper) {
        super::kv::parse_kv_collection(sql, &upper)?
    } else {
        nodedb_types::CollectionType::document()
    };
    let fields = parse_fields_clause(parts);
    // Strict schemas and KV configs are serialized into the (reused)
    // `timeseries_config` column of the stored record.
    let schema_json = match &collection_type {
        nodedb_types::CollectionType::Document(nodedb_types::DocumentMode::Strict(schema)) => {
            serde_json::to_string(schema).ok()
        }
        nodedb_types::CollectionType::KeyValue(config) => serde_json::to_string(config).ok(),
        _ => None,
    };
    let coll = StoredCollection {
        tenant_id: tenant_id.as_u32(),
        name: name.to_string(),
        owner: identity.username.clone(),
        created_at: now,
        fields,
        field_defs: Vec::new(),
        event_defs: Vec::new(),
        collection_type,
        timeseries_config: schema_json,
        is_active: true,
    };
    // Persist only when a catalog backend is configured; ownership and audit
    // are recorded regardless.
    if let Some(catalog) = state.credentials.catalog() {
        catalog
            .put_collection(&coll)
            .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
    }
    let catalog = state.credentials.catalog();
    state
        .permissions
        .set_owner(
            "collection",
            tenant_id,
            name,
            &identity.username,
            catalog.as_ref(),
        )
        .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
    // Vector-typed fields are only logged here — NOTE(review): looks like the
    // actual vector-index setup happens elsewhere; confirm downstream wiring.
    let vector_fields = extract_vector_fields(&coll.fields);
    if !vector_fields.is_empty() {
        for (field_name, _dim, metric) in &vector_fields {
            tracing::info!(
                %name,
                field = %field_name,
                %metric,
                "auto-configuring vector field"
            );
        }
    }
    state.audit_record(
        AuditEvent::AdminAction,
        Some(tenant_id),
        &identity.username,
        &format!("created collection '{name}'"),
    );
    Ok(vec![Response::Execution(Tag::new("CREATE COLLECTION"))])
}
/// Handles `DROP COLLECTION <name>` by soft-deleting the catalog entry.
///
/// Only the recorded owner, a superuser, or a tenant admin may drop a
/// collection. The entry is deactivated (`is_active = false`) rather than
/// removed, and the action is audited.
pub fn drop_collection(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    parts: &[&str],
) -> PgWireResult<Vec<Response>> {
    if parts.len() < 3 {
        return Err(sqlstate_error("42601", "syntax: DROP COLLECTION <name>"));
    }
    // Collection names are stored lowercase.
    let lowered = parts[2].to_lowercase();
    let name = lowered.as_str();
    let tenant_id = identity.tenant_id;
    // Permission gate: owner, superuser, or tenant admin.
    let is_owner = state
        .permissions
        .get_owner("collection", tenant_id, name)
        .as_deref()
        == Some(&identity.username);
    let may_drop = is_owner
        || identity.is_superuser
        || identity.has_role(&crate::control::security::identity::Role::TenantAdmin);
    if !may_drop {
        return Err(sqlstate_error(
            "42501",
            "permission denied: only owner, superuser, or tenant_admin can drop collections",
        ));
    }
    if let Some(catalog) = state.credentials.catalog() {
        match catalog.get_collection(tenant_id.as_u32(), name) {
            Ok(Some(mut coll)) => {
                // Soft delete: flip the active flag, keep the record.
                coll.is_active = false;
                catalog
                    .put_collection(&coll)
                    .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
            }
            _ => {
                return Err(sqlstate_error(
                    "42P01",
                    &format!("collection '{name}' does not exist"),
                ));
            }
        }
    }
    state.audit_record(
        AuditEvent::AdminAction,
        Some(tenant_id),
        &identity.username,
        &format!("dropped collection '{name}'"),
    );
    Ok(vec![Response::Execution(Tag::new("DROP COLLECTION"))])
}
/// Handles `CREATE [UNIQUE] INDEX <name> ON <collection> (<field>)
/// [COLLATE NOCASE|CI] [WHERE <condition>]`.
///
/// Validates the target collection, enforces that the caller is its owner
/// (or superuser / tenant_admin), records the index in the ownership store
/// under the *collection owner's* name, and audits the statement.
///
/// Fix over the previous version: the collection was fetched from the catalog
/// twice (once for the permission check, once to resolve the index owner);
/// both are now served by a single lookup.
///
/// # Errors
/// - `42601` on malformed syntax
/// - `42P01` when the collection is missing or inactive
/// - `42501` when the caller may not create indexes on the collection
/// - `XX000` when ownership cannot be persisted
pub fn create_index(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    parts: &[&str],
    sql: &str,
) -> PgWireResult<Vec<Response>> {
    let upper = sql.to_uppercase();
    let is_unique = upper.contains("UNIQUE INDEX");
    // `CREATE UNIQUE INDEX` shifts every positional token by one.
    let idx_offset = if is_unique { 3 } else { 2 };
    if parts.len() < idx_offset + 4 {
        return Err(sqlstate_error(
            "42601",
            "syntax: CREATE [UNIQUE] INDEX <name> ON <collection> (<field>) [WHERE ...]",
        ));
    }
    let index_name = parts[idx_offset];
    if !parts[idx_offset + 1].eq_ignore_ascii_case("ON") {
        return Err(sqlstate_error("42601", "expected ON after index name"));
    }
    let collection = parts[idx_offset + 2];
    let field = parts[idx_offset + 3].trim_matches(|c| c == '(' || c == ')');
    // Offsets located in `upper` are applied to `sql` — assumes ASCII
    // uppercasing keeps byte positions aligned.
    let where_condition = upper
        .find(" WHERE ")
        .map(|pos| sql[pos + 7..].trim().to_string());
    let case_insensitive = upper.contains("COLLATE NOCASE") || upper.contains("COLLATE CI");
    let tenant_id = identity.tenant_id;
    let catalog = state.credentials.catalog();
    // Single catalog lookup: validate the collection, check permissions, and
    // resolve the index owner in one pass.
    let index_owner = if let Some(cat) = &catalog {
        match cat.get_collection(tenant_id.as_u32(), collection) {
            Ok(Some(coll)) if coll.is_active => {
                let is_owner = coll.owner == identity.username;
                if !is_owner
                    && !identity.is_superuser
                    && !identity.has_role(&crate::control::security::identity::Role::TenantAdmin)
                {
                    return Err(sqlstate_error(
                        "42501",
                        "permission denied: must be collection owner or admin to create indexes",
                    ));
                }
                // Indexes belong to the collection owner, not the creator.
                coll.owner
            }
            _ => {
                return Err(sqlstate_error(
                    "42P01",
                    &format!("collection '{collection}' does not exist"),
                ));
            }
        }
    } else {
        // No catalog backend configured: fall back to the caller as owner.
        identity.username.clone()
    };
    state
        .permissions
        .set_owner(
            "index",
            tenant_id,
            index_name,
            &index_owner,
            catalog.as_ref(),
        )
        .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
    // Audit message mirrors the parsed statement.
    let kind = if is_unique { "unique index" } else { "index" };
    let ci = if case_insensitive {
        " COLLATE NOCASE"
    } else {
        ""
    };
    let cond = where_condition
        .as_deref()
        .map(|c| format!(" WHERE {c}"))
        .unwrap_or_default();
    state.audit_record(
        AuditEvent::AdminAction,
        Some(tenant_id),
        &identity.username,
        &format!("created {kind} '{index_name}' on '{collection}' ({field}){ci}{cond}"),
    );
    Ok(vec![Response::Execution(Tag::new("CREATE INDEX"))])
}
/// Handles `DROP INDEX <name>`: removes the ownership record after verifying
/// the caller is the index owner, a superuser, or a tenant admin.
pub fn drop_index(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    parts: &[&str],
) -> PgWireResult<Vec<Response>> {
    if parts.len() < 3 {
        return Err(sqlstate_error("42601", "syntax: DROP INDEX <name>"));
    }
    let index_name = parts[2];
    let tenant_id = identity.tenant_id;
    // Permission gate: owner, superuser, or tenant admin.
    let owns_index = state
        .permissions
        .get_owner("index", tenant_id, index_name)
        .as_deref()
        == Some(&identity.username);
    let may_drop = owns_index
        || identity.is_superuser
        || identity.has_role(&crate::control::security::identity::Role::TenantAdmin);
    if !may_drop {
        return Err(sqlstate_error(
            "42501",
            "permission denied: must be index owner or admin",
        ));
    }
    // Dropping an index is purely an ownership-store removal here.
    let catalog = state.credentials.catalog();
    state
        .permissions
        .remove_owner("index", tenant_id, index_name, catalog.as_ref())
        .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
    state.audit_record(
        AuditEvent::AdminAction,
        Some(tenant_id),
        &identity.username,
        &format!("dropped index '{index_name}'"),
    );
    Ok(vec![Response::Execution(Tag::new("DROP INDEX"))])
}
pub fn describe_collection(
state: &SharedState,
identity: &AuthenticatedIdentity,
parts: &[&str],
) -> PgWireResult<Vec<Response>> {
if parts.len() < 2 {
return Err(sqlstate_error("42601", "syntax: DESCRIBE <collection>"));
}
let name_lower = parts[1].to_lowercase();
let name = name_lower.as_str();
let tenant_id = identity.tenant_id;
let catalog = match state.credentials.catalog() {
Some(c) => c,
None => return Err(sqlstate_error("XX000", "catalog not available")),
};
let coll = match catalog.get_collection(tenant_id.as_u32(), name) {
Ok(Some(c)) if c.is_active => c,
_ => {
return Err(sqlstate_error(
"42P01",
&format!("collection '{name}' not found"),
));
}
};
let schema = Arc::new(vec![
text_field("field"),
text_field("type"),
text_field("nullable"),
]);
let mut rows = Vec::new();
let mut encoder = DataRowEncoder::new(schema.clone());
encoder
.encode_field(&"id")
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(&"TEXT")
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(&"false")
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
rows.push(Ok(encoder.take_row()));
if coll.fields.is_empty() {
encoder
.encode_field(&"document")
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(&"JSON")
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(&"true")
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
rows.push(Ok(encoder.take_row()));
} else {
for (field_name, field_type) in &coll.fields {
encoder
.encode_field(field_name)
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(field_type)
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(&"true")
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
rows.push(Ok(encoder.take_row()));
}
}
if coll.collection_type.is_strict()
|| coll.collection_type.is_columnar()
|| coll.collection_type.is_kv()
{
encoder
.encode_field(&"__storage")
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(&coll.collection_type.as_str())
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(&"false")
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
rows.push(Ok(encoder.take_row()));
}
if coll.collection_type.is_timeseries() {
encoder
.encode_field(&"__collection_type")
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(&"timeseries")
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(&"false")
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
rows.push(Ok(encoder.take_row()));
if let Some(config) = coll.get_timeseries_config() {
for (key, value) in config.as_object().into_iter().flatten() {
let val_str = match value {
serde_json::Value::String(s) => s.clone(),
other => other.to_string(),
};
encoder
.encode_field(&format!("__ts_{key}"))
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(&val_str)
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(&"config")
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
rows.push(Ok(encoder.take_row()));
}
}
}
if let Some(kv_config) = coll.collection_type.kv_config() {
if let Some(pk) = kv_config.primary_key_column() {
encoder
.encode_field(&"__kv_key")
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(&format!("{} ({})", pk.name, pk.column_type))
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(&"false")
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
rows.push(Ok(encoder.take_row()));
}
if let Some(ttl) = &kv_config.ttl {
let ttl_str = match ttl {
nodedb_types::KvTtlPolicy::FixedDuration { duration_ms } => {
format!("INTERVAL '{duration_ms}ms'")
}
nodedb_types::KvTtlPolicy::FieldBased { field, offset_ms } => {
format!("{field} + INTERVAL '{offset_ms}ms'")
}
};
encoder
.encode_field(&"__kv_ttl")
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(&ttl_str)
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(&"false")
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
rows.push(Ok(encoder.take_row()));
}
}
Ok(vec![Response::Query(QueryResponse::new(
schema,
stream::iter(rows),
))])
}
/// Handles `SHOW COLLECTIONS`: superusers see every active collection across
/// tenants; everyone else sees only their own tenant's collections.
pub fn show_collections(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
) -> PgWireResult<Vec<Response>> {
    let tenant_id = identity.tenant_id;
    let schema = Arc::new(vec![
        text_field("name"),
        text_field("owner"),
        int8_field("created_at"),
    ]);
    let collections = match state.credentials.catalog() {
        // Superusers get the global list, restricted to active entries.
        Some(catalog) if identity.is_superuser => catalog
            .load_all_collections()
            .unwrap_or_default()
            .into_iter()
            .filter(|c| c.is_active)
            .collect::<Vec<_>>(),
        // NOTE(review): this path does not filter is_active explicitly —
        // presumably load_collections_for_tenant already does; confirm.
        Some(catalog) => catalog
            .load_collections_for_tenant(tenant_id.as_u32())
            .unwrap_or_default(),
        None => Vec::new(),
    };
    let mut rows = Vec::with_capacity(collections.len());
    let mut encoder = DataRowEncoder::new(schema.clone());
    for coll in &collections {
        encoder
            .encode_field(&coll.name)
            .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
        encoder
            .encode_field(&coll.owner)
            .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
        encoder
            .encode_field(&(coll.created_at as i64))
            .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
        rows.push(Ok(encoder.take_row()));
    }
    Ok(vec![Response::Query(QueryResponse::new(
        schema,
        stream::iter(rows),
    ))])
}
/// Handles `SHOW INDEXES [ON <collection>]`.
///
/// Lists ownership records across every index namespace. The optional ON
/// filter is a prefix match against the index name — NOTE(review): this is a
/// heuristic, not an exact collection association; confirm naming convention.
pub fn show_indexes(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    parts: &[&str],
) -> PgWireResult<Vec<Response>> {
    let tenant_id = identity.tenant_id;
    // `SHOW INDEXES ON <collection>` narrows the listing; otherwise show all.
    let filter_collection = match parts {
        [_, kw, on, coll, ..]
            if kw.eq_ignore_ascii_case("INDEXES") && on.eq_ignore_ascii_case("ON") =>
        {
            Some(*coll)
        }
        _ => None,
    };
    let schema = Arc::new(vec![
        text_field("index_name"),
        text_field("type"),
        text_field("owner"),
    ]);
    // Ownership namespaces double as the index-type registry.
    let index_types = [
        ("index", "btree"),
        ("vector_index", "vector"),
        ("fulltext_index", "fulltext"),
        ("spatial_index", "spatial"),
    ];
    let mut rows = Vec::new();
    let mut encoder = DataRowEncoder::new(schema.clone());
    for (owner_type, display_type) in &index_types {
        for (index_name, owner) in &state.permissions.list_owners(owner_type, tenant_id) {
            if filter_collection.is_some_and(|coll| !index_name.starts_with(coll)) {
                continue;
            }
            for value in [index_name.as_str(), *display_type, owner.as_str()] {
                encoder
                    .encode_field(&value)
                    .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
            }
            rows.push(Ok(encoder.take_row()));
        }
    }
    Ok(vec![Response::Query(QueryResponse::new(
        schema,
        stream::iter(rows),
    ))])
}
/// Handles `ALTER TABLE <name> ADD [COLUMN] <col-def>`.
///
/// Only collections with a STRICT schema are actually modified: the new
/// column is appended to the stored schema and the schema version is bumped.
/// For an active collection of any other type the statement succeeds without
/// changing anything — NOTE(review): confirm the silent no-op is intended.
///
/// # Errors
/// - `42601` on a missing table name, missing ADD clause, unparsable column
///   definition, or a non-nullable column without a DEFAULT
/// - `42P07` when the column already exists in the strict schema
/// - `42P01` when the collection does not exist or is inactive
/// - `XX000` when persisting the updated schema fails
pub fn alter_table_add_column(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    parts: &[&str],
    sql: &str,
) -> PgWireResult<Vec<Response>> {
    let table_name = parts
        .get(2)
        .ok_or_else(|| sqlstate_error("42601", "ALTER TABLE requires a table name"))?
        .to_lowercase();
    let tenant_id = identity.tenant_id;
    // Offsets found in the uppercased copy are applied to `sql`; assumes
    // ASCII statement text so byte positions line up.
    let upper = sql.to_uppercase();
    let add_pos = upper
        .find("ADD COLUMN ")
        .map(|p| p + 11)
        .or_else(|| upper.find("ADD ").map(|p| p + 4))
        .ok_or_else(|| sqlstate_error("42601", "expected ADD [COLUMN]"))?;
    let col_def_str = sql[add_pos..].trim();
    let column = parse_origin_column_def(col_def_str).map_err(|e| sqlstate_error("42601", &e))?;
    let column_name = column.name.clone();
    // Existing rows cannot be backfilled, so NOT NULL requires a DEFAULT.
    if !column.nullable && column.default.is_none() {
        return Err(sqlstate_error(
            "42601",
            &format!(
                "ALTER ADD COLUMN '{}': non-nullable column must have a DEFAULT",
                column.name
            ),
        ));
    }
    if let Some(catalog) = state.credentials.catalog() {
        match catalog.get_collection(tenant_id.as_u32(), &table_name) {
            Ok(Some(coll)) if coll.is_active => {
                // Strict schemas are serialized into `timeseries_config`;
                // deserialize, mutate, and write back. A schema that fails to
                // parse falls through to the no-op path.
                if coll.collection_type.is_strict()
                    && let Some(config_json) = &coll.timeseries_config
                    && let Ok(mut schema) =
                        serde_json::from_str::<nodedb_types::columnar::StrictSchema>(config_json)
                {
                    if schema.columns.iter().any(|c| c.name == column.name) {
                        return Err(sqlstate_error(
                            "42P07",
                            &format!("column '{}' already exists", column.name),
                        ));
                    }
                    schema.columns.push(column);
                    // Version bump lets readers detect the schema change.
                    schema.version = schema.version.saturating_add(1);
                    let mut updated = coll;
                    updated.timeseries_config = serde_json::to_string(&schema).ok();
                    catalog
                        .put_collection(&updated)
                        .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
                }
            }
            _ => {
                return Err(sqlstate_error(
                    "42P01",
                    &format!("collection '{table_name}' does not exist"),
                ));
            }
        }
    }
    state.audit_record(
        AuditEvent::AdminAction,
        Some(tenant_id),
        &identity.username,
        &format!("ALTER TABLE '{table_name}' ADD COLUMN '{column_name}'"),
    );
    Ok(vec![Response::Execution(Tag::new("ALTER TABLE"))])
}
/// Rebuilds the whitespace-split statement tokens into a single uppercased
/// string for case-insensitive keyword matching.
fn sql_upper_from_parts(parts: &[&str]) -> String {
    parts
        .iter()
        .map(|token| token.to_uppercase())
        .collect::<Vec<_>>()
        .join(" ")
}
/// Parses the parenthesized column list of a STRICT collection definition
/// into a `StrictSchema`, synthesizing a hidden `_rowid` primary key when no
/// column declares PRIMARY KEY.
///
/// # Errors
/// Returns a message when the parentheses are missing/unbalanced, a column
/// definition fails to parse, no columns are given, or schema construction
/// rejects the columns.
pub(super) fn parse_typed_schema(
    sql: &str,
) -> Result<nodedb_types::columnar::StrictSchema, String> {
    use nodedb_types::columnar::{ColumnDef, ColumnType, StrictSchema};
    // Locate the outermost balanced parenthesized span.
    let open = sql
        .find('(')
        .ok_or("expected column definitions in parentheses")?;
    let mut close = None;
    let mut depth = 0i32;
    for (idx, byte) in sql.bytes().enumerate().skip(open) {
        if byte == b'(' {
            depth += 1;
        } else if byte == b')' {
            depth -= 1;
            if depth == 0 {
                close = Some(idx);
                break;
            }
        }
    }
    let close = close.ok_or("unmatched parenthesis")?;
    let body = &sql[open + 1..close];
    // Split on top-level commas only, so nested parens (e.g. VARCHAR(32))
    // stay inside a single column definition.
    let mut columns = Vec::new();
    let mut nesting = 0i32;
    let mut seg_start = 0usize;
    for (idx, ch) in body.char_indices() {
        match ch {
            '(' => nesting += 1,
            ')' => nesting -= 1,
            ',' if nesting == 0 => {
                let segment = body[seg_start..idx].trim();
                if !segment.is_empty() {
                    columns.push(parse_origin_column_def(segment)?);
                }
                seg_start = idx + 1;
            }
            _ => {}
        }
    }
    let tail = body[seg_start..].trim();
    if !tail.is_empty() {
        columns.push(parse_origin_column_def(tail)?);
    }
    if columns.is_empty() {
        return Err("at least one column required".into());
    }
    // Guarantee a primary key: prepend a hidden rowid when none is declared.
    if !columns.iter().any(|c| c.primary_key) {
        columns.insert(
            0,
            ColumnDef::required("_rowid", ColumnType::Int64).with_primary_key(),
        );
    }
    StrictSchema::new(columns).map_err(|e| e.to_string())
}
/// Parses one column definition of the form
/// `<name> <type> [NOT NULL] [PRIMARY KEY] [DEFAULT <expr>]`.
///
/// Fix over the previous version: `upper.find("DEFAULT ")` matched inside
/// identifiers (e.g. `mydefault INT` would pick up a bogus DEFAULT whose
/// expression was the type). The keyword is now required to start a token.
/// The `to_uppercase()` of the default tail is also hoisted out of the
/// per-keyword closure.
///
/// String literals are not lexed, so keywords embedded in a DEFAULT literal
/// (e.g. `DEFAULT 'NOT NULL'`) can still trip the `contains` checks — a
/// pre-existing limitation of this whitespace-level parser.
///
/// # Errors
/// Returns a message when the definition lacks a name/type pair or the type
/// token fails to parse.
fn parse_origin_column_def(s: &str) -> Result<nodedb_types::columnar::ColumnDef, String> {
    use nodedb_types::columnar::{ColumnDef, ColumnType};
    let upper = s.to_uppercase();
    let tokens: Vec<&str> = s.split_whitespace().collect();
    if tokens.len() < 2 {
        return Err(format!(
            "column definition requires name and type, got: '{s}'"
        ));
    }
    let name = tokens[0].to_lowercase();
    // Clause keywords that may follow the type; the earliest one ends it.
    let keywords = [" NOT ", " NULL", " PRIMARY ", " DEFAULT "];
    let type_end = keywords
        .iter()
        .filter_map(|kw| upper[name.len()..].find(kw))
        .min()
        .map(|p| p + name.len())
        .unwrap_or(s.len());
    let type_str = s[name.len()..type_end].trim();
    let column_type: ColumnType = type_str
        .parse()
        .map_err(|e: nodedb_types::columnar::ColumnTypeParseError| e.to_string())?;
    let is_not_null = upper.contains("NOT NULL");
    let is_pk = upper.contains("PRIMARY KEY");
    // PRIMARY KEY implies NOT NULL.
    let nullable = !is_not_null && !is_pk;
    // Find a DEFAULT keyword that starts a token; a bare substring search
    // would also match inside identifiers such as `mydefault`.
    // NOTE: byte offsets in `upper` are assumed to line up with `s` (ASCII
    // input); non-ASCII identifiers could shift under `to_uppercase`.
    let default = upper
        .match_indices("DEFAULT ")
        .find(|&(pos, _)| pos > 0 && upper.as_bytes()[pos - 1].is_ascii_whitespace())
        .and_then(|(pos, _)| {
            let after_default = s[pos + 8..].trim();
            let after_upper = after_default.to_uppercase();
            // The default expression runs until the next clause keyword.
            let end = keywords
                .iter()
                .filter_map(|kw| after_upper.find(kw))
                .min()
                .unwrap_or(after_default.len());
            let expr = after_default[..end].trim();
            (!expr.is_empty()).then(|| expr.to_string())
        });
    let mut col = if nullable {
        ColumnDef::nullable(name, column_type)
    } else {
        ColumnDef::required(name, column_type)
    };
    if is_pk {
        col = col.with_primary_key();
    }
    if let Some(d) = default {
        col = col.with_default(d);
    }
    Ok(col)
}