use nodedb_types::DatabaseId;
use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use super::insert_parse::{
fire_before_triggers, fire_instead_triggers, fire_sync_after_triggers,
fire_sync_after_update_triggers, parse_write_statement,
};
pub async fn upsert_document(
state: &SharedState,
identity: &AuthenticatedIdentity,
database_id: DatabaseId,
sql: &str,
) -> Option<PgWireResult<Vec<Response>>> {
let parsed = match parse_write_statement(state, identity, sql, "UPSERT INTO ")? {
Ok(p) => p,
Err(e) => return Some(Err(e)),
};
let tenant_id = identity.tenant_id;
if let Some(result) = fire_instead_triggers(
state,
identity,
tenant_id,
&parsed.coll_name,
&parsed.fields,
"UPSERT",
)
.await
{
return Some(result);
}
let mut fields = match fire_before_triggers(
state,
identity,
tenant_id,
&parsed.coll_name,
&parsed.fields,
)
.await
{
Ok(f) => f,
Err(e) => return Some(e),
};
if let Some(catalog) = state.credentials.catalog()
&& let Ok(Some(coll_def)) =
catalog.get_collection(database_id, tenant_id.as_u64(), &parsed.coll_name)
{
if !coll_def.type_guards.is_empty()
&& let Err(violation) =
crate::data::executor::enforcement::typeguard::inject_and_validate(
&parsed.coll_name,
&coll_def.type_guards,
&mut fields,
)
{
use crate::control::server::pgwire::types::error_code_to_sqlstate;
let (severity, code, message) = error_code_to_sqlstate(&violation);
return Some(Err(pgwire::error::PgWireError::UserError(Box::new(
pgwire::error::ErrorInfo::new(severity.to_owned(), code.to_owned(), message),
))));
}
if !coll_def.check_constraints.is_empty()
&& let Err(e) = super::check_constraint::enforce_check_constraints(
state,
tenant_id,
&coll_def.check_constraints,
&fields,
)
.await
{
return Some(Err(e));
}
}
if let Some(catalog) = state.credentials.catalog()
&& let Ok(Some(coll_def)) =
catalog.get_collection(database_id, tenant_id.as_u64(), &parsed.coll_name)
{
for (field_name, type_name) in &coll_def.fields {
if let Some(value) = fields.get(field_name.as_str()) {
let label = match value {
nodedb_types::Value::String(s) => s.as_str(),
_ => continue,
};
if let Err(msg) = state.custom_type_registry.validate_enum_label(
tenant_id.as_u64(),
type_name,
label,
) {
use crate::control::server::pgwire::types::sqlstate_error;
return Some(Err(sqlstate_error("22P02", &msg)));
}
}
}
}
let pk_for_probe = fields
.get("id")
.or_else(|| fields.get("document_id"))
.or_else(|| fields.get("key"))
.map(|v| match v {
nodedb_types::Value::String(s) => s.clone(),
nodedb_types::Value::Integer(i) => i.to_string(),
other => format!("{other:?}"),
});
let old_fields = if let Some(ref pk) = pk_for_probe {
let row = crate::control::trigger::dml_hook::fetch_old_row(
state,
tenant_id,
&parsed.coll_name,
pk,
)
.await;
if row.is_empty() { None } else { Some(row) }
} else {
None
};
let upsert_sql = super::insert_parse::fields_to_upsert_sql(&parsed.coll_name, &fields);
if let Err(e) =
super::insert_parse::plan_and_dispatch(state, identity, tenant_id, database_id, &upsert_sql)
.await
{
return Some(Err(e));
}
if let Some(ref old) = old_fields {
if let Some(err) = fire_sync_after_update_triggers(
state,
identity,
tenant_id,
&parsed.coll_name,
old,
&fields,
)
.await
{
return Some(err);
}
} else if let Some(err) =
fire_sync_after_triggers(state, identity, tenant_id, &parsed.coll_name, &fields).await
{
return Some(err);
}
Some(Ok(vec![Response::Execution(Tag::new("UPSERT"))]))
}