use nodedb_types::DatabaseId;
use pgwire::error::PgWireResult;
use crate::control::security::catalog::StoredCollection;
use crate::control::state::SharedState;
use super::super::super::super::types::sqlstate_error;
/// Looks up an active strict collection and decodes its stored schema.
///
/// `operation` is only used to build the error message returned when the
/// collection exists but is not a strict collection.
///
/// # Errors
///
/// * `XX000` — no catalog is available, the catalog lookup itself failed, or
///   the serialized schema in `timeseries_config` is absent or cannot be parsed.
/// * `42P01` — no collection named `name` exists for the tenant, or it is not
///   active.
/// * `0A000` — the collection is not a strict collection.
pub fn load_strict_collection(
    state: &SharedState,
    tenant_id: u64,
    name: &str,
    operation: &str,
) -> PgWireResult<(StoredCollection, nodedb_types::columnar::StrictSchema)> {
    let catalog = match state.credentials.catalog() {
        Some(catalog) => catalog,
        None => return Err(sqlstate_error("XX000", "no catalog available")),
    };

    let lookup = catalog
        .get_collection(DatabaseId::DEFAULT, tenant_id, name)
        .map_err(|err| sqlstate_error("XX000", &err.to_string()))?;

    // An inactive collection is reported exactly like a missing one.
    let coll = match lookup {
        Some(found) if found.is_active => found,
        _ => {
            return Err(sqlstate_error(
                "42P01",
                &format!("collection '{name}' does not exist"),
            ));
        }
    };

    if !coll.collection_type.is_strict() {
        let message = format!("{operation} is only supported on strict document collections");
        return Err(sqlstate_error("0A000", &message));
    }

    // A missing config and an unparsable config collapse into the same error.
    let parsed: Option<nodedb_types::columnar::StrictSchema> =
        match coll.timeseries_config.as_deref() {
            Some(raw) => sonic_rs::from_str(raw).ok(),
            None => None,
        };
    let Some(schema) = parsed else {
        return Err(sqlstate_error("XX000", "strict schema missing or malformed"));
    };

    Ok((coll, schema))
}
/// Stores `schema` on `coll`, updating both representations kept on the record:
/// the typed `collection_type` and the serialized copy in `timeseries_config`.
///
/// The schema is serialized first, while it can still be borrowed, and is then
/// moved into the collection type, so no clone of the schema is needed.
///
/// If serialization fails, `timeseries_config` is set to `None`; the failure is
/// not reported to the caller.
pub fn write_schema_back(
    coll: &mut StoredCollection,
    schema: nodedb_types::columnar::StrictSchema,
) {
    let serialized = sonic_rs::to_string(&schema).ok();
    coll.collection_type = nodedb_types::CollectionType::strict(schema);
    coll.timeseries_config = serialized;
}
/// Persists an updated collection definition and signals the schema change.
///
/// Steps, in order:
/// 1. wrap a clone of `updated` in a `PutCollection` catalog entry and pass it
///    to `propose_and_apply`;
/// 2. call `dispatch_register_from_stored` for the updated collection;
/// 3. bump `state.schema_version`.
///
/// # Errors
///
/// Returns the error from `propose_and_apply` unchanged, or an `XX000` error
/// carrying the message of a failed registration. The schema version is not
/// bumped when either step fails.
pub async fn persist_schema_change(
    state: &SharedState,
    updated: &StoredCollection,
) -> PgWireResult<()> {
    use crate::control::catalog_entry::CatalogEntry;

    let entry = CatalogEntry::PutCollection(Box::new(updated.clone()));
    super::super::super::catalog_propose::propose_and_apply(state, &entry)?;

    let registration = super::super::create::dispatch_register_from_stored(state, updated).await;
    if let Err(err) = registration {
        return Err(sqlstate_error("XX000", &err.to_string()));
    }

    state.schema_version.bump();
    Ok(())
}