use std::sync::{Arc, Mutex};
use nodedb_cluster::{DescriptorId, DescriptorKind};
use nodedb_sql::{
SqlCatalog, SqlCatalogError,
types::{ArrayCatalogView, CollectionInfo, ColumnInfo, EngineType, SqlDataType},
};
use crate::control::planner::descriptor_set::DescriptorVersionSet;
use crate::control::security::credential::CredentialStore;
use crate::control::state::SharedState;
use crate::types::DatabaseId;
/// SQL catalog adapter that answers planner lookups from the credential
/// store's catalog and, optionally, an array catalog.
///
/// Collection lookups are made with the tenant and database fixed at
/// construction time.
pub struct OriginCatalog {
/// Credential store whose `catalog()` handle is queried for collections.
credentials: Arc<CredentialStore>,
/// Tenant used for collection lookups, retention-policy lookups and
/// collection descriptor ids.
tenant_id: u64,
/// Database passed to the underlying catalog on collection lookups.
database_id: DatabaseId,
/// Optional retention-policy registry; when absent, `has_auto_tier`
/// reports `false` for every collection.
retention_policy_registry:
Option<Arc<crate::engine::timeseries::retention_policy::RetentionPolicyRegistry>>,
/// Optional array catalog handle; when absent, `lookup_array` returns `None`.
array_catalog: Option<crate::control::array_catalog::ArrayCatalogHandle>,
/// Optional drain tracker; when present, `get_collection` fails with a
/// retryable error for a descriptor version it reports as draining.
drain_tracker: Option<Arc<crate::control::lease::DescriptorDrainTracker>>,
/// Descriptor versions recorded by `get_collection`; handed out (and reset)
/// by `take_recorded_versions`.
recorded_versions: Mutex<DescriptorVersionSet>,
}
impl OriginCatalog {
    /// Builds a catalog with neither a drain tracker nor an array catalog
    /// attached, and a fresh `DescriptorVersionSet` for recorded versions.
    pub fn new(
        credentials: Arc<CredentialStore>,
        tenant_id: u64,
        database_id: DatabaseId,
        retention_policy_registry: Option<
            Arc<crate::engine::timeseries::retention_policy::RetentionPolicyRegistry>,
        >,
    ) -> Self {
        let recorded_versions = Mutex::new(DescriptorVersionSet::new());
        Self {
            credentials,
            tenant_id,
            database_id,
            retention_policy_registry,
            array_catalog: None,
            drain_tracker: None,
            recorded_versions,
        }
    }

    /// Builds a catalog wired to `shared`: it uses the shared credential
    /// store and additionally attaches the shared lease drain tracker and
    /// the shared array catalog handle.
    pub fn new_with_lease(
        shared: &Arc<SharedState>,
        tenant_id: u64,
        database_id: DatabaseId,
        retention_policy_registry: Option<
            Arc<crate::engine::timeseries::retention_policy::RetentionPolicyRegistry>,
        >,
    ) -> Self {
        // Start from the plain constructor, then attach the two optional
        // handles that only the shared state can provide.
        let mut catalog = Self::new(
            Arc::clone(&shared.credentials),
            tenant_id,
            database_id,
            retention_policy_registry,
        );
        catalog.drain_tracker = Some(Arc::clone(&shared.lease_drain));
        catalog.array_catalog = Some(shared.array_catalog.clone());
        catalog
    }

    /// Returns the descriptor versions recorded so far and leaves the
    /// `Default` value of `DescriptorVersionSet` in their place.
    ///
    /// A poisoned mutex is recovered instead of being propagated.
    pub fn take_recorded_versions(&self) -> DescriptorVersionSet {
        let mut versions = match self.recorded_versions.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        std::mem::take(&mut *versions)
    }

    /// Reports whether the retention-policy registry holds a policy for
    /// `(tenant_id, collection)` whose `auto_tier` flag is set.
    ///
    /// Always `false` when no registry was supplied.
    fn has_auto_tier(&self, collection: &str) -> bool {
        self.retention_policy_registry
            .as_ref()
            .is_some_and(|registry| {
                registry
                    .get(self.tenant_id, collection)
                    .is_some_and(|policy| policy.auto_tier)
            })
    }
}
impl SqlCatalog for OriginCatalog {
fn get_collection(
&self,
_database_id: nodedb_types::DatabaseId,
name: &str,
) -> std::result::Result<Option<CollectionInfo>, SqlCatalogError> {
let catalog_ref = self.credentials.catalog();
let Some(catalog) = catalog_ref.as_ref() else {
return Ok(None);
};
let Some(stored) = catalog
.get_collection(self.database_id, self.tenant_id, name)
.ok()
.flatten()
else {
return Ok(None);
};
if !stored.is_active {
let retention = crate::config::server::RetentionSettings::default()
.retention_window()
.as_nanos() as u64;
let retention_expires_at_ns = stored.modification_hlc.wall_ns.saturating_add(retention);
return Err(SqlCatalogError::CollectionDeactivated {
name: name.to_string(),
retention_expires_at_ns,
});
}
let descriptor_id = DescriptorId::new(
self.tenant_id,
DescriptorKind::Collection,
stored.name.clone(),
);
let version = stored.descriptor_version.max(1);
{
let mut guard = self
.recorded_versions
.lock()
.unwrap_or_else(|p| p.into_inner());
guard.record(descriptor_id.clone(), version);
}
if let Some(drain) = &self.drain_tracker {
let now_wall_ns = crate::control::lease::wall_now_ns();
if drain.is_draining(&descriptor_id, version, now_wall_ns) {
return Err(SqlCatalogError::RetryableSchemaChanged {
descriptor: format!("collection {name}"),
});
}
}
let (engine, columns, primary_key) = convert_collection_type(&stored);
let auto_tier = self.has_auto_tier(name);
let indexes = stored
.indexes
.iter()
.map(|i| nodedb_sql::types::IndexSpec {
name: i.name.clone(),
field: i.field.clone(),
unique: i.unique,
case_insensitive: i.case_insensitive,
state: match i.state {
crate::control::security::catalog::IndexBuildState::Building => {
nodedb_sql::types::IndexState::Building
}
crate::control::security::catalog::IndexBuildState::Ready => {
nodedb_sql::types::IndexState::Ready
}
},
predicate: i.predicate.clone(),
})
.collect();
Ok(Some(CollectionInfo {
name: stored.name,
engine,
columns,
primary_key,
has_auto_tier: auto_tier,
indexes,
bitemporal: stored.bitemporal,
primary: stored.primary,
vector_primary: stored.vector_primary,
}))
}
fn lookup_array(&self, name: &str) -> Option<ArrayCatalogView> {
use nodedb_array::schema::{ArraySchema, AttrType as EAT, DimType as EDT};
use nodedb_array::types::domain::DomainBound;
use nodedb_sql::types_array::{
ArrayAttrAst, ArrayAttrType, ArrayDimAst, ArrayDimType, ArrayDomainBound,
};
let handle = self.array_catalog.as_ref()?;
let entry = {
let cat = handle.read().ok()?;
cat.lookup_by_name(name)?
};
let schema: ArraySchema = zerompk::from_msgpack(&entry.schema_msgpack).ok()?;
let dims = schema
.dims
.iter()
.map(|d| ArrayDimAst {
name: d.name.clone(),
dtype: match d.dtype {
EDT::Int64 => ArrayDimType::Int64,
EDT::Float64 => ArrayDimType::Float64,
EDT::TimestampMs => ArrayDimType::TimestampMs,
EDT::String => ArrayDimType::String,
},
lo: bound_engine_to_ast(&d.domain.lo),
hi: bound_engine_to_ast(&d.domain.hi),
})
.collect();
let attrs = schema
.attrs
.iter()
.map(|a| ArrayAttrAst {
name: a.name.clone(),
dtype: match a.dtype {
EAT::Int64 => ArrayAttrType::Int64,
EAT::Float64 => ArrayAttrType::Float64,
EAT::String => ArrayAttrType::String,
EAT::Bytes => ArrayAttrType::Bytes,
},
nullable: a.nullable,
})
.collect();
let tile_extents = schema.tile_extents.iter().map(|n| *n as i64).collect();
fn bound_engine_to_ast(b: &DomainBound) -> ArrayDomainBound {
match b {
DomainBound::Int64(v) => ArrayDomainBound::Int64(*v),
DomainBound::Float64(v) => ArrayDomainBound::Float64(*v),
DomainBound::TimestampMs(v) => ArrayDomainBound::TimestampMs(*v),
DomainBound::String(v) => ArrayDomainBound::String(v.clone()),
}
}
Some(ArrayCatalogView {
name: schema.name,
dims,
attrs,
tile_extents,
})
}
}
/// Translates a stored collection into the `(engine, columns, primary key)`
/// triple exposed to the SQL planner.
///
/// * Strict documents and key-value collections take their columns from the
///   declared schema; the primary key is the first column flagged as such.
///   Key-value collections fall back to `"key"` when no column is flagged.
/// * Schemaless documents get a non-null string `id` primary-key column
///   followed by one nullable column per entry of `stored.fields`.
/// * Columnar collections are reported as timeseries, spatial or plain
///   columnar depending on the profile. Every profile except timeseries gets
///   an `id` primary-key column (default `UUID_V7`) ahead of the columns
///   derived from `stored.fields`; timeseries has no primary key.
fn convert_collection_type(
    stored: &crate::control::security::catalog::StoredCollection,
) -> (EngineType, Vec<ColumnInfo>, Option<String>) {
    use nodedb_types::CollectionType;
    use nodedb_types::columnar::DocumentMode;

    match &stored.collection_type {
        CollectionType::Document(DocumentMode::Strict(schema)) => {
            let primary_key = schema
                .columns
                .iter()
                .find(|col| col.primary_key)
                .map(|col| col.name.clone());
            let columns = schema
                .columns
                .iter()
                .map(|col| ColumnInfo {
                    name: col.name.clone(),
                    data_type: convert_column_type(&col.column_type),
                    nullable: col.nullable,
                    is_primary_key: col.primary_key,
                    default: col.default.clone(),
                })
                .collect();
            (EngineType::DocumentStrict, columns, primary_key)
        }
        CollectionType::Document(DocumentMode::Schemaless) => {
            let id_column = ColumnInfo {
                name: "id".into(),
                data_type: SqlDataType::String,
                nullable: false,
                is_primary_key: true,
                default: None,
            };
            let field_columns = stored.fields.iter().map(|(name, type_str)| ColumnInfo {
                name: name.clone(),
                data_type: parse_type_str(type_str),
                nullable: true,
                is_primary_key: false,
                default: None,
            });
            let columns = std::iter::once(id_column).chain(field_columns).collect();
            (
                EngineType::DocumentSchemaless,
                columns,
                Some(String::from("id")),
            )
        }
        CollectionType::KeyValue(config) => {
            let schema_columns = &config.schema.columns;
            let primary_key = schema_columns
                .iter()
                .find(|col| col.primary_key)
                .map_or_else(|| String::from("key"), |col| col.name.clone());
            let columns = schema_columns
                .iter()
                .map(|col| ColumnInfo {
                    name: col.name.clone(),
                    data_type: convert_column_type(&col.column_type),
                    nullable: col.nullable,
                    is_primary_key: col.primary_key,
                    default: col.default.clone(),
                })
                .collect();
            (EngineType::KeyValue, columns, Some(primary_key))
        }
        CollectionType::Columnar(profile) => {
            let is_timeseries = profile.is_timeseries();
            let engine = if is_timeseries {
                EngineType::Timeseries
            } else if profile.is_spatial() {
                EngineType::Spatial
            } else {
                EngineType::Columnar
            };

            // Only non-timeseries profiles carry the `id` column.
            let id_column = (!is_timeseries).then(|| ColumnInfo {
                name: "id".into(),
                data_type: SqlDataType::String,
                nullable: false,
                is_primary_key: true,
                default: Some("UUID_V7".into()),
            });
            let field_columns = stored.fields.iter().map(|(name, type_str)| ColumnInfo {
                name: name.clone(),
                data_type: parse_type_str(type_str),
                nullable: true,
                is_primary_key: false,
                default: None,
            });
            let columns = id_column.into_iter().chain(field_columns).collect();

            let primary_key = (!is_timeseries).then(|| String::from("id"));
            (engine, columns, primary_key)
        }
    }
}
/// Maps a columnar storage type onto the SQL planner's data type.
///
/// Arms are grouped by the resulting SQL type. Any variant not listed
/// explicitly falls through to `SqlDataType::Bytes`.
fn convert_column_type(ct: &nodedb_types::columnar::ColumnType) -> SqlDataType {
    use nodedb_types::columnar::ColumnType;

    match ct {
        ColumnType::Int64 | ColumnType::Duration => SqlDataType::Int64,
        ColumnType::Float64 => SqlDataType::Float64,
        ColumnType::Bool => SqlDataType::Bool,
        ColumnType::String | ColumnType::Uuid | ColumnType::Ulid | ColumnType::Regex => {
            SqlDataType::String
        }
        ColumnType::Timestamp | ColumnType::SystemTimestamp => SqlDataType::Timestamp,
        ColumnType::Timestamptz => SqlDataType::Timestamptz,
        ColumnType::Decimal { .. } => SqlDataType::Decimal,
        ColumnType::Vector(dim) => SqlDataType::Vector(*dim as usize),
        ColumnType::Bytes
        | ColumnType::Geometry
        | ColumnType::Json
        | ColumnType::Array
        | ColumnType::Set
        | ColumnType::Range
        | ColumnType::Record => SqlDataType::Bytes,
        _ => SqlDataType::Bytes,
    }
}
/// Maps a stored field type name onto the SQL planner's data type.
///
/// Matching is case-insensitive. Any name starting with `DECIMAL` or
/// `NUMERIC` (for example `DECIMAL(10,2)`) maps to `SqlDataType::Decimal`.
/// Names that are not recognized map to `SqlDataType::String`.
fn parse_type_str(s: &str) -> SqlDataType {
    let upper = s.to_uppercase();
    // Prefix match so parameterized forms such as `NUMERIC(12,4)` are accepted.
    if upper.starts_with("DECIMAL") || upper.starts_with("NUMERIC") {
        return SqlDataType::Decimal;
    }
    match upper.as_str() {
        "INT" | "INTEGER" | "INT4" | "INT8" | "BIGINT" => SqlDataType::Int64,
        "FLOAT" | "FLOAT4" | "FLOAT8" | "FLOAT64" | "DOUBLE" | "REAL" => SqlDataType::Float64,
        "BOOL" | "BOOLEAN" => SqlDataType::Bool,
        "BYTES" | "BYTEA" | "BLOB" => SqlDataType::Bytes,
        "TIMESTAMP" => SqlDataType::Timestamp,
        // Kept distinct from TIMESTAMP, matching how `convert_column_type`
        // maps `ColumnType::Timestamptz`.
        "TIMESTAMPTZ" => SqlDataType::Timestamptz,
        _ => SqlDataType::String,
    }
}