use chrono::Utc;
use uuid::Uuid;
use vti_common::audit::AuditKey;
use vti_common::error::AppError;
use vti_common::pagination::{Cursor, Paginated, paginate};
use vti_common::store::KeyspaceHandle;
use super::model::{Policy, PolicyPurpose};
/// Key prefix for stored policy rows; `policy_key` appends the policy id's
/// string form to it, and the list functions scan by this prefix.
const POLICY_PREFIX: &[u8] = b"policies:";
/// Key prefix for active-policy pointers; `active_key` appends the purpose's
/// string form (`PolicyPurpose::as_str`) to it.
const ACTIVE_PREFIX: &[u8] = b"active_policies:";
/// Builds the raw storage key for a policy row.
///
/// The key is `POLICY_PREFIX` immediately followed by the bytes of
/// `id.to_string()`.
fn policy_key(id: Uuid) -> Vec<u8> {
    let id_text = id.to_string();
    [POLICY_PREFIX, id_text.as_bytes()].concat()
}
/// Builds the raw storage key for the active-policy pointer of `purpose`.
///
/// The key is `ACTIVE_PREFIX` immediately followed by the bytes of
/// `purpose.as_str()`.
fn active_key(purpose: PolicyPurpose) -> Vec<u8> {
    [ACTIVE_PREFIX, purpose.as_str().as_bytes()].concat()
}
/// Deserializes a stored row from JSON into a [`Policy`].
///
/// # Errors
///
/// Returns [`AppError::Internal`] carrying the serde error text when `bytes`
/// cannot be parsed as a `Policy`.
fn decode(bytes: &[u8]) -> Result<Policy, AppError> {
    match serde_json::from_slice::<Policy>(bytes) {
        Ok(policy) => Ok(policy),
        Err(e) => Err(AppError::Internal(format!("Policy decode: {e}"))),
    }
}
/// Looks up a single policy by id.
///
/// Returns `Ok(None)` when no row is stored under the id's key.
///
/// # Errors
///
/// Propagates the error from the keyspace read, and the decode error when a
/// row exists but cannot be parsed as a [`Policy`].
pub async fn get_policy(ks: &KeyspaceHandle, id: Uuid) -> Result<Option<Policy>, AppError> {
    let key = policy_key(id);
    let stored = ks.get_raw(key).await?;
    // Option<Result<..>> -> Result<Option<..>>: a missing row is not an error.
    stored.map(|bytes| decode(&bytes)).transpose()
}
/// Writes `policy` into the keyspace under the key derived from `policy.id`.
///
/// The byte key from `policy_key` is converted to a `String` because
/// `KeyspaceHandle::insert` is called with a string key here.
/// NOTE(review): `insert` presumably serializes the value in the same JSON
/// form that `decode` reads back — confirm against `KeyspaceHandle`.
///
/// # Errors
///
/// Returns whatever error the keyspace insert reports.
pub async fn store_policy(ks: &KeyspaceHandle, policy: &Policy) -> Result<(), AppError> {
    let key_bytes = policy_key(policy.id);
    // Prefix and UUID text are both ASCII, so this conversion is an invariant.
    let key = String::from_utf8(key_bytes).expect("policy key is ASCII");
    ks.insert(key, policy).await
}
/// Removes the policy row stored under `id`'s key.
///
/// # Errors
///
/// Returns whatever error the keyspace removal reports.
pub async fn delete_policy(ks: &KeyspaceHandle, id: Uuid) -> Result<(), AppError> {
    let key = policy_key(id);
    ks.remove(key).await
}
pub async fn list_policies(ks: &KeyspaceHandle) -> Result<Vec<Policy>, AppError> {
let raw = ks.prefix_iter_raw(POLICY_PREFIX.to_vec()).await?;
let mut out = Vec::with_capacity(raw.len());
for (_k, v) in raw {
match decode(&v) {
Ok(p) => out.push(p),
Err(err) => tracing::warn!(error = %err, "skipping unparseable policy row"),
}
}
Ok(out)
}
/// Returns one page of policies, ordered by raw storage key.
///
/// All rows under `POLICY_PREFIX` are loaded and sorted by key, then handed to
/// `paginate` together with the caller's `cursor` and `limit`, the audit key
/// material (`audit_key.key`), a snapshot id and `decode` as the row decoder.
///
/// The snapshot id is simply the number of rows found by this scan.
/// NOTE(review): how `paginate` uses the snapshot id and the audit key is not
/// visible here. A row count will not change if one row is deleted and another
/// added between pages — confirm that is acceptable.
///
/// # Errors
///
/// Propagates the error from the prefix scan and whatever `paginate` returns.
pub async fn list_policies_paginated(
    ks: &KeyspaceHandle,
    audit_key: &AuditKey,
    cursor: Option<&Cursor>,
    limit: usize,
) -> Result<Paginated<Policy>, AppError> {
    let mut rows = ks.prefix_iter_raw(POLICY_PREFIX.to_vec()).await?;
    rows.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
    let snapshot_id = rows.len() as u64;
    paginate(rows, cursor, limit, &audit_key.key, snapshot_id, decode)
}
/// Returns the highest `version` among stored policies whose purpose equals
/// `purpose`, or `0` when there is none.
///
/// This goes through [`list_policies`], so rows that fail to decode are not
/// considered.
///
/// # Errors
///
/// Propagates the error from [`list_policies`].
pub async fn max_version_for(ks: &KeyspaceHandle, purpose: PolicyPurpose) -> Result<u32, AppError> {
    let mut highest = 0;
    for policy in list_policies(ks).await? {
        if policy.purpose == purpose && policy.version > highest {
            highest = policy.version;
        }
    }
    Ok(highest)
}
/// Reads the active-policy pointer for `purpose`.
///
/// The pointer is stored as the UTF-8 text of a UUID. Returns `Ok(None)` when
/// no pointer is stored for the purpose.
///
/// # Errors
///
/// Propagates the error from the keyspace read. Returns
/// [`AppError::Internal`] when the stored bytes are not valid UTF-8 or do not
/// parse as a UUID.
pub async fn get_active_policy_id(
    ks: &KeyspaceHandle,
    purpose: PolicyPurpose,
) -> Result<Option<Uuid>, AppError> {
    let pointer = ks.get_raw(active_key(purpose)).await?;
    pointer
        .map(|bytes| -> Result<Uuid, AppError> {
            let text = std::str::from_utf8(&bytes)
                .map_err(|e| AppError::Internal(format!("active policy pointer not utf-8: {e}")))?;
            Uuid::parse_str(text)
                .map_err(|e| AppError::Internal(format!("active policy pointer not uuid: {e}")))
        })
        .transpose()
}
/// Points `purpose` at the policy `id`.
///
/// The value written is the bytes of `id.to_string()`, which is the form
/// [`get_active_policy_id`] parses back.
///
/// # Errors
///
/// Returns whatever error the keyspace write reports.
pub async fn set_active_policy_id(
    ks: &KeyspaceHandle,
    purpose: PolicyPurpose,
    id: Uuid,
) -> Result<(), AppError> {
    let key = active_key(purpose);
    let value = id.to_string().into_bytes();
    ks.insert_raw(key, value).await
}
/// Removes the active-policy pointer stored for `purpose`.
///
/// # Errors
///
/// Returns whatever error the keyspace removal reports.
pub async fn clear_active_policy_id(
    ks: &KeyspaceHandle,
    purpose: PolicyPurpose,
) -> Result<(), AppError> {
    let key = active_key(purpose);
    ks.remove(key).await
}
/// Builds a fresh, not-yet-activated [`Policy`] value.
///
/// A random v4 UUID is generated for `id`, `created_at` is set to the current
/// UTC time and `activated_at` starts as `None`. The remaining fields are
/// taken from the arguments unchanged. Nothing is written to storage.
pub fn new_policy(
    purpose: PolicyPurpose,
    rego_source: String,
    sha256: [u8; 32],
    author_did: String,
    version: u32,
) -> Policy {
    let id = Uuid::new_v4();
    let created_at = Utc::now();
    Policy {
        id,
        purpose,
        rego_source,
        sha256,
        activated_at: None,
        author_did,
        created_at,
        version,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use sha2::{Digest, Sha256};
    use vti_common::audit::AuditKeyStore;
    use vti_common::config::StoreConfig;
    use vti_common::store::Store;

    /// Opens a store in a fresh temporary directory and returns the policy
    /// keyspace, the active-pointer keyspace, an initial audit key and the
    /// directory guard. The guard must stay bound for the duration of a test
    /// so the directory is not removed early.
    async fn temp_keyspaces() -> (KeyspaceHandle, KeyspaceHandle, AuditKey, tempfile::TempDir) {
        let dir = tempfile::tempdir().expect("tempdir");
        let config = StoreConfig {
            data_dir: dir.path().to_path_buf(),
        };
        let store = Store::open(&config).expect("store");
        let policies_ks = store.keyspace("policies").expect("policies ks");
        let active_ks = store.keyspace("active_policies").expect("active ks");
        let audit_ks = store.keyspace("audit_key").unwrap();
        let audit_key = AuditKeyStore::new(audit_ks)
            .ensure_initial(&[0xAA; 32])
            .await
            .expect("audit key");
        (policies_ks, active_ks, audit_key, dir)
    }

    /// Builds a policy whose source text embeds `version` and `marker`, with a
    /// SHA-256 digest computed over that source.
    fn sample(purpose: PolicyPurpose, version: u32, marker: u8) -> Policy {
        let rego = format!("package vtc.test\nimport rego.v1\n\n# v{version}-marker:{marker}\n");
        let digest: [u8; 32] = Sha256::digest(rego.as_bytes()).into();
        new_policy(purpose, rego, digest, "did:key:zAdmin".into(), version)
    }

    /// Reads the active pointer for `purpose`, panicking on a store error.
    async fn active_id(ks: &KeyspaceHandle, purpose: PolicyPurpose) -> Option<Uuid> {
        get_active_policy_id(ks, purpose).await.unwrap()
    }

    /// Reads the max version for `purpose`, panicking on a store error.
    async fn max_version(ks: &KeyspaceHandle, purpose: PolicyPurpose) -> u32 {
        max_version_for(ks, purpose).await.unwrap()
    }

    /// Every purpose can be stored, fetched back unchanged, and listed.
    #[tokio::test]
    async fn round_trip_every_purpose() {
        let (policies_ks, _active_ks, _ak, _dir) = temp_keyspaces().await;
        for (i, purpose) in PolicyPurpose::ALL.into_iter().enumerate() {
            let original = sample(purpose, 1, i as u8);
            store_policy(&policies_ks, &original).await.unwrap();
            let fetched = get_policy(&policies_ks, original.id)
                .await
                .unwrap()
                .unwrap();
            assert_eq!(fetched, original, "round-trip mismatch for {purpose:?}");
        }
        let listed = list_policies(&policies_ks).await.unwrap();
        assert_eq!(listed.len(), PolicyPurpose::ALL.len());
    }

    /// Deleting the same id twice succeeds both times and leaves no row.
    #[tokio::test]
    async fn delete_is_idempotent() {
        let (policies_ks, _active_ks, _ak, _dir) = temp_keyspaces().await;
        let policy = sample(PolicyPurpose::Join, 1, 0);
        store_policy(&policies_ks, &policy).await.unwrap();
        for _attempt in 0..2 {
            delete_policy(&policies_ks, policy.id).await.unwrap();
        }
        let remaining = get_policy(&policies_ks, policy.id).await.unwrap();
        assert!(remaining.is_none());
    }

    /// The max version is computed per purpose and is 0 for an unused purpose.
    #[tokio::test]
    async fn max_version_is_purpose_scoped() {
        let (policies_ks, _active_ks, _ak, _dir) = temp_keyspaces().await;
        let seeds: [(PolicyPurpose, u32, u8); 3] = [
            (PolicyPurpose::Join, 1, 0),
            (PolicyPurpose::Join, 2, 1),
            (PolicyPurpose::Removal, 5, 0),
        ];
        for (purpose, version, marker) in seeds {
            store_policy(&policies_ks, &sample(purpose, version, marker))
                .await
                .unwrap();
        }
        assert_eq!(max_version(&policies_ks, PolicyPurpose::Join).await, 2);
        assert_eq!(max_version(&policies_ks, PolicyPurpose::Removal).await, 5);
        assert_eq!(
            max_version(&policies_ks, PolicyPurpose::Personhood).await,
            0,
            "purpose with no rows must return 0"
        );
    }

    /// With five rows and a limit of two, the first page is full and carries a
    /// cursor for the next one.
    #[tokio::test]
    async fn paginated_walks_policies() {
        const BASE_ID: u128 = 0x1000_0000_0000_0000_0000_0000_0000_0000;
        let (policies_ks, _active_ks, audit_key, _dir) = temp_keyspaces().await;
        for i in 0u8..5 {
            let mut policy = sample(PolicyPurpose::Join, u32::from(i) + 1, i);
            // Fixed, increasing ids give the rows a known key order.
            policy.id = Uuid::from_u128(BASE_ID + u128::from(i));
            store_policy(&policies_ks, &policy).await.unwrap();
        }
        let first_page = list_policies_paginated(&policies_ks, &audit_key, None, 2)
            .await
            .unwrap();
        assert_eq!(first_page.items.len(), 2);
        assert!(
            first_page.next_cursor.is_some(),
            "expected cursor for partial page"
        );
    }

    /// Pointers start unset, are stored per purpose, and read back as written.
    #[tokio::test]
    async fn active_pointer_set_get_round_trips() {
        let (_policies_ks, active_ks, _ak, _dir) = temp_keyspaces().await;
        let join_id = Uuid::new_v4();
        let removal_id = Uuid::new_v4();
        for purpose in PolicyPurpose::ALL {
            assert!(
                active_id(&active_ks, purpose).await.is_none(),
                "purpose {purpose:?} should start unmapped"
            );
        }
        set_active_policy_id(&active_ks, PolicyPurpose::Join, join_id)
            .await
            .unwrap();
        set_active_policy_id(&active_ks, PolicyPurpose::Removal, removal_id)
            .await
            .unwrap();
        assert_eq!(
            active_id(&active_ks, PolicyPurpose::Join).await,
            Some(join_id)
        );
        assert_eq!(
            active_id(&active_ks, PolicyPurpose::Removal).await,
            Some(removal_id)
        );
        assert!(active_id(&active_ks, PolicyPurpose::Personhood)
            .await
            .is_none());
    }

    /// Setting a pointer twice leaves the second value in place.
    #[tokio::test]
    async fn active_pointer_overwrites_in_place() {
        let (_policies_ks, active_ks, _ak, _dir) = temp_keyspaces().await;
        let first = Uuid::new_v4();
        let second = Uuid::new_v4();
        for id in [first, second] {
            set_active_policy_id(&active_ks, PolicyPurpose::Join, id)
                .await
                .unwrap();
        }
        assert_eq!(
            active_id(&active_ks, PolicyPurpose::Join).await,
            Some(second)
        );
    }

    /// Clearing a pointer twice succeeds both times and leaves it unset.
    #[tokio::test]
    async fn active_pointer_clear_is_idempotent() {
        let (_policies_ks, active_ks, _ak, _dir) = temp_keyspaces().await;
        let id = Uuid::new_v4();
        set_active_policy_id(&active_ks, PolicyPurpose::Removal, id)
            .await
            .unwrap();
        for _attempt in 0..2 {
            clear_active_policy_id(&active_ks, PolicyPurpose::Removal)
                .await
                .unwrap();
        }
        assert!(active_id(&active_ks, PolicyPurpose::Removal)
            .await
            .is_none());
    }
}