use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::security::catalog::procedure_types::StoredProcedure;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use super::super::super::super::types::{require_tenant_admin, sqlstate_error};
use super::parse::parse_create_procedure;
use super::routability::extract_routability;
pub fn create_procedure(
state: &SharedState,
identity: &AuthenticatedIdentity,
sql: &str,
) -> PgWireResult<Vec<Response>> {
require_tenant_admin(identity, "create procedures")?;
let parsed = parse_create_procedure(sql)?;
let tenant_id = identity.tenant_id.as_u64();
let catalog = state
.credentials
.catalog()
.as_ref()
.ok_or_else(|| sqlstate_error("XX000", "system catalog not available"))?;
if !parsed.or_replace
&& let Ok(Some(_)) = catalog.get_procedure(tenant_id, &parsed.name)
{
return Err(sqlstate_error(
"42723",
&format!("procedure '{}' already exists", parsed.name),
));
}
crate::control::planner::procedural::parse_block(&parsed.body_sql)
.map_err(|e| sqlstate_error("42601", &format!("procedure body parse error: {e}")))?;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|_| sqlstate_error("XX000", "system clock before UNIX epoch"))?
.as_secs();
let routability = extract_routability(&parsed.body_sql);
let stored = StoredProcedure {
tenant_id,
name: parsed.name.clone(),
parameters: parsed.parameters,
body_sql: parsed.body_sql,
max_iterations: parsed.max_iterations,
timeout_secs: parsed.timeout_secs,
routability,
owner: identity.username.clone(),
created_at: now,
descriptor_version: 0,
modification_hlc: nodedb_types::Hlc::ZERO,
};
let entry = crate::control::catalog_entry::CatalogEntry::PutProcedure(Box::new(stored.clone()));
super::super::super::catalog_propose::propose_and_apply(state, &entry)?;
emit_procedure_put(state, &stored);
state.audit_record(
crate::control::security::audit::AuditEvent::AdminAction,
Some(identity.tenant_id),
&identity.username,
&format!("CREATE PROCEDURE {}", stored.name),
);
Ok(vec![Response::Execution(Tag::new("CREATE PROCEDURE"))])
}
fn emit_procedure_put(
state: &crate::control::state::SharedState,
stored: &crate::control::security::catalog::procedure_types::StoredProcedure,
) {
use nodedb_types::sync::wire::DefinitionSyncMsg;
let lite_params: Vec<serde_json::Value> = stored
.parameters
.iter()
.map(|p| {
serde_json::json!({
"name": p.name,
"data_type": p.data_type,
"direction": p.direction.as_str(),
})
})
.collect();
let payload_json = serde_json::json!({
"name": stored.name,
"parameters": lite_params,
"body_sql": stored.body_sql,
"max_iterations": stored.max_iterations,
"timeout_secs": stored.timeout_secs,
"owner": stored.owner,
"created_at": stored.created_at,
});
match sonic_rs::to_vec(&payload_json) {
Ok(payload) => {
let msg = DefinitionSyncMsg {
definition_type: "procedure".into(),
name: stored.name.clone(),
action: "put".into(),
payload,
};
state.definition_sync_fanout.broadcast(&msg);
}
Err(e) => {
tracing::warn!(
name = %stored.name,
error = %e,
"definition_sync: failed to serialize procedure payload; skipping broadcast"
);
}
}
}