use uuid::Uuid;
use vti_common::audit::AuditKey;
use vti_common::error::AppError;
use vti_common::pagination::{Cursor, Paginated, paginate};
use vti_common::store::KeyspaceHandle;
use super::Relationship;
pub const RELATIONSHIPS_PREFIX: &[u8] = b"relationships:";
pub const RELATIONSHIPS_BY_DID_PREFIX: &[u8] = b"relationships_by_did:";
fn primary_key(id: Uuid) -> Vec<u8> {
let mut k = RELATIONSHIPS_PREFIX.to_vec();
k.extend_from_slice(id.to_string().as_bytes());
k
}
fn index_key(did: &str, id: Uuid) -> Vec<u8> {
let mut k = RELATIONSHIPS_BY_DID_PREFIX.to_vec();
k.extend_from_slice(did.as_bytes());
k.push(b':');
k.extend_from_slice(id.to_string().as_bytes());
k
}
fn decode(bytes: &[u8]) -> Result<Relationship, AppError> {
serde_json::from_slice(bytes)
.map_err(|e| AppError::Internal(format!("Relationship decode: {e}")))
}
pub async fn get_relationship(
primary: &KeyspaceHandle,
id: Uuid,
) -> Result<Option<Relationship>, AppError> {
let raw = primary.get_raw(primary_key(id)).await?;
match raw {
Some(bytes) => Ok(Some(decode(&bytes)?)),
None => Ok(None),
}
}
pub async fn store_relationship(
primary: &KeyspaceHandle,
index: &KeyspaceHandle,
rel: &Relationship,
) -> Result<(), AppError> {
primary
.insert(
String::from_utf8(primary_key(rel.id)).expect("ascii key"),
rel,
)
.await?;
let id_value = serde_json::json!({ "id": rel.id.to_string() });
index
.insert(
String::from_utf8(index_key(&rel.issuer_did, rel.id)).expect("ascii key"),
&id_value,
)
.await?;
if rel.subject_did != rel.issuer_did {
index
.insert(
String::from_utf8(index_key(&rel.subject_did, rel.id)).expect("ascii key"),
&id_value,
)
.await?;
}
Ok(())
}
pub async fn delete_relationship(
primary: &KeyspaceHandle,
index: &KeyspaceHandle,
id: Uuid,
) -> Result<(), AppError> {
if let Some(rel) = get_relationship(primary, id).await? {
primary.remove(primary_key(id)).await?;
index.remove(index_key(&rel.issuer_did, id)).await?;
if rel.subject_did != rel.issuer_did {
index.remove(index_key(&rel.subject_did, id)).await?;
}
} else {
primary.remove(primary_key(id)).await?;
}
Ok(())
}
pub async fn find_by_hash(
primary: &KeyspaceHandle,
vrc_sha256: &str,
) -> Result<Option<Relationship>, AppError> {
let pairs = primary
.prefix_iter_raw(RELATIONSHIPS_PREFIX.to_vec())
.await?;
for (_k, v) in pairs {
if let Ok(rel) = decode(&v)
&& rel.vrc_sha256 == vrc_sha256
{
return Ok(Some(rel));
}
}
Ok(None)
}
pub async fn list_for_did(
primary: &KeyspaceHandle,
index: &KeyspaceHandle,
audit_key: &AuditKey,
did: &str,
cursor: Option<&Cursor>,
limit: usize,
) -> Result<Paginated<Relationship>, AppError> {
let mut index_prefix = RELATIONSHIPS_BY_DID_PREFIX.to_vec();
index_prefix.extend_from_slice(did.as_bytes());
index_prefix.push(b':');
let mut idx_pairs = index.prefix_iter_raw(index_prefix).await?;
idx_pairs.sort_by(|(a, _), (b, _)| a.cmp(b));
let mut hydrated: Vec<(Vec<u8>, Vec<u8>)> = Vec::with_capacity(idx_pairs.len());
for (idx_key, _idx_val) in idx_pairs {
let key_str = match std::str::from_utf8(&idx_key) {
Ok(s) => s,
Err(_) => continue,
};
let Some(id_str) = key_str.rsplit(':').next() else {
continue;
};
let Ok(id) = Uuid::parse_str(id_str) else {
continue;
};
if let Some(raw) = primary.get_raw(primary_key(id)).await? {
hydrated.push((idx_key, raw));
}
}
let snapshot_id: u64 = hydrated.len() as u64;
paginate(hydrated, cursor, limit, &audit_key.key, snapshot_id, decode)
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use vti_common::audit::AuditKeyStore;
use vti_common::config::StoreConfig;
use vti_common::store::Store;
async fn temp_kss() -> (KeyspaceHandle, KeyspaceHandle, AuditKey, tempfile::TempDir) {
let dir = tempfile::tempdir().unwrap();
let store = Store::open(&StoreConfig {
data_dir: dir.path().to_path_buf(),
})
.unwrap();
let primary = store.keyspace("relationships").unwrap();
let index = store.keyspace("relationships_by_did").unwrap();
let audit_key_ks = store.keyspace("audit_key").unwrap();
let key_store = AuditKeyStore::new(audit_key_ks);
key_store.ensure_initial(&[0xAB; 32]).await.unwrap();
let audit_key = key_store.active().await.unwrap();
(primary, index, audit_key, dir)
}
fn fresh(issuer: &str, subject: &str) -> Relationship {
let id = Uuid::new_v4();
Relationship {
id,
issuer_did: issuer.into(),
subject_did: subject.into(),
vrc_jsonld: serde_json::json!({
"type": ["VerifiableCredential", "VerifiableRecognitionCredential"],
"issuer": issuer,
"credentialSubject": { "id": subject, "endorsement": { "type": "endorses" } }
}),
vrc_sha256: format!("{:x}", id.as_u128()),
created_at: Utc::now(),
}
}
#[tokio::test]
async fn round_trip() {
let (primary, index, _audit, _dir) = temp_kss().await;
let rel = fresh("did:key:zA", "did:key:zB");
store_relationship(&primary, &index, &rel).await.unwrap();
let got = get_relationship(&primary, rel.id).await.unwrap().unwrap();
assert_eq!(got, rel);
}
#[tokio::test]
async fn delete_clears_primary_and_index() {
let (primary, index, _audit, _dir) = temp_kss().await;
let rel = fresh("did:key:zA", "did:key:zB");
store_relationship(&primary, &index, &rel).await.unwrap();
delete_relationship(&primary, &index, rel.id).await.unwrap();
assert!(get_relationship(&primary, rel.id).await.unwrap().is_none());
let pairs = index
.prefix_iter_raw(RELATIONSHIPS_BY_DID_PREFIX.to_vec())
.await
.unwrap();
assert!(pairs.is_empty());
}
#[tokio::test]
async fn list_for_issuer_returns_issued() {
let (primary, index, audit, _dir) = temp_kss().await;
let r1 = fresh("did:key:zA", "did:key:zB");
let r2 = fresh("did:key:zA", "did:key:zC");
let r3 = fresh("did:key:zX", "did:key:zY"); for r in [&r1, &r2, &r3] {
store_relationship(&primary, &index, r).await.unwrap();
}
let page = list_for_did(&primary, &index, &audit, "did:key:zA", None, 10)
.await
.unwrap();
assert_eq!(page.items.len(), 2);
let dids: Vec<_> = page.items.iter().map(|r| r.id).collect();
assert!(dids.contains(&r1.id));
assert!(dids.contains(&r2.id));
}
#[tokio::test]
async fn list_for_subject_returns_received() {
let (primary, index, audit, _dir) = temp_kss().await;
let r1 = fresh("did:key:zA", "did:key:zSubject");
let r2 = fresh("did:key:zB", "did:key:zSubject");
let r3 = fresh("did:key:zC", "did:key:zOther");
for r in [&r1, &r2, &r3] {
store_relationship(&primary, &index, r).await.unwrap();
}
let page = list_for_did(&primary, &index, &audit, "did:key:zSubject", None, 10)
.await
.unwrap();
assert_eq!(page.items.len(), 2);
}
#[tokio::test]
async fn list_paginates_with_cursor() {
let (primary, index, audit, _dir) = temp_kss().await;
for _ in 0..10 {
let rel = fresh("did:key:zHub", &format!("did:key:z{}", Uuid::new_v4()));
store_relationship(&primary, &index, &rel).await.unwrap();
}
let page1 = list_for_did(&primary, &index, &audit, "did:key:zHub", None, 3)
.await
.unwrap();
assert_eq!(page1.items.len(), 3);
let cursor_str = page1.next_cursor.as_ref().expect("more pages");
let cursor = Cursor::decode(cursor_str, &audit.key).expect("decode cursor");
let page2 = list_for_did(&primary, &index, &audit, "did:key:zHub", Some(&cursor), 3)
.await
.unwrap();
assert_eq!(page2.items.len(), 3);
let ids1: std::collections::HashSet<_> = page1.items.iter().map(|r| r.id).collect();
for r in &page2.items {
assert!(!ids1.contains(&r.id));
}
}
#[tokio::test]
async fn find_by_hash_returns_existing_row() {
let (primary, index, _audit, _dir) = temp_kss().await;
let mut rel = fresh("did:key:zA", "did:key:zB");
rel.vrc_sha256 = "deadbeef".into();
store_relationship(&primary, &index, &rel).await.unwrap();
let got = find_by_hash(&primary, "deadbeef").await.unwrap();
assert_eq!(got.map(|r| r.id), Some(rel.id));
let miss = find_by_hash(&primary, "feedface").await.unwrap();
assert!(miss.is_none());
}
#[tokio::test]
async fn self_loop_writes_one_index_entry() {
let (primary, index, audit, _dir) = temp_kss().await;
let rel = fresh("did:key:zSelf", "did:key:zSelf");
store_relationship(&primary, &index, &rel).await.unwrap();
let page = list_for_did(&primary, &index, &audit, "did:key:zSelf", None, 10)
.await
.unwrap();
assert_eq!(page.items.len(), 1, "self-edge must list once, not twice");
}
}