use reifydb_core::{
interface::catalog::policy::{Policy, PolicyOperation, PolicyToCreate},
key::{policy::PolicyKey, policy_op::PolicyOpKey},
};
use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
use reifydb_type::fragment::Fragment;
use crate::{
CatalogStore, Result,
error::{CatalogError, CatalogObjectKind},
store::{
policy::shape::{
policy::{ENABLED, ID, NAME, SHAPE, TARGET_NAMESPACE, TARGET_SHAPE, TARGET_TYPE},
policy_op,
},
sequence::system::SystemSequence,
},
};
impl CatalogStore {
pub(crate) fn create_policy(
txn: &mut AdminTransaction,
to_create: PolicyToCreate,
) -> Result<(Policy, Vec<PolicyOperation>)> {
if let Some(ref name) = to_create.name
&& (Self::find_policy_by_name(&mut Transaction::Admin(&mut *txn), name)?).is_some()
{
return Err(CatalogError::AlreadyExists {
kind: CatalogObjectKind::Policy,
namespace: "system".to_string(),
name: name.clone(),
fragment: Fragment::None,
}
.into());
}
for op in &to_create.operations {
if !to_create.target_type.is_valid_operation(&op.operation) {
return Err(CatalogError::PolicyInvalidOperation {
target_type: to_create.target_type.as_str(),
operation: op.operation.clone(),
valid: to_create.target_type.valid_operation_names(),
policy_name: to_create.name.clone(),
}
.into());
}
}
let policy_id = SystemSequence::next_policy_id(txn)?;
let mut row = SHAPE.allocate();
SHAPE.set_u64(&mut row, ID, policy_id);
SHAPE.set_utf8(&mut row, NAME, to_create.name.as_deref().unwrap_or(""));
SHAPE.set_utf8(&mut row, TARGET_TYPE, to_create.target_type.as_str());
SHAPE.set_utf8(&mut row, TARGET_NAMESPACE, to_create.target_namespace.as_deref().unwrap_or(""));
SHAPE.set_utf8(&mut row, TARGET_SHAPE, to_create.target_shape.as_deref().unwrap_or(""));
SHAPE.set_bool(&mut row, ENABLED, true);
txn.set(&PolicyKey::encoded(policy_id), row)?;
let mut ops = Vec::new();
for (i, op) in to_create.operations.iter().enumerate() {
let mut op_row = policy_op::SHAPE.allocate();
policy_op::SHAPE.set_u64(&mut op_row, policy_op::POLICY_ID, policy_id);
policy_op::SHAPE.set_utf8(&mut op_row, policy_op::OPERATION, &op.operation);
policy_op::SHAPE.set_utf8(&mut op_row, policy_op::BODY_SOURCE, &op.body_source);
txn.set(&PolicyOpKey::encoded(policy_id, i as u64), op_row)?;
ops.push(PolicyOperation {
policy_id,
operation: op.operation.clone(),
body_source: op.body_source.clone(),
});
}
let def = Policy {
id: policy_id,
name: to_create.name,
target_type: to_create.target_type,
target_namespace: to_create.target_namespace,
target_shape: to_create.target_shape,
enabled: true,
};
Ok((def, ops))
}
}
#[cfg(test)]
mod tests {
use reifydb_core::interface::catalog::policy::{PolicyOpToCreate, PolicyTargetType, PolicyToCreate};
use reifydb_engine::test_harness::create_test_admin_transaction;
use crate::CatalogStore;
#[test]
fn test_create_policy() {
let mut txn = create_test_admin_transaction();
let to_create = PolicyToCreate {
name: Some("insert_gate".to_string()),
target_type: PolicyTargetType::Table,
target_namespace: None,
target_shape: None,
operations: vec![PolicyOpToCreate {
operation: "insert".to_string(),
body_source: "require { true }".to_string(),
}],
};
let (def, ops) = CatalogStore::create_policy(&mut txn, to_create).unwrap();
assert_eq!(def.name, Some("insert_gate".to_string()));
assert_eq!(def.target_type, PolicyTargetType::Table);
assert!(def.enabled);
assert_eq!(ops.len(), 1);
assert_eq!(ops[0].operation, "insert");
}
#[test]
fn test_create_policy_rejects_typo_on_crud_operation() {
let mut txn = create_test_admin_transaction();
let err = CatalogStore::create_policy(
&mut txn,
PolicyToCreate {
name: Some("bad_table".to_string()),
target_type: PolicyTargetType::Table,
target_namespace: None,
target_shape: None,
operations: vec![PolicyOpToCreate {
operation: "select".to_string(),
body_source: "filter { true }".to_string(),
}],
},
)
.unwrap_err();
assert_eq!(err.diagnostic().code, "CA_086");
}
#[test]
fn test_create_policy_rejects_unknown_session_operation() {
let mut txn = create_test_admin_transaction();
let err = CatalogStore::create_policy(
&mut txn,
PolicyToCreate {
name: Some("bad_session".to_string()),
target_type: PolicyTargetType::Session,
target_namespace: None,
target_shape: None,
operations: vec![PolicyOpToCreate {
operation: "subscribe".to_string(),
body_source: "filter { true }".to_string(),
}],
},
)
.unwrap_err();
assert_eq!(err.diagnostic().code, "CA_086");
}
#[test]
fn test_create_policy_rejects_op_on_operationless_target_type() {
let mut txn = create_test_admin_transaction();
let err = CatalogStore::create_policy(
&mut txn,
PolicyToCreate {
name: Some("bad_sub".to_string()),
target_type: PolicyTargetType::Subscription,
target_namespace: None,
target_shape: None,
operations: vec![PolicyOpToCreate {
operation: "from".to_string(),
body_source: "filter { true }".to_string(),
}],
},
)
.unwrap_err();
assert_eq!(err.diagnostic().code, "CA_086");
}
#[test]
fn test_create_policy_accepts_from_on_view() {
let mut txn = create_test_admin_transaction();
CatalogStore::create_policy(
&mut txn,
PolicyToCreate {
name: Some("view_from".to_string()),
target_type: PolicyTargetType::View,
target_namespace: None,
target_shape: None,
operations: vec![PolicyOpToCreate {
operation: "from".to_string(),
body_source: "filter { true }".to_string(),
}],
},
)
.unwrap();
}
#[test]
fn test_create_policy_allows_empty_operations() {
let mut txn = create_test_admin_transaction();
let (def, ops) = CatalogStore::create_policy(
&mut txn,
PolicyToCreate {
name: Some("empty".to_string()),
target_type: PolicyTargetType::Table,
target_namespace: None,
target_shape: None,
operations: vec![],
},
)
.unwrap();
assert!(def.enabled);
assert!(ops.is_empty());
}
#[test]
fn test_create_policy_duplicate() {
let mut txn = create_test_admin_transaction();
CatalogStore::create_policy(
&mut txn,
PolicyToCreate {
name: Some("read_only".to_string()),
target_type: PolicyTargetType::Table,
target_namespace: None,
target_shape: None,
operations: vec![],
},
)
.unwrap();
let err = CatalogStore::create_policy(
&mut txn,
PolicyToCreate {
name: Some("read_only".to_string()),
target_type: PolicyTargetType::Table,
target_namespace: None,
target_shape: None,
operations: vec![],
},
)
.unwrap_err();
assert_eq!(err.diagnostic().code, "CA_042");
}
}