use crate::control::security::catalog::StoredCollection;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use crate::types::TraceId;
use nodedb_types::DatabaseId;
use super::enforcement::{build_generated_column_specs, find_materialized_sum_bindings};
/// Looks up the collection named by `parts[2]` and, when the catalog knows it,
/// broadcasts its registration.
///
/// The name token is lower-cased before the lookup (a missing token yields the
/// empty string), and the lookup always targets `DatabaseId::DEFAULT` for the
/// caller's tenant. Auto-indexes are derived from the field list that
/// `parse_fields_clause` extracts from `parts` — not from the stored
/// collection's own fields — and are then merged with the catalog's index
/// entries.
///
/// Returns `Ok(())` without registering anything when no catalog is
/// available, when the lookup fails, or when the collection is not found.
/// Otherwise the result is whatever `dispatch_register_from_stored_inner`
/// returns.
///
/// `sql` is accepted but not consulted.
pub async fn dispatch_register_if_needed(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    parts: &[&str],
    sql: &str,
) -> crate::Result<()> {
    // Explicitly discard the statement text; only the tokens are used.
    let _ = sql;

    let collection_name = match parts.get(2) {
        Some(token) => token.to_lowercase(),
        None => String::new(),
    };
    let tenant_id = identity.tenant_id;

    let catalog = match state.credentials.catalog() {
        Some(catalog) => catalog,
        None => return Ok(()),
    };
    // Lookup errors and unknown collections are both treated as "nothing to do".
    let stored = match catalog.get_collection(
        DatabaseId::DEFAULT,
        tenant_id.as_u64(),
        &collection_name,
    ) {
        Ok(Some(stored)) => stored,
        Ok(None) | Err(_) => return Ok(()),
    };

    let (declared_fields, _serial_fields) =
        super::super::super::schema_validation::parse_fields_clause(parts);
    let mut indexes = derive_auto_indexes(
        declared_fields
            .iter()
            .map(|(field_name, _)| field_name.as_str()),
    );
    extend_with_catalog_indexes(&mut indexes, &stored);

    dispatch_register_from_stored_inner(state, tenant_id, &stored, indexes).await
}
/// Registers the collection `name` of `database_id` for the caller's tenant,
/// using the definition stored in the catalog.
///
/// `name` is passed to the catalog lookup as given (no case folding happens
/// here). Auto-indexes are derived from the stored collection's fields and
/// merged with the catalog's index entries.
///
/// Returns `Ok(())` without registering anything when no catalog is
/// available, when the lookup fails, or when the collection is not found.
/// Otherwise the result is whatever `dispatch_register_from_stored_inner`
/// returns.
pub async fn dispatch_register_by_name(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    name: &str,
    database_id: DatabaseId,
) -> crate::Result<()> {
    let tenant_id = identity.tenant_id;

    let catalog = match state.credentials.catalog() {
        Some(catalog) => catalog,
        None => return Ok(()),
    };
    // Lookup errors and unknown collections are both treated as "nothing to do".
    let stored = match catalog.get_collection(database_id, tenant_id.as_u64(), name) {
        Ok(Some(stored)) => stored,
        Ok(None) | Err(_) => return Ok(()),
    };

    let mut indexes = derive_auto_indexes(
        stored
            .fields
            .iter()
            .map(|(field_name, _)| field_name.as_str()),
    );
    extend_with_catalog_indexes(&mut indexes, &stored);

    dispatch_register_from_stored_inner(state, tenant_id, &stored, indexes).await
}
/// Registers an already-loaded catalog entry.
///
/// The tenant is taken from `coll.tenant_id` rather than from a caller
/// identity. Auto-indexes are derived from `coll.fields` and merged with the
/// collection's catalog index entries before the registration is dispatched
/// through `dispatch_register_from_stored_inner`.
pub async fn dispatch_register_from_stored(
    state: &SharedState,
    coll: &StoredCollection,
) -> crate::Result<()> {
    let mut indexes = derive_auto_indexes(
        coll.fields
            .iter()
            .map(|(field_name, _)| field_name.as_str()),
    );
    extend_with_catalog_indexes(&mut indexes, coll);

    let tenant_id = crate::types::TenantId::new(coll.tenant_id);
    dispatch_register_from_stored_inner(state, tenant_id, coll, indexes).await
}
/// Builds one default index entry per field name, in input order.
///
/// Each entry is named after the field, points at the path `$.<field>`, is
/// non-unique, case-sensitive, has no predicate, and is marked `Ready`.
fn derive_auto_indexes<'a>(
    field_names: impl IntoIterator<Item = &'a str>,
) -> Vec<nodedb_physical::physical_plan::RegisteredIndex> {
    let names = field_names.into_iter();
    let mut auto_indexes = Vec::with_capacity(names.size_hint().0);

    for field in names {
        auto_indexes.push(nodedb_physical::physical_plan::RegisteredIndex {
            name: field.to_owned(),
            path: format!("$.{field}"),
            unique: false,
            case_insensitive: false,
            state: nodedb_physical::physical_plan::RegisteredIndexState::Ready,
            predicate: None,
        });
    }

    auto_indexes
}
/// Merges the collection's catalog index entries into `out`.
///
/// Every catalog entry is converted to a `RegisteredIndex` (its build state
/// mapped one-to-one). If `out` already holds an entry with the same `path`,
/// the first such entry is overwritten in place; otherwise the converted entry
/// is appended.
///
/// NOTE(review): the path comparison is an exact string match against
/// `idx.field`. Auto-derived entries use the form `$.<field>`, so a catalog
/// entry only replaces one if the catalog stores the same path form —
/// confirm against how `idx.field` is written.
fn extend_with_catalog_indexes(
    out: &mut Vec<nodedb_physical::physical_plan::RegisteredIndex>,
    coll: &StoredCollection,
) {
    for catalog_index in &coll.indexes {
        let build_state = match catalog_index.state {
            crate::control::security::catalog::IndexBuildState::Building => {
                nodedb_physical::physical_plan::RegisteredIndexState::Building
            }
            crate::control::security::catalog::IndexBuildState::Ready => {
                nodedb_physical::physical_plan::RegisteredIndexState::Ready
            }
        };
        let replacement = nodedb_physical::physical_plan::RegisteredIndex {
            name: catalog_index.name.clone(),
            path: catalog_index.field.clone(),
            unique: catalog_index.unique,
            case_insensitive: catalog_index.case_insensitive,
            state: build_state,
            predicate: catalog_index.predicate.clone(),
        };

        // The catalog definition wins over an existing entry on the same path.
        match out.iter().position(|entry| entry.path == replacement.path) {
            Some(slot) => out[slot] = replacement,
            None => out.push(replacement),
        }
    }
}
/// Builds the `DocumentOp::Register` plan for `coll` and broadcasts it to all
/// cores on behalf of `tenant_id`.
///
/// * The collection is registered under its database-qualified name
///   (`db_qualified(coll.database_id, &coll.name)`).
/// * Strict document collections and key-value collections are registered in
///   `Strict` storage mode with their schema; schemaless document collections
///   and columnar collections are registered as `Schemaless`.
/// * Enforcement options are copied from the stored collection; materialized
///   sum bindings and generated-column specs come from the sibling
///   `enforcement` helpers.
/// * CRDT is always registered as disabled, and the broadcast carries
///   `TraceId::ZERO`.
///
/// Returns `Ok(())` without broadcasting when no catalog is available;
/// otherwise returns the result of the broadcast.
async fn dispatch_register_from_stored_inner(
    state: &SharedState,
    tenant_id: crate::types::TenantId,
    coll: &StoredCollection,
    indexes: Vec<nodedb_physical::physical_plan::RegisteredIndex>,
) -> crate::Result<()> {
    let qualified_name = crate::control::planner::sql_plan_convert::convert::db_qualified(
        coll.database_id,
        &coll.name,
    );
    // The catalog is needed below to resolve materialized-sum bindings.
    let catalog = match state.credentials.catalog() {
        Some(catalog) => catalog,
        None => return Ok(()),
    };

    let storage_mode = {
        // First decide whether the collection carries a strict schema at all…
        let strict_schema = match &coll.collection_type {
            nodedb_types::CollectionType::Document(nodedb_types::DocumentMode::Strict(
                schema,
            )) => Some(schema),
            nodedb_types::CollectionType::KeyValue(config) => Some(&config.schema),
            nodedb_types::CollectionType::Document(nodedb_types::DocumentMode::Schemaless)
            | nodedb_types::CollectionType::Columnar(_) => None,
        };
        // …then translate that into the physical storage mode.
        match strict_schema {
            Some(schema) => nodedb_physical::physical_plan::StorageMode::Strict {
                schema: schema.clone(),
            },
            None => nodedb_physical::physical_plan::StorageMode::Schemaless,
        }
    };

    let balanced = coll.balanced.as_ref().map(|def| {
        nodedb_physical::physical_plan::BalancedDef {
            group_key_column: def.group_key_column.clone(),
            entry_type_column: def.entry_type_column.clone(),
            debit_value: def.debit_value.clone(),
            credit_value: def.credit_value.clone(),
            amount_column: def.amount_column.clone(),
        }
    });
    let period_lock = coll.period_lock.as_ref().map(|lock| {
        nodedb_physical::physical_plan::PeriodLockConfig {
            period_column: lock.period_column.clone(),
            ref_table: lock.ref_table.clone(),
            ref_pk: lock.ref_pk.clone(),
            status_column: lock.status_column.clone(),
            allowed_statuses: lock.allowed_statuses.clone(),
        }
    });
    // NOTE(review): a retention period that fails to parse is discarded here,
    // so the collection is registered with no retention at all — confirm that
    // silently dropping it is intended.
    let retention = match coll.retention_period.as_ref() {
        Some(raw) => {
            crate::data::executor::enforcement::retention::parse_retention_period(raw).ok()
        }
        None => None,
    };

    let enforcement = nodedb_physical::physical_plan::EnforcementOptions {
        append_only: coll.append_only,
        hash_chain: coll.hash_chain,
        balanced,
        period_lock,
        retention,
        has_legal_hold: !coll.legal_holds.is_empty(),
        state_constraints: coll.state_constraints.clone(),
        transition_checks: coll.transition_checks.clone(),
        materialized_sum_sources: find_materialized_sum_bindings(
            catalog,
            tenant_id.as_u64(),
            &qualified_name,
        ),
        generated_columns: build_generated_column_specs(coll),
    };

    let register_op = nodedb_physical::physical_plan::DocumentOp::Register {
        collection: qualified_name.clone(),
        indexes,
        crdt_enabled: false,
        storage_mode,
        enforcement: Box::new(enforcement),
        bitemporal: coll.bitemporal,
    };
    let plan = crate::bridge::envelope::PhysicalPlan::Document(register_op);

    crate::control::server::broadcast::broadcast_register_to_all_cores(
        state,
        tenant_id,
        plan,
        TraceId::ZERO,
    )
    .await
}