use std::time::Duration;
use decision_forum::decision_object::DecisionClass;
use exo_identity::{did::DidDocument, registry::MAX_LOCAL_DID_REGISTRY_DOCUMENTS};
use serde_json::Value as JsonValue;
use sqlx::{
Executor, Postgres, Row, Transaction,
postgres::{PgConnectOptions, PgPool, PgPoolOptions},
};
use thiserror::Error;
pub const MAX_DB_LIST_ROWS: i64 = 1_000;
pub const MAX_DB_DID_DOCUMENTS: usize = MAX_LOCAL_DID_REGISTRY_DOCUMENTS;
const DB_POOL_ACQUIRE_TIMEOUT_SECS: u64 = 5;
const DID_DOCUMENT_CAPACITY_ADVISORY_LOCK_KEY: i64 = 1_014_400_003;
const LOCATION_CONSENT_SCOPE: &str = "location";
const BLOCKING_CONFLICT_NATURE_PATTERNS: [&str; 4] =
["%financial%", "%ownership%", "%personal%", "%family%"];
#[cfg(feature = "production-db")]
const DAGDB_RUNTIME_SEARCH_PATH: &str = "dagdb,public";
#[derive(Debug, Error)]
pub enum DbInitError {
#[error("failed to parse the PostgreSQL connection string")]
ParseUrl {
#[source]
source: sqlx::Error,
},
#[error("failed to connect to PostgreSQL")]
Connect {
#[source]
source: sqlx::Error,
},
#[error("failed to run database migrations")]
Migrate {
#[source]
source: sqlx::migrate::MigrateError,
},
#[cfg(feature = "production-db")]
#[error("failed to provision the DAG DB schema")]
DagDbMigrate {
#[source]
source: exo_dag_db_postgres::postgres::DagDbPostgresError,
},
}
#[derive(Debug, Error)]
pub enum DecisionUpdateError {
#[error("decision update matched no rows for tenant_id {tenant_id} and id_hash {id_hash}")]
MissingDecision { tenant_id: String, id_hash: String },
#[error("failed to update decision row")]
Query {
#[source]
source: sqlx::Error,
},
}
#[derive(Debug, Error)]
pub enum DecisionCreateError {
#[error("decision already exists for tenant_id {tenant_id} and id_hash {id_hash}")]
AlreadyExists { tenant_id: String, id_hash: String },
#[error("failed to create decision row")]
Query {
#[source]
source: sqlx::Error,
},
}
#[derive(Debug, Error)]
pub enum DidDocumentPersistenceError {
#[error("DID document timestamp is out of database range for field {field}: {value}")]
TimestampOutOfRange {
did: String,
field: &'static str,
value: u64,
},
#[error("failed to serialize DID document")]
Serialize {
did: String,
#[source]
source: serde_json::Error,
},
#[error("failed to deserialize persisted DID document")]
Deserialize {
did: String,
#[source]
source: serde_json::Error,
},
#[error("persisted DID document row key does not match payload id")]
DocumentDidMismatch {
row_did: String,
document_did: String,
},
#[error(
"DID document registry capacity exceeded: max_documents={max_documents}, attempted_documents={attempted_documents}"
)]
RegistryCapacityExceeded {
max_documents: usize,
attempted_documents: usize,
},
#[error("DID document persistence query failed")]
Query {
#[source]
source: sqlx::Error,
},
}
#[derive(Debug, Error)]
pub enum GatewayIdentityErasureError {
#[error("identity erasure timestamp must be positive: {erased_at_ms}")]
InvalidTimestamp { erased_at_ms: i64 },
#[error("gateway identity erasure query failed")]
Query {
#[source]
source: sqlx::Error,
},
}
#[derive(Debug, Error)]
pub enum ScanReceiptInsertError {
#[error("scan receipt location requires active location consent")]
LocationConsentRequired,
#[error("scan receipt insert query failed")]
Query {
#[source]
source: sqlx::Error,
},
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct GatewayIdentityErasureSummary {
pub did_documents_tombstoned: u64,
pub users_deleted: u64,
pub agents_deleted: u64,
pub sessions_deleted: u64,
pub identity_scores_deleted: u64,
pub enrollment_log_deleted: u64,
pub livesafe_identities_deleted: u64,
pub scan_receipts_deleted: u64,
pub consent_anchors_deleted: u64,
pub trustee_shards_deleted: u64,
pub agent_roles_deleted: u64,
pub consent_records_deleted: u64,
pub authority_chains_deleted: u64,
pub delegations_deleted: u64,
pub layout_templates_deleted: u64,
pub feedback_issues_deleted: u64,
pub conflict_declarations_deleted: u64,
}
pub async fn init_pool(database_url: &str) -> Result<PgPool, DbInitError> {
let connect_options: PgConnectOptions = database_url
.parse()
.map_err(|source| DbInitError::ParseUrl { source })?;
#[cfg(feature = "production-db")]
let pool = {
{
let migration_options = connect_options.clone().options([(
"search_path",
format!(
"public,{}",
exo_dag_db_postgres::postgres::DAGDB_MIGRATION_SCHEMA
),
)]);
let migration_pool = PgPoolOptions::new()
.max_connections(10)
.acquire_timeout(Duration::from_secs(DB_POOL_ACQUIRE_TIMEOUT_SECS))
.connect_with(migration_options)
.await
.map_err(|source| DbInitError::Connect { source })?;
sqlx::migrate!("./migrations")
.run(&migration_pool)
.await
.map_err(|source| DbInitError::Migrate { source })?;
exo_dag_db_postgres::postgres::run_migrations_in_schema(
&migration_pool,
exo_dag_db_postgres::postgres::DAGDB_MIGRATION_SCHEMA,
)
.await
.map_err(|source| DbInitError::DagDbMigrate { source })?;
migration_pool.close().await;
tracing::info!("DAG DB schema provisioned via the ledgered DAG DB migrator");
let runtime_options =
connect_options.options([("search_path", DAGDB_RUNTIME_SEARCH_PATH.to_owned())]);
let runtime_pool = PgPoolOptions::new()
.max_connections(10)
.acquire_timeout(Duration::from_secs(DB_POOL_ACQUIRE_TIMEOUT_SECS))
.connect_with(runtime_options)
.await
.map_err(|source| DbInitError::Connect { source })?;
tracing::info!("PostgreSQL connection pool ready with DAG DB runtime search path");
runtime_pool
}
};
#[cfg(not(feature = "production-db"))]
let pool = {
let pool = PgPoolOptions::new()
.max_connections(10)
.acquire_timeout(Duration::from_secs(DB_POOL_ACQUIRE_TIMEOUT_SECS))
.connect_with(connect_options)
.await
.map_err(|source| DbInitError::Connect { source })?;
sqlx::migrate!("./migrations")
.run(&pool)
.await
.map_err(|source| DbInitError::Migrate { source })?;
tracing::info!("PostgreSQL connection pool ready and migrations applied");
pool
};
Ok(pool)
}
pub async fn next_hlc(pool: &PgPool) -> Result<i64, sqlx::Error> {
let row = sqlx::query("UPDATE hlc_state SET counter = counter + 1 RETURNING counter")
.fetch_one(pool)
.await?;
Ok(row.get::<i64, _>("counter"))
}
fn timestamp_ms_to_i64(
did: &str,
field: &'static str,
value: u64,
) -> Result<i64, DidDocumentPersistenceError> {
i64::try_from(value).map_err(|_| DidDocumentPersistenceError::TimestampOutOfRange {
did: did.to_owned(),
field,
value,
})
}
pub async fn insert_did_document(
pool: &PgPool,
doc: &DidDocument,
) -> Result<bool, DidDocumentPersistenceError> {
insert_did_document_with_capacity(pool, doc, MAX_DB_DID_DOCUMENTS).await
}
async fn insert_did_document_with_capacity(
pool: &PgPool,
doc: &DidDocument,
max_documents: usize,
) -> Result<bool, DidDocumentPersistenceError> {
let did = doc.id.as_str();
let document =
serde_json::to_value(doc).map_err(|source| DidDocumentPersistenceError::Serialize {
did: did.to_owned(),
source,
})?;
let created_at_ms = timestamp_ms_to_i64(did, "created", doc.created.physical_ms)?;
let updated_at_ms = timestamp_ms_to_i64(did, "updated", doc.updated.physical_ms)?;
let mut tx = pool
.begin()
.await
.map_err(|source| DidDocumentPersistenceError::Query { source })?;
sqlx::query("SELECT pg_advisory_xact_lock($1)")
.bind(DID_DOCUMENT_CAPACITY_ADVISORY_LOCK_KEY)
.execute(&mut *tx)
.await
.map_err(|source| DidDocumentPersistenceError::Query { source })?;
let existing_did: bool =
sqlx::query_scalar("SELECT EXISTS (SELECT 1 FROM did_documents WHERE did = $1)")
.bind(did)
.fetch_one(&mut *tx)
.await
.map_err(|source| DidDocumentPersistenceError::Query { source })?;
if existing_did {
tx.commit()
.await
.map_err(|source| DidDocumentPersistenceError::Query { source })?;
return Ok(false);
}
let document_count: i64 =
sqlx::query_scalar("SELECT COUNT(*) AS document_count FROM did_documents")
.fetch_one(&mut *tx)
.await
.map_err(|source| DidDocumentPersistenceError::Query { source })?;
let existing_documents = match usize::try_from(document_count) {
Ok(count) => count,
Err(_) => max_documents,
};
if existing_documents >= max_documents {
return Err(DidDocumentPersistenceError::RegistryCapacityExceeded {
max_documents,
attempted_documents: existing_documents.saturating_add(1),
});
}
let result = sqlx::query(
"INSERT INTO did_documents (did, document, created_at_ms, updated_at_ms, revoked) \
VALUES ($1, $2, $3, $4, $5) \
ON CONFLICT (did) DO NOTHING",
)
.bind(did)
.bind(document)
.bind(created_at_ms)
.bind(updated_at_ms)
.bind(doc.revoked)
.execute(&mut *tx)
.await
.map_err(|source| DidDocumentPersistenceError::Query { source })?;
tx.commit()
.await
.map_err(|source| DidDocumentPersistenceError::Query { source })?;
Ok(result.rows_affected() > 0)
}
pub async fn find_did_document(
pool: &PgPool,
did: &str,
) -> Result<Option<DidDocument>, DidDocumentPersistenceError> {
let row = sqlx::query(
"SELECT document \
FROM did_documents \
WHERE did = $1 AND revoked = false",
)
.bind(did)
.fetch_optional(pool)
.await
.map_err(|source| DidDocumentPersistenceError::Query { source })?;
let Some(row) = row else {
return Ok(None);
};
let document = row.get::<JsonValue, _>("document");
let doc = serde_json::from_value::<DidDocument>(document).map_err(|source| {
DidDocumentPersistenceError::Deserialize {
did: did.to_owned(),
source,
}
})?;
if doc.id.as_str() != did {
return Err(DidDocumentPersistenceError::DocumentDidMismatch {
row_did: did.to_owned(),
document_did: doc.id.as_str().to_owned(),
});
}
Ok(Some(doc))
}
pub async fn list_did_document_ids(pool: &PgPool) -> Result<Vec<String>, sqlx::Error> {
let rows = sqlx::query(
"SELECT did \
FROM did_documents \
WHERE revoked = false \
ORDER BY did \
LIMIT $1",
)
.bind(MAX_DB_LIST_ROWS)
.fetch_all(pool)
.await?;
Ok(rows
.into_iter()
.map(|row| row.get::<String, _>("did"))
.collect())
}
pub async fn erase_gateway_identity_records(
pool: &PgPool,
did: &str,
erased_at_ms: i64,
) -> Result<GatewayIdentityErasureSummary, GatewayIdentityErasureError> {
if erased_at_ms <= 0 {
return Err(GatewayIdentityErasureError::InvalidTimestamp { erased_at_ms });
}
let mut tx = pool
.begin()
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?;
let tombstone = serde_json::json!({
"schema": "exo.gateway.did_document_tombstone.v1",
"erased": true
});
let did_documents_tombstoned = sqlx::query(
"UPDATE did_documents \
SET document = $2, updated_at_ms = $3, revoked = true, erased_at_ms = $3 \
WHERE did = $1",
)
.bind(did)
.bind(tombstone)
.bind(erased_at_ms)
.execute(&mut *tx)
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?
.rows_affected();
let sessions_deleted = sqlx::query("DELETE FROM sessions WHERE actor_did = $1")
.bind(did)
.execute(&mut *tx)
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?
.rows_affected();
let users_deleted = sqlx::query("DELETE FROM users WHERE did = $1")
.bind(did)
.execute(&mut *tx)
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?
.rows_affected();
let agents_deleted = sqlx::query("DELETE FROM agents WHERE did = $1 OR owner_did = $1")
.bind(did)
.execute(&mut *tx)
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?
.rows_affected();
let identity_scores_deleted = sqlx::query("DELETE FROM identity_scores WHERE did = $1")
.bind(did)
.execute(&mut *tx)
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?
.rows_affected();
let enrollment_log_deleted = sqlx::query("DELETE FROM enrollment_log WHERE did = $1")
.bind(did)
.execute(&mut *tx)
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?
.rows_affected();
let livesafe_identities_deleted = sqlx::query("DELETE FROM livesafe_identities WHERE did = $1")
.bind(did)
.execute(&mut *tx)
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?
.rows_affected();
let scan_receipts_deleted =
sqlx::query("DELETE FROM scan_receipts WHERE subscriber_did = $1 OR responder_did = $1")
.bind(did)
.execute(&mut *tx)
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?
.rows_affected();
let consent_anchors_deleted =
sqlx::query("DELETE FROM consent_anchors WHERE subscriber_did = $1 OR provider_did = $1")
.bind(did)
.execute(&mut *tx)
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?
.rows_affected();
let trustee_shards_deleted = sqlx::query(
"DELETE FROM trustee_shard_status WHERE subscriber_did = $1 OR trustee_did = $1",
)
.bind(did)
.execute(&mut *tx)
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?
.rows_affected();
let agent_roles_deleted =
sqlx::query("DELETE FROM agent_roles WHERE agent_did = $1 OR granted_by = $1")
.bind(did)
.execute(&mut *tx)
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?
.rows_affected();
let consent_records_deleted =
sqlx::query("DELETE FROM consent_records WHERE subject_did = $1 OR actor_did = $1")
.bind(did)
.execute(&mut *tx)
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?
.rows_affected();
let authority_chains_deleted = sqlx::query("DELETE FROM authority_chains WHERE actor_did = $1")
.bind(did)
.execute(&mut *tx)
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?
.rows_affected();
let delegations_deleted =
sqlx::query("DELETE FROM delegations WHERE delegator = $1 OR delegatee = $1")
.bind(did)
.execute(&mut *tx)
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?
.rows_affected();
let layout_templates_deleted = sqlx::query("DELETE FROM layout_templates WHERE user_did = $1")
.bind(did)
.execute(&mut *tx)
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?
.rows_affected();
let feedback_issues_deleted =
sqlx::query("DELETE FROM feedback_issues WHERE reporter_did = $1")
.bind(did)
.execute(&mut *tx)
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?
.rows_affected();
let conflict_declarations_deleted = sqlx::query(
"DELETE FROM conflict_declarations \
WHERE declarant_did = $1",
)
.bind(did)
.execute(&mut *tx)
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?
.rows_affected();
tx.commit()
.await
.map_err(|source| GatewayIdentityErasureError::Query { source })?;
Ok(GatewayIdentityErasureSummary {
did_documents_tombstoned,
users_deleted,
agents_deleted,
sessions_deleted,
identity_scores_deleted,
enrollment_log_deleted,
livesafe_identities_deleted,
scan_receipts_deleted,
consent_anchors_deleted,
trustee_shards_deleted,
agent_roles_deleted,
consent_records_deleted,
authority_chains_deleted,
delegations_deleted,
layout_templates_deleted,
feedback_issues_deleted,
conflict_declarations_deleted,
})
}
#[allow(clippy::too_many_arguments)]
pub async fn insert_user(
pool: &PgPool,
did: &str,
display_name: &str,
email: &str,
roles: &JsonValue,
tenant_id: &str,
created_at: i64,
status: &str,
pace_status: &str,
password_hash: &str,
salt: &str,
mfa_enabled: bool,
) -> Result<(), sqlx::Error> {
sqlx::query(
"INSERT INTO users (did, display_name, email, roles, tenant_id, created_at, status, pace_status, password_hash, salt, mfa_enabled)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (did) DO NOTHING"
)
.bind(did).bind(display_name).bind(email).bind(roles).bind(tenant_id)
.bind(created_at).bind(status).bind(pace_status).bind(password_hash)
.bind(salt).bind(mfa_enabled)
.execute(pool).await?;
Ok(())
}
pub async fn find_user_by_email(
pool: &PgPool,
email: &str,
) -> Result<Option<PublicUserRow>, sqlx::Error> {
sqlx::query_as::<_, PublicUserRow>(
"SELECT did, display_name, email, roles, tenant_id, created_at, status, pace_status, mfa_enabled FROM users WHERE email = $1"
).bind(email).fetch_optional(pool).await
}
pub async fn find_user_by_did(
pool: &PgPool,
did: &str,
) -> Result<Option<PublicUserRow>, sqlx::Error> {
sqlx::query_as::<_, PublicUserRow>(
"SELECT did, display_name, email, roles, tenant_id, created_at, status, pace_status, mfa_enabled FROM users WHERE did = $1"
).bind(did).fetch_optional(pool).await
}
pub async fn active_human_user_dids_for_votes(
pool: &PgPool,
tenant_id: &str,
voter_dids: &[String],
) -> Result<Vec<String>, sqlx::Error> {
if voter_dids.is_empty() {
return Ok(Vec::new());
}
sqlx::query_scalar::<_, String>(
"SELECT did FROM users WHERE tenant_id = $1 AND status = 'Active' AND did = ANY($2) ORDER BY did",
)
.bind(tenant_id)
.bind(voter_dids)
.fetch_all(pool)
.await
}
pub async fn list_users_db(
pool: &PgPool,
tenant_id: &str,
) -> Result<Vec<PublicUserRow>, sqlx::Error> {
sqlx::query_as::<_, PublicUserRow>(
"SELECT did, display_name, email, roles, tenant_id, created_at, status, pace_status, mfa_enabled FROM users WHERE tenant_id = $1 ORDER BY created_at LIMIT $2"
).bind(tenant_id).bind(MAX_DB_LIST_ROWS).fetch_all(pool).await
}
pub async fn update_user_pace(
pool: &PgPool,
did: &str,
pace_status: &str,
) -> Result<(), sqlx::Error> {
let result = sqlx::query("UPDATE users SET pace_status = $1 WHERE did = $2")
.bind(pace_status)
.bind(did)
.execute(pool)
.await?;
if result.rows_affected() == 0 {
return Err(sqlx::Error::RowNotFound);
}
Ok(())
}
pub async fn user_exists_by_email(pool: &PgPool, email: &str) -> Result<bool, sqlx::Error> {
Ok(sqlx::query("SELECT 1 FROM users WHERE email = $1")
.bind(email)
.fetch_optional(pool)
.await?
.is_some())
}
pub async fn count_users(pool: &PgPool) -> Result<i64, sqlx::Error> {
Ok(sqlx::query("SELECT COUNT(*) as cnt FROM users")
.fetch_one(pool)
.await?
.get::<i64, _>("cnt"))
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct PublicUserRow {
pub did: String,
pub display_name: String,
pub email: String,
pub roles: JsonValue,
pub tenant_id: String,
pub created_at: i64,
pub status: String,
pub pace_status: String,
pub mfa_enabled: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QuorumEligibilityCounts {
pub eligible_voters: usize,
pub eligible_human_voters: usize,
}
fn decision_class_rank(class: DecisionClass) -> i32 {
match class {
DecisionClass::Routine => 0,
DecisionClass::Operational => 1,
DecisionClass::Strategic => 2,
DecisionClass::Constitutional => 3,
}
}
fn count_result_to_usize(label: &'static str, value: i64) -> Result<usize, sqlx::Error> {
usize::try_from(value)
.map_err(|_| sqlx::Error::Protocol(format!("{label} returned invalid count {value}")))
}
async fn count_quorum_eligible_voters_with_executor<'e, E>(
executor: E,
tenant_id: &str,
decision_class: DecisionClass,
) -> Result<QuorumEligibilityCounts, sqlx::Error>
where
E: Executor<'e, Database = Postgres>,
{
let row = sqlx::query(
r#"
SELECT
(
SELECT COUNT(*)
FROM users
WHERE tenant_id = $1
AND status = 'Active'
) AS active_human_users,
(
SELECT COUNT(*)
FROM agents
WHERE tenant_id = $1
AND status = 'Active'
AND delegation_id IS NOT NULL
AND CASE max_decision_class
WHEN 'Routine' THEN 0
WHEN 'Operational' THEN 1
WHEN 'Strategic' THEN 2
WHEN 'Constitutional' THEN 3
ELSE -1
END >= $2
) AS active_delegated_agents
"#,
)
.bind(tenant_id)
.bind(decision_class_rank(decision_class))
.fetch_one(executor)
.await?;
let active_human_users = count_result_to_usize(
"active_human_users",
row.try_get::<i64, _>("active_human_users")?,
)?;
let active_delegated_agents = count_result_to_usize(
"active_delegated_agents",
row.try_get::<i64, _>("active_delegated_agents")?,
)?;
let eligible_voters = active_human_users
.checked_add(active_delegated_agents)
.ok_or_else(|| sqlx::Error::Protocol("quorum eligible voter count overflowed".into()))?;
Ok(QuorumEligibilityCounts {
eligible_voters,
eligible_human_voters: active_human_users,
})
}
pub async fn count_quorum_eligible_voters(
pool: &PgPool,
tenant_id: &str,
decision_class: DecisionClass,
) -> Result<QuorumEligibilityCounts, sqlx::Error> {
count_quorum_eligible_voters_with_executor(pool, tenant_id, decision_class).await
}
pub async fn count_quorum_eligible_voters_in_transaction(
tx: &mut Transaction<'_, Postgres>,
tenant_id: &str,
decision_class: DecisionClass,
) -> Result<QuorumEligibilityCounts, sqlx::Error> {
count_quorum_eligible_voters_with_executor(&mut **tx, tenant_id, decision_class).await
}
#[allow(clippy::too_many_arguments)]
pub async fn insert_agent(
pool: &PgPool,
did: &str,
agent_name: &str,
agent_type: &str,
owner_did: &str,
tenant_id: &str,
capabilities: &JsonValue,
trust_tier: &str,
trust_score: i32,
delegation_id: Option<&str>,
pace_status: &str,
created_at: i64,
status: &str,
max_decision_class: &str,
) -> Result<(), sqlx::Error> {
sqlx::query(
"INSERT INTO agents (did, agent_name, agent_type, owner_did, tenant_id, capabilities, trust_tier, trust_score, delegation_id, pace_status, created_at, status, max_decision_class)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (did) DO NOTHING"
)
.bind(did).bind(agent_name).bind(agent_type).bind(owner_did).bind(tenant_id)
.bind(capabilities).bind(trust_tier).bind(trust_score).bind(delegation_id)
.bind(pace_status).bind(created_at).bind(status).bind(max_decision_class)
.execute(pool).await?;
Ok(())
}
pub async fn find_agent_by_did(
pool: &PgPool,
did: &str,
tenant_id: &str,
) -> Result<Option<AgentRow>, sqlx::Error> {
sqlx::query_as::<_, AgentRow>(
"SELECT did, agent_name, agent_type, owner_did, tenant_id, capabilities, trust_tier, trust_score, delegation_id, pace_status, created_at, status, max_decision_class FROM agents WHERE did = $1 AND tenant_id = $2"
).bind(did).bind(tenant_id).fetch_optional(pool).await
}
pub async fn list_agents_db(pool: &PgPool, tenant_id: &str) -> Result<Vec<AgentRow>, sqlx::Error> {
sqlx::query_as::<_, AgentRow>(
"SELECT did, agent_name, agent_type, owner_did, tenant_id, capabilities, trust_tier, trust_score, delegation_id, pace_status, created_at, status, max_decision_class FROM agents WHERE tenant_id = $1 ORDER BY created_at LIMIT $2"
).bind(tenant_id).bind(MAX_DB_LIST_ROWS).fetch_all(pool).await
}
pub async fn update_agent_pace(
pool: &PgPool,
did: &str,
pace_status: &str,
) -> Result<(), sqlx::Error> {
let result = sqlx::query("UPDATE agents SET pace_status = $1 WHERE did = $2")
.bind(pace_status)
.bind(did)
.execute(pool)
.await?;
if result.rows_affected() == 0 {
return Err(sqlx::Error::RowNotFound);
}
Ok(())
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct AgentRow {
pub did: String,
pub agent_name: String,
pub agent_type: String,
pub owner_did: String,
pub tenant_id: String,
pub capabilities: JsonValue,
pub trust_tier: String,
pub trust_score: i32,
pub delegation_id: Option<String>,
pub pace_status: String,
pub created_at: i64,
pub status: String,
pub max_decision_class: String,
}
#[allow(clippy::too_many_arguments)]
pub async fn insert_decision(
pool: &PgPool,
id_hash: &str,
tenant_id: &str,
status: &str,
title: &str,
decision_class: &str,
author: &str,
created_at_ms: i64,
constitution_version: &str,
payload: &JsonValue,
) -> Result<(), sqlx::Error> {
sqlx::query(
"INSERT INTO decisions (id_hash, tenant_id, status, title, decision_class, author, created_at_ms, constitution_version, payload)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (tenant_id, id_hash) DO UPDATE SET status = $3, payload = $9"
)
.bind(id_hash).bind(tenant_id).bind(status).bind(title).bind(decision_class)
.bind(author).bind(created_at_ms).bind(constitution_version).bind(payload)
.execute(pool).await?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub async fn create_decision(
pool: &PgPool,
id_hash: &str,
tenant_id: &str,
status: &str,
title: &str,
decision_class: &str,
author: &str,
created_at_ms: i64,
constitution_version: &str,
payload: &JsonValue,
) -> Result<(), DecisionCreateError> {
let result = sqlx::query(
"INSERT INTO decisions (id_hash, tenant_id, status, title, decision_class, author, created_at_ms, constitution_version, payload)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (tenant_id, id_hash) DO NOTHING"
)
.bind(id_hash)
.bind(tenant_id)
.bind(status)
.bind(title)
.bind(decision_class)
.bind(author)
.bind(created_at_ms)
.bind(constitution_version)
.bind(payload)
.execute(pool)
.await
.map_err(|source| DecisionCreateError::Query { source })?;
if result.rows_affected() == 0 {
return Err(DecisionCreateError::AlreadyExists {
tenant_id: tenant_id.to_owned(),
id_hash: id_hash.to_owned(),
});
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub async fn upsert_decision(
pool: &PgPool,
id_hash: &str,
tenant_id: &str,
status: &str,
title: &str,
decision_class: &str,
author: &str,
created_at_ms: i64,
constitution_version: &str,
payload: &JsonValue,
) -> Result<(), sqlx::Error> {
insert_decision(
pool,
id_hash,
tenant_id,
status,
title,
decision_class,
author,
created_at_ms,
constitution_version,
payload,
)
.await
}
pub async fn find_decision(
pool: &PgPool,
id_hash: &str,
tenant_id: &str,
) -> Result<Option<DecisionRow>, sqlx::Error> {
sqlx::query_as::<_, DecisionRow>(
"SELECT id_hash, tenant_id, status, title, decision_class, author, created_at_ms, constitution_version, payload FROM decisions WHERE id_hash = $1 AND tenant_id = $2"
).bind(id_hash).bind(tenant_id).fetch_optional(pool).await
}
pub async fn list_decisions_db(
pool: &PgPool,
tenant_id: &str,
) -> Result<Vec<DecisionRow>, sqlx::Error> {
sqlx::query_as::<_, DecisionRow>(
"SELECT id_hash, tenant_id, status, title, decision_class, author, created_at_ms, constitution_version, payload FROM decisions WHERE tenant_id = $1 ORDER BY created_at_ms LIMIT $2"
).bind(tenant_id).bind(MAX_DB_LIST_ROWS).fetch_all(pool).await
}
pub async fn update_decision(
pool: &PgPool,
id_hash: &str,
tenant_id: &str,
status: &str,
payload: &JsonValue,
) -> Result<(), DecisionUpdateError> {
let result = sqlx::query(
"UPDATE decisions SET status = $1, payload = $2 WHERE id_hash = $3 AND tenant_id = $4",
)
.bind(status)
.bind(payload)
.bind(id_hash)
.bind(tenant_id)
.execute(pool)
.await
.map_err(|source| DecisionUpdateError::Query { source })?;
if result.rows_affected() == 0 {
return Err(DecisionUpdateError::MissingDecision {
tenant_id: tenant_id.to_owned(),
id_hash: id_hash.to_owned(),
});
}
Ok(())
}
pub async fn list_conflict_declaration_payloads_db(
pool: &PgPool,
declarant_did: &str,
) -> Result<Vec<JsonValue>, sqlx::Error> {
let rows = sqlx::query(
"SELECT payload FROM conflict_declarations
WHERE declarant_did = $1
ORDER BY timestamp_physical_ms, timestamp_logical, id_hash
LIMIT $2",
)
.bind(declarant_did)
.bind(MAX_DB_LIST_ROWS)
.fetch_all(pool)
.await?;
rows.into_iter()
.map(|row| row.try_get::<JsonValue, _>("payload"))
.collect()
}
#[derive(Debug, Clone)]
pub struct ConflictDeclarationRecusalCandidate {
pub declarant_did: String,
pub nature: String,
pub related_dids: JsonValue,
pub payload: JsonValue,
}
pub async fn list_blocking_conflict_declaration_recusal_candidates_db(
pool: &PgPool,
declarant_did: &str,
affected_dids: &[String],
) -> Result<Vec<ConflictDeclarationRecusalCandidate>, sqlx::Error> {
if affected_dids.is_empty() {
return Ok(Vec::new());
}
let blocking_patterns = BLOCKING_CONFLICT_NATURE_PATTERNS
.into_iter()
.map(ToOwned::to_owned)
.collect::<Vec<String>>();
let row = sqlx::query(
"SELECT declarant_did, nature, related_dids, payload FROM conflict_declarations
WHERE declarant_did = $1
AND related_dids ?| $2
AND nature LIKE ANY($3)
ORDER BY timestamp_physical_ms, timestamp_logical, id_hash
LIMIT 1",
)
.bind(declarant_did)
.bind(affected_dids)
.bind(blocking_patterns)
.fetch_optional(pool)
.await?;
match row {
Some(row) => Ok(vec![ConflictDeclarationRecusalCandidate {
declarant_did: row.try_get::<String, _>("declarant_did")?,
nature: row.try_get::<String, _>("nature")?,
related_dids: row.try_get::<JsonValue, _>("related_dids")?,
payload: row.try_get::<JsonValue, _>("payload")?,
}]),
None => Ok(Vec::new()),
}
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct DecisionRow {
pub id_hash: String,
pub tenant_id: String,
pub status: String,
pub title: String,
pub decision_class: String,
pub author: String,
pub created_at_ms: i64,
pub constitution_version: String,
pub payload: JsonValue,
}
#[allow(clippy::too_many_arguments)]
pub async fn insert_delegation(
pool: &PgPool,
id_hash: &str,
tenant_id: &str,
delegator: &str,
delegatee: &str,
created_at_ms: i64,
expires_at: i64,
constitution_version: &str,
payload: &JsonValue,
) -> Result<(), sqlx::Error> {
sqlx::query(
"INSERT INTO delegations (id_hash, tenant_id, delegator, delegatee, created_at_ms, expires_at, constitution_version, payload)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (id_hash) DO NOTHING"
)
.bind(id_hash).bind(tenant_id).bind(delegator).bind(delegatee)
.bind(created_at_ms).bind(expires_at).bind(constitution_version).bind(payload)
.execute(pool).await?;
Ok(())
}
pub async fn list_delegations_db(pool: &PgPool) -> Result<Vec<DelegationRow>, sqlx::Error> {
sqlx::query_as::<_, DelegationRow>(
"SELECT id_hash, tenant_id, delegator, delegatee, created_at_ms, expires_at, revoked_at, constitution_version, payload FROM delegations ORDER BY created_at_ms LIMIT $1"
).bind(MAX_DB_LIST_ROWS).fetch_all(pool).await
}
pub async fn has_active_delegation(pool: &PgPool, delegatee: &str) -> Result<bool, sqlx::Error> {
Ok(
sqlx::query(
"SELECT 1 FROM delegations WHERE delegatee = $1 AND revoked_at IS NULL LIMIT 1",
)
.bind(delegatee)
.fetch_optional(pool)
.await?
.is_some(),
)
}
pub async fn has_active_delegation_either(pool: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
Ok(sqlx::query("SELECT 1 FROM delegations WHERE (delegatee = $1 OR delegator = $1) AND revoked_at IS NULL LIMIT 1")
.bind(did).fetch_optional(pool).await?.is_some())
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct DelegationRow {
pub id_hash: String,
pub tenant_id: String,
pub delegator: String,
pub delegatee: String,
pub created_at_ms: i64,
pub expires_at: i64,
pub revoked_at: Option<i64>,
pub constitution_version: String,
pub payload: JsonValue,
}
#[allow(clippy::too_many_arguments)]
pub async fn insert_audit_entry(
pool: &PgPool,
sequence: i64,
prev_hash: &str,
event_hash: &str,
event_type: &str,
actor: &str,
tenant_id: &str,
decision_id: &str,
timestamp_physical_ms: i64,
timestamp_logical: i32,
entry_hash: &str,
) -> Result<(), sqlx::Error> {
sqlx::query(
"INSERT INTO audit_entries (sequence, prev_hash, event_hash, event_type, actor, tenant_id, decision_id, timestamp_physical_ms, timestamp_logical, entry_hash)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)"
)
.bind(sequence).bind(prev_hash).bind(event_hash).bind(event_type)
.bind(actor).bind(tenant_id).bind(decision_id).bind(timestamp_physical_ms)
.bind(timestamp_logical).bind(entry_hash)
.execute(pool).await?;
Ok(())
}
pub async fn list_audit_entries(pool: &PgPool) -> Result<Vec<AuditRow>, sqlx::Error> {
sqlx::query_as::<_, AuditRow>(
"SELECT sequence, prev_hash, event_hash, event_type, actor, tenant_id, decision_id, timestamp_physical_ms, timestamp_logical, entry_hash FROM audit_entries ORDER BY sequence LIMIT $1"
).bind(MAX_DB_LIST_ROWS).fetch_all(pool).await
}
pub async fn list_audit_entries_for_decision(
pool: &PgPool,
decision_id: &str,
tenant_id: &str,
) -> Result<Vec<AuditRow>, sqlx::Error> {
sqlx::query_as::<_, AuditRow>(
"SELECT sequence, prev_hash, event_hash, event_type, actor, tenant_id, decision_id, timestamp_physical_ms, timestamp_logical, entry_hash
FROM audit_entries WHERE decision_id = $1 AND tenant_id = $2 ORDER BY sequence LIMIT $3",
)
.bind(decision_id)
.bind(tenant_id)
.bind(MAX_DB_LIST_ROWS)
.fetch_all(pool)
.await
}
pub async fn get_last_audit_entry(pool: &PgPool) -> Result<Option<AuditRow>, sqlx::Error> {
sqlx::query_as::<_, AuditRow>(
"SELECT sequence, prev_hash, event_hash, event_type, actor, tenant_id, decision_id, timestamp_physical_ms, timestamp_logical, entry_hash FROM audit_entries ORDER BY sequence DESC LIMIT 1"
).fetch_optional(pool).await
}
pub async fn count_audit_entries(pool: &PgPool) -> Result<i64, sqlx::Error> {
Ok(sqlx::query("SELECT COUNT(*) as cnt FROM audit_entries")
.fetch_one(pool)
.await?
.get::<i64, _>("cnt"))
}
pub async fn check_actor_in_audit(pool: &PgPool, actor: &str) -> Result<bool, sqlx::Error> {
Ok(
sqlx::query("SELECT 1 FROM audit_entries WHERE actor = $1 LIMIT 1")
.bind(actor)
.fetch_optional(pool)
.await?
.is_some(),
)
}
pub async fn check_actor_voted(pool: &PgPool, actor: &str) -> Result<bool, sqlx::Error> {
Ok(sqlx::query(
"SELECT 1 FROM audit_entries WHERE actor = $1 AND event_type = 'VoteCast' LIMIT 1",
)
.bind(actor)
.fetch_optional(pool)
.await?
.is_some())
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct AuditRow {
pub sequence: i64,
pub prev_hash: String,
pub event_hash: String,
pub event_type: String,
pub actor: String,
pub tenant_id: String,
pub decision_id: String,
pub timestamp_physical_ms: i64,
pub timestamp_logical: i32,
pub entry_hash: String,
}
pub async fn upsert_constitution(
pool: &PgPool,
tenant_id: &str,
version: &str,
payload: &JsonValue,
) -> Result<(), sqlx::Error> {
sqlx::query(
"INSERT INTO constitutions (tenant_id, version, payload) VALUES ($1, $2, $3)
ON CONFLICT (tenant_id, version) DO UPDATE SET payload = $3",
)
.bind(tenant_id)
.bind(version)
.bind(payload)
.execute(pool)
.await?;
Ok(())
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct ConstitutionRow {
pub tenant_id: String,
pub version: String,
pub payload: JsonValue,
}
pub async fn upsert_identity_score(
pool: &PgPool,
did: &str,
score: i32,
tier: &str,
factors: &JsonValue,
last_updated: i64,
) -> Result<(), sqlx::Error> {
sqlx::query(
"INSERT INTO identity_scores (did, score, tier, factors, last_updated) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (did) DO UPDATE SET score = $2, tier = $3, factors = $4, last_updated = $5"
).bind(did).bind(score).bind(tier).bind(factors).bind(last_updated)
.execute(pool).await?;
Ok(())
}
pub async fn get_identity_score(
pool: &PgPool,
did: &str,
) -> Result<Option<IdentityScoreRow>, sqlx::Error> {
sqlx::query_as::<_, IdentityScoreRow>(
"SELECT did, score, tier, factors, last_updated FROM identity_scores WHERE did = $1",
)
.bind(did)
.fetch_optional(pool)
.await
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct IdentityScoreRow {
pub did: String,
pub score: i32,
pub tier: String,
pub factors: JsonValue,
pub last_updated: i64,
}
pub async fn insert_enrollment(
pool: &PgPool,
did: &str,
entity_type: &str,
step: &str,
timestamp: i64,
verified_by: &str,
audit_hash: &str,
) -> Result<(), sqlx::Error> {
sqlx::query(
"INSERT INTO enrollment_log (did, entity_type, step, timestamp, verified_by, audit_hash) VALUES ($1, $2, $3, $4, $5, $6)"
).bind(did).bind(entity_type).bind(step).bind(timestamp).bind(verified_by).bind(audit_hash)
.execute(pool).await?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub async fn insert_livesafe_identity(
pool: &PgPool,
did: &str,
odentity_composite_basis_points: i32,
pace_status: &str,
card_status: &str,
created_at_ms: i64,
exochain_anchor: Option<&str>,
) -> Result<(), sqlx::Error> {
sqlx::query(
"INSERT INTO livesafe_identities (did, odentity_composite_basis_points, pace_status, card_status, created_at_ms, exochain_anchor)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (did) DO UPDATE SET odentity_composite_basis_points = $2, pace_status = $3, card_status = $4, exochain_anchor = $6"
).bind(did).bind(odentity_composite_basis_points).bind(pace_status).bind(card_status)
.bind(created_at_ms).bind(exochain_anchor)
.execute(pool).await?;
Ok(())
}
pub async fn get_livesafe_identity(
pool: &PgPool,
did: &str,
) -> Result<Option<LiveSafeIdentityRow>, sqlx::Error> {
sqlx::query_as::<_, LiveSafeIdentityRow>(
"SELECT did, odentity_composite_basis_points, pace_status, card_status, created_at_ms, exochain_anchor FROM livesafe_identities WHERE did = $1"
).bind(did).fetch_optional(pool).await
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct LiveSafeIdentityRow {
pub did: String,
pub odentity_composite_basis_points: i32,
pub pace_status: String,
pub card_status: String,
pub created_at_ms: i64,
pub exochain_anchor: Option<String>,
}
#[allow(clippy::too_many_arguments)]
pub async fn insert_scan_receipt(
pool: &PgPool,
scan_id: &str,
subscriber_did: &str,
responder_did: &str,
location: Option<&str>,
scanned_at_ms: i64,
consent_expires_at_ms: i64,
audit_receipt_hash: &str,
anchor_receipt: Option<&str>,
) -> Result<(), ScanReceiptInsertError> {
if location.is_some()
&& !scan_receipt_location_consent_exists(pool, subscriber_did, responder_did, scanned_at_ms)
.await
.map_err(|source| ScanReceiptInsertError::Query { source })?
{
return Err(ScanReceiptInsertError::LocationConsentRequired);
}
sqlx::query(
"INSERT INTO scan_receipts (scan_id, subscriber_did, responder_did, location, scanned_at_ms, consent_expires_at_ms, audit_receipt_hash, anchor_receipt)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"
).bind(scan_id).bind(subscriber_did).bind(responder_did).bind(location)
.bind(scanned_at_ms).bind(consent_expires_at_ms).bind(audit_receipt_hash).bind(anchor_receipt)
.execute(pool).await
.map_err(|source| ScanReceiptInsertError::Query { source })?;
Ok(())
}
async fn scan_receipt_location_consent_exists(
pool: &PgPool,
subscriber_did: &str,
responder_did: &str,
scanned_at_ms: i64,
) -> Result<bool, sqlx::Error> {
let location_scope = serde_json::json!([LOCATION_CONSENT_SCOPE]);
sqlx::query_scalar::<_, bool>(
"SELECT EXISTS (
SELECT 1
FROM consent_anchors
WHERE subscriber_did = $1
AND provider_did = $2
AND granted_at_ms <= $3
AND (expires_at_ms IS NULL OR expires_at_ms > $3)
AND revoked_at_ms IS NULL
AND scope @> $4::jsonb
)",
)
.bind(subscriber_did)
.bind(responder_did)
.bind(scanned_at_ms)
.bind(location_scope)
.fetch_one(pool)
.await
}
pub async fn list_scan_receipts(
pool: &PgPool,
subscriber_did: &str,
) -> Result<Vec<ScanReceiptRow>, sqlx::Error> {
sqlx::query_as::<_, ScanReceiptRow>(
"SELECT scan_id, subscriber_did, responder_did, location, scanned_at_ms, consent_expires_at_ms, audit_receipt_hash, anchor_receipt FROM scan_receipts WHERE subscriber_did = $1 ORDER BY scanned_at_ms DESC LIMIT $2"
).bind(subscriber_did).bind(MAX_DB_LIST_ROWS).fetch_all(pool).await
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct ScanReceiptRow {
pub scan_id: String,
pub subscriber_did: String,
pub responder_did: String,
pub location: Option<String>,
pub scanned_at_ms: i64,
pub consent_expires_at_ms: i64,
pub audit_receipt_hash: String,
pub anchor_receipt: Option<String>,
}
#[allow(clippy::too_many_arguments)]
pub async fn insert_consent_anchor(
pool: &PgPool,
consent_id: &str,
subscriber_did: &str,
provider_did: &str,
scope: &JsonValue,
granted_at_ms: i64,
expires_at_ms: Option<i64>,
audit_receipt_hash: &str,
) -> Result<(), sqlx::Error> {
sqlx::query(
"INSERT INTO consent_anchors (consent_id, subscriber_did, provider_did, scope, granted_at_ms, expires_at_ms, audit_receipt_hash)
VALUES ($1, $2, $3, $4, $5, $6, $7)"
).bind(consent_id).bind(subscriber_did).bind(provider_did).bind(scope)
.bind(granted_at_ms).bind(expires_at_ms).bind(audit_receipt_hash)
.execute(pool).await?;
Ok(())
}
pub async fn list_consent_anchors(
pool: &PgPool,
subscriber_did: &str,
) -> Result<Vec<ConsentAnchorRow>, sqlx::Error> {
sqlx::query_as::<_, ConsentAnchorRow>(
"SELECT consent_id, subscriber_did, provider_did, scope, granted_at_ms, expires_at_ms, revoked_at_ms, audit_receipt_hash FROM consent_anchors WHERE subscriber_did = $1 ORDER BY granted_at_ms DESC LIMIT $2"
).bind(subscriber_did).bind(MAX_DB_LIST_ROWS).fetch_all(pool).await
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct ConsentAnchorRow {
pub consent_id: String,
pub subscriber_did: String,
pub provider_did: String,
pub scope: JsonValue,
pub granted_at_ms: i64,
pub expires_at_ms: Option<i64>,
pub revoked_at_ms: Option<i64>,
pub audit_receipt_hash: String,
}
pub async fn insert_trustee_shard(
pool: &PgPool,
subscriber_did: &str,
trustee_did: &str,
role: &str,
shard_confirmed: bool,
accepted_at_ms: Option<i64>,
) -> Result<(), sqlx::Error> {
sqlx::query(
"INSERT INTO trustee_shard_status (subscriber_did, trustee_did, role, shard_confirmed, accepted_at_ms) VALUES ($1, $2, $3, $4, $5)"
).bind(subscriber_did).bind(trustee_did).bind(role).bind(shard_confirmed).bind(accepted_at_ms)
.execute(pool).await?;
Ok(())
}
pub async fn list_trustee_shards(
pool: &PgPool,
subscriber_did: &str,
) -> Result<Vec<TrusteeShardRow>, sqlx::Error> {
sqlx::query_as::<_, TrusteeShardRow>(
"SELECT subscriber_did, trustee_did, role, shard_confirmed, accepted_at_ms FROM trustee_shard_status WHERE subscriber_did = $1 LIMIT $2"
).bind(subscriber_did).bind(MAX_DB_LIST_ROWS).fetch_all(pool).await
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct TrusteeShardRow {
pub subscriber_did: String,
pub trustee_did: String,
pub role: String,
pub shard_confirmed: bool,
pub accepted_at_ms: Option<i64>,
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct AgentRoleRow {
pub agent_did: String,
pub role: String,
pub branch: String,
pub granted_by: String,
pub valid_from: i64,
pub expires_at: Option<i64>,
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct ConsentRecordRow {
pub subject_did: String,
pub actor_did: String,
pub scope: String,
pub bailment_type: String,
pub status: String,
pub created_at: i64,
pub expires_at: Option<i64>,
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct AuthorityChainRow {
pub actor_did: String,
pub chain_json: JsonValue,
pub valid_from: i64,
pub expires_at: Option<i64>,
}
#[derive(Debug, Error)]
pub enum AgentRoleLoadError {
#[error("agent roles query failed")]
Query {
#[from]
source: sqlx::Error,
},
#[error(
"agent role rows reached the bounded load cap; refusing truncated authorization evidence"
)]
TruncatedEvidence,
}
pub async fn load_agent_roles(
pool: &PgPool,
actor_did: &str,
now_ms: i64,
) -> Result<Vec<AgentRoleRow>, AgentRoleLoadError> {
let rows = sqlx::query_as::<_, AgentRoleRow>(
"SELECT agent_did, role, branch, granted_by, valid_from, expires_at \
FROM agent_roles \
WHERE agent_did = $1 \
AND valid_from <= $2 \
AND (expires_at IS NULL OR expires_at > $2) \
ORDER BY role ASC \
LIMIT $3",
)
.bind(actor_did)
.bind(now_ms)
.bind(MAX_DB_LIST_ROWS)
.fetch_all(pool)
.await?;
if i64::try_from(rows.len()).unwrap_or(i64::MAX) >= MAX_DB_LIST_ROWS {
return Err(AgentRoleLoadError::TruncatedEvidence);
}
Ok(rows)
}
pub async fn load_consent_records(
pool: &PgPool,
actor_did: &str,
now_ms: i64,
) -> Result<Vec<ConsentRecordRow>, sqlx::Error> {
sqlx::query_as::<_, ConsentRecordRow>(
"SELECT subject_did, actor_did, scope, bailment_type, status, created_at, expires_at \
FROM consent_records \
WHERE actor_did = $1 \
AND status = 'active' \
AND created_at <= $2 \
AND (expires_at IS NULL OR expires_at > $2) \
ORDER BY created_at DESC, subject_did ASC, scope ASC, bailment_type ASC, expires_at ASC NULLS LAST \
LIMIT $3",
)
.bind(actor_did)
.bind(now_ms)
.bind(MAX_DB_LIST_ROWS)
.fetch_all(pool)
.await
}
pub async fn load_authority_chain(
pool: &PgPool,
actor_did: &str,
now_ms: i64,
) -> Result<Option<AuthorityChainRow>, sqlx::Error> {
sqlx::query_as::<_, AuthorityChainRow>(
"SELECT actor_did, chain_json, valid_from, expires_at \
FROM authority_chains \
WHERE actor_did = $1 \
AND valid_from <= $2 \
AND (expires_at IS NULL OR expires_at > $2) \
ORDER BY valid_from DESC \
LIMIT 1",
)
.bind(actor_did)
.bind(now_ms)
.fetch_optional(pool)
.await
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct LayoutTemplateRow {
pub id: String,
pub user_did: Option<String>,
pub name: String,
pub layout_json: JsonValue,
pub hidden_panels: JsonValue,
pub is_built_in: bool,
pub created_at: i64,
pub updated_at: i64,
}
#[allow(clippy::too_many_arguments)]
pub async fn upsert_layout_template(
pool: &PgPool,
id: &str,
user_did: Option<&str>,
name: &str,
layout_json: &JsonValue,
hidden_panels: &JsonValue,
is_built_in: bool,
created_at: i64,
updated_at: i64,
) -> Result<bool, sqlx::Error> {
let result = sqlx::query(
"INSERT INTO layout_templates (id, user_did, name, layout_json, hidden_panels, is_built_in, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (id) DO UPDATE SET name = $3, layout_json = $4, hidden_panels = $5, updated_at = $8
WHERE layout_templates.user_did = $2 AND layout_templates.is_built_in = false"
)
.bind(id).bind(user_did).bind(name).bind(layout_json).bind(hidden_panels)
.bind(is_built_in).bind(created_at).bind(updated_at)
.execute(pool).await?;
Ok(result.rows_affected() > 0)
}
pub async fn list_layout_templates(
pool: &PgPool,
user_did: Option<&str>,
) -> Result<Vec<LayoutTemplateRow>, sqlx::Error> {
if let Some(uid) = user_did {
sqlx::query_as::<_, LayoutTemplateRow>(
"SELECT id, user_did, name, layout_json, hidden_panels, is_built_in, created_at, updated_at \
FROM layout_templates WHERE user_did = $1 OR is_built_in = true ORDER BY created_at LIMIT $2"
).bind(uid).bind(MAX_DB_LIST_ROWS).fetch_all(pool).await
} else {
sqlx::query_as::<_, LayoutTemplateRow>(
"SELECT id, user_did, name, layout_json, hidden_panels, is_built_in, created_at, updated_at \
FROM layout_templates ORDER BY created_at LIMIT $1"
).bind(MAX_DB_LIST_ROWS).fetch_all(pool).await
}
}
pub async fn delete_layout_template(
pool: &PgPool,
id: &str,
user_did: &str,
) -> Result<bool, sqlx::Error> {
let result = sqlx::query(
"DELETE FROM layout_templates \
WHERE id = $1 AND user_did = $2 AND is_built_in = false",
)
.bind(id)
.bind(user_did)
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct FeedbackIssueRow {
pub id: String,
pub title: String,
pub description: String,
pub severity: String,
pub category: String,
pub status: String,
pub source_widget_id: String,
pub source_module_type: String,
pub reporter_did: Option<String>,
pub assigned_agent_team: Option<String>,
pub widget_state: Option<JsonValue>,
pub browser_info: Option<JsonValue>,
pub resolution_notes: Option<String>,
pub created_at: i64,
pub updated_at: i64,
}
#[allow(clippy::too_many_arguments)]
pub async fn insert_feedback_issue(
pool: &PgPool,
id: &str,
title: &str,
description: &str,
severity: &str,
category: &str,
source_widget_id: &str,
source_module_type: &str,
reporter_did: Option<&str>,
widget_state: Option<&JsonValue>,
browser_info: Option<&JsonValue>,
created_at: i64,
) -> Result<(), sqlx::Error> {
sqlx::query(
"INSERT INTO feedback_issues (id, title, description, severity, category, status, source_widget_id, source_module_type, reporter_did, widget_state, browser_info, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, 'open', $6, $7, $8, $9, $10, $11, $11)"
)
.bind(id).bind(title).bind(description).bind(severity).bind(category)
.bind(source_widget_id).bind(source_module_type).bind(reporter_did)
.bind(widget_state).bind(browser_info).bind(created_at)
.execute(pool).await?;
Ok(())
}
pub async fn list_feedback_issues(
pool: &PgPool,
reporter_did: Option<&str>,
status_filter: Option<&str>,
) -> Result<Vec<FeedbackIssueRow>, sqlx::Error> {
match (reporter_did, status_filter) {
(Some(reporter), Some(status)) => {
sqlx::query_as::<_, FeedbackIssueRow>(
"SELECT id, title, description, severity, category, status, source_widget_id, source_module_type, \
reporter_did, assigned_agent_team, widget_state, browser_info, resolution_notes, created_at, updated_at \
FROM feedback_issues WHERE reporter_did = $1 AND status = $2 ORDER BY created_at DESC LIMIT $3"
).bind(reporter).bind(status).bind(MAX_DB_LIST_ROWS).fetch_all(pool).await
}
(Some(reporter), None) => {
sqlx::query_as::<_, FeedbackIssueRow>(
"SELECT id, title, description, severity, category, status, source_widget_id, source_module_type, \
reporter_did, assigned_agent_team, widget_state, browser_info, resolution_notes, created_at, updated_at \
FROM feedback_issues WHERE reporter_did = $1 ORDER BY created_at DESC LIMIT $2"
).bind(reporter).bind(MAX_DB_LIST_ROWS).fetch_all(pool).await
}
(None, Some(status)) => {
sqlx::query_as::<_, FeedbackIssueRow>(
"SELECT id, title, description, severity, category, status, source_widget_id, source_module_type, \
reporter_did, assigned_agent_team, widget_state, browser_info, resolution_notes, created_at, updated_at \
FROM feedback_issues WHERE status = $1 ORDER BY created_at DESC LIMIT $2"
).bind(status).bind(MAX_DB_LIST_ROWS).fetch_all(pool).await
}
(None, None) => {
sqlx::query_as::<_, FeedbackIssueRow>(
"SELECT id, title, description, severity, category, status, source_widget_id, source_module_type, \
reporter_did, assigned_agent_team, widget_state, browser_info, resolution_notes, created_at, updated_at \
FROM feedback_issues ORDER BY created_at DESC LIMIT $1"
).bind(MAX_DB_LIST_ROWS).fetch_all(pool).await
}
}
}
pub async fn update_feedback_issue_status(
pool: &PgPool,
id: &str,
reporter_did: &str,
status: &str,
assigned_agent_team: Option<&str>,
resolution_notes: Option<&str>,
updated_at: i64,
) -> Result<bool, sqlx::Error> {
let result = sqlx::query(
"UPDATE feedback_issues SET status = $1, assigned_agent_team = COALESCE($2, assigned_agent_team), \
resolution_notes = COALESCE($3, resolution_notes), updated_at = $4 WHERE id = $5 AND reporter_did = $6"
)
.bind(status).bind(assigned_agent_team).bind(resolution_notes)
.bind(updated_at).bind(id).bind(reporter_did)
.execute(pool).await?;
Ok(result.rows_affected() > 0)
}
#[cfg(test)]
pub(crate) mod tests {
use exo_core::{Did, Timestamp};
use exo_governance::conflict::{ActionRequest, check_and_block, check_conflicts};
use sha2::{Digest, Sha384};
use super::*;
pub(crate) fn is_test_scoped_database_url(url: &str) -> bool {
let Some((_, after_scheme)) = url.split_once("://") else {
return false;
};
let host_port_path = after_scheme
.rsplit_once('@')
.map_or(after_scheme, |(_, rest)| rest);
let (host_port, db_segment) = match host_port_path.split_once('/') {
Some((host_port, rest)) => (host_port, rest),
None => (host_port_path, ""),
};
let host = if let Some(bracketed) = host_port.strip_prefix('[') {
bracketed
.split_once(']')
.map_or(bracketed, |(host, _)| host)
} else {
host_port
.rsplit_once(':')
.map_or(host_port, |(host, _)| host)
};
if matches!(host, "localhost" | "127.0.0.1" | "::1") {
return true;
}
let db_name = db_segment.split('?').next().unwrap_or("");
db_name.ends_with("_test")
}
fn production_source() -> &'static str {
let source = include_str!("db.rs");
source.split("#[cfg(test)]").next().unwrap_or(source)
}
fn migration_sources() -> String {
[
include_str!("../migrations/20260316000001_initial_schema.sql"),
include_str!("../migrations/20260330000001_create_sessions.sql"),
include_str!("../migrations/20260330000002_create_adjudication_tables.sql"),
include_str!("../migrations/20260407000001_create_dashboard_tables.sql"),
include_str!("../migrations/20260425000001_add_decision_id_to_audit_entries.sql"),
include_str!("../migrations/20260426000001_livesafe_composite_basis_points.sql"),
include_str!("../migrations/20260427000001_create_conflict_declarations.sql"),
include_str!("../migrations/20260504000001_add_gateway_runtime_query_indexes.sql"),
include_str!("../migrations/20260504000002_add_gateway_tenant_scope_indexes.sql"),
include_str!("../migrations/20260504000003_create_did_documents.sql"),
include_str!("../migrations/20260504000004_add_gateway_identity_erasure.sql"),
include_str!(
"../migrations/20260505000001_add_audit_decision_tenant_sequence_index.sql"
),
include_str!("../migrations/20260510000001_scope_decision_ids_by_tenant.sql"),
include_str!("../migrations/20260602000001_create_avc_registry_state.sql"),
]
.join("\n")
}
fn migration_sources_from_disk() -> String {
let migration_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("migrations");
let mut entries = std::fs::read_dir(&migration_dir)
.expect("read migrations directory")
.map(|entry| entry.expect("migration dir entry").path())
.collect::<Vec<_>>();
entries.sort();
entries
.into_iter()
.map(|path| std::fs::read_to_string(path).expect("read migration"))
.collect::<Vec<_>>()
.join("\n")
}
fn sha384_hex(bytes: &[u8]) -> String {
hex::encode(Sha384::digest(bytes))
}
fn applied_gateway_migrations() -> Vec<(&'static str, &'static [u8], &'static str)> {
vec![
(
"20260316000001_initial_schema.sql",
include_bytes!("../migrations/20260316000001_initial_schema.sql").as_slice(),
"aa89c91d97af590b343f6dba4c411977787cf604ed4df6b55433544b49ed539fa76d863855bacc1b558be6b9d158735c",
),
(
"20260330000001_create_sessions.sql",
include_bytes!("../migrations/20260330000001_create_sessions.sql").as_slice(),
"8bcdb4bc2b7bfa8e66f11376a43fdd58e98902704b15556f95c016ef470192883899c2b021d472a93b8c876ed7931e42",
),
(
"20260330000002_create_adjudication_tables.sql",
include_bytes!("../migrations/20260330000002_create_adjudication_tables.sql")
.as_slice(),
concat!(
"c6c6e47",
"f",
"645dff8385eb2edd2a0e46971f6e1dc43bdd794286aca14c84204f7e81beeba6d5fbdf9877f28302e8b8d204"
),
),
(
"20260407000001_create_dashboard_tables.sql",
include_bytes!("../migrations/20260407000001_create_dashboard_tables.sql")
.as_slice(),
"536da0680b74c939f723349dc1b416ac9971686bf077aa5d5904e8924aa1541417022d28f880c4b6bfe97d3685089982",
),
(
"20260425000001_add_decision_id_to_audit_entries.sql",
include_bytes!("../migrations/20260425000001_add_decision_id_to_audit_entries.sql")
.as_slice(),
"0e8cb231c2b2405e69e65846ea87d256b1c80d82c1321647b513c10120d4cb4fc669a904cdc6df4ce1e14ba70de1bff5",
),
(
"20260426000001_livesafe_composite_basis_points.sql",
include_bytes!("../migrations/20260426000001_livesafe_composite_basis_points.sql")
.as_slice(),
concat!(
"5eecfe53e39663cca94c878325dbaf882e3a0a83ba0e38c7bb17fac20607f504ef6a729d797a48dcb29a93",
"f",
"329dc6e88"
),
),
(
"20260427000001_create_conflict_declarations.sql",
include_bytes!("../migrations/20260427000001_create_conflict_declarations.sql")
.as_slice(),
"153baeb665786a7c9d90f44d6abc4e57853c0d0d3ab334e32f6d53b9c4aa434b365264ca55e32bff7bdc3b6ec269accc",
),
(
"20260504000001_add_gateway_runtime_query_indexes.sql",
include_bytes!(
"../migrations/20260504000001_add_gateway_runtime_query_indexes.sql"
)
.as_slice(),
"9532a395fdf690a03e3a3f7688e31ce5848f473eec9713c7c7af4d5fe2b2561212036e4b3da2cb36fa8787764a299e41",
),
(
"20260504000002_add_gateway_tenant_scope_indexes.sql",
include_bytes!("../migrations/20260504000002_add_gateway_tenant_scope_indexes.sql")
.as_slice(),
"265edf4a9eebd0eba8870aeeeebe4ca4906d8f97a12164254080ada6c40c01fe507f42946c398bf17d51ceeefc26adce",
),
(
"20260504000003_create_did_documents.sql",
include_bytes!("../migrations/20260504000003_create_did_documents.sql").as_slice(),
concat!(
"0c08b06a5a23d474b9b3f18dc7dda2357350a2553e1ec4883dcdbbe17cf89da57a28a9d5831ba71d04de576f0",
"f",
"64fbf1"
),
),
(
"20260504000004_add_gateway_identity_erasure.sql",
include_bytes!("../migrations/20260504000004_add_gateway_identity_erasure.sql")
.as_slice(),
"5aaa9c662a7919a66ea93200cb6ac85e36f4486153336e63d4bab5d14e91b2a8b66f352b317815ec7658e0ecf2f28385",
),
(
"20260505000001_add_audit_decision_tenant_sequence_index.sql",
include_bytes!(
"../migrations/20260505000001_add_audit_decision_tenant_sequence_index.sql"
)
.as_slice(),
"584d148c3df76bad4433760f433cf4de6e5414e5825979a1aff21cd15329389343711ea43833cc1a8ab73694dc96e9aa",
),
(
"20260510000001_scope_decision_ids_by_tenant.sql",
include_bytes!("../migrations/20260510000001_scope_decision_ids_by_tenant.sql")
.as_slice(),
"8de5b45554e6c821e34b575aac7479ff5b147b86254e8bee58596118b71c679f2e65db909193ce033613dc74e5efd024",
),
(
"20260602000001_create_avc_registry_state.sql",
include_bytes!("../migrations/20260602000001_create_avc_registry_state.sql")
.as_slice(),
"ca6b6fec1b15574ccbc6498836cd2ca4784b1f8c2376e873140a12bea56c49588e05769d80fe88c8374097025e8efa1f",
),
]
}
#[test]
fn applied_gateway_migration_checksums_are_immutable() {
for (name, bytes, expected_checksum) in applied_gateway_migrations() {
assert_eq!(
sha384_hex(bytes),
*expected_checksum,
"applied migration {name} must remain byte-for-byte stable; add a new migration instead of editing history"
);
}
}
#[test]
fn applied_gateway_migrations_match_disk_set() {
let mut source_names = applied_gateway_migrations()
.iter()
.map(|(name, ..)| (*name).to_string())
.collect::<Vec<_>>();
source_names.sort();
let migration_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("migrations");
let mut disk_names = std::fs::read_dir(&migration_dir)
.expect("read migrations directory")
.map(|entry| {
entry
.expect("migration dir entry")
.file_name()
.into_string()
.expect("migration name should be valid UTF-8")
})
.filter(|name| name.ends_with(".sql"))
.collect::<Vec<_>>();
disk_names.sort();
assert_eq!(
source_names, disk_names,
"migration checksum list must mirror disk migration set exactly"
);
}
fn compact_sql(sql: &str) -> String {
sql.split_whitespace().collect::<Vec<_>>().join(" ")
}
fn function_source<'a>(source: &'a str, name: &str) -> &'a str {
let public_signature = format!("pub async fn {name}");
let private_signature = format!("async fn {name}");
let start = source
.find(&public_signature)
.or_else(|| source.find(&private_signature))
.unwrap_or_else(|| panic!("{name} source must be present"));
let after_start = &source[start..];
let end = after_start.find("\n/// ").unwrap_or(after_start.len());
&after_start[..end]
}
fn contains_in_order(source: &str, first: &str, second: &str) -> bool {
let Some(first_index) = source.find(first) else {
return false;
};
let Some(second_index) = source.find(second) else {
return false;
};
first_index < second_index
}
async fn gateway_test_pool() -> Option<PgPool> {
let url = std::env::var("DATABASE_URL").ok()?;
assert!(
is_test_scoped_database_url(&url),
"refusing to seed gateway test fixtures: DATABASE_URL must point at localhost or a *_test database"
);
let pool = sqlx::postgres::PgPoolOptions::new()
.max_connections(1)
.connect(&url)
.await
.ok()?;
sqlx::migrate!("./migrations").run(&pool).await.ok()?;
Some(pool)
}
#[test]
fn test_scoped_database_url_guard_accepts_local_and_test_databases() {
assert!(is_test_scoped_database_url(
"postgres://user:secret@localhost:5433/exochain"
));
assert!(is_test_scoped_database_url(
"postgres://user:secret@127.0.0.1/exochain"
));
assert!(is_test_scoped_database_url(
"postgres://user:secret@[::1]:5433/exochain?sslmode=disable"
));
assert!(is_test_scoped_database_url(
"postgres://user:secret@db.internal:5432/exochain_test"
));
assert!(is_test_scoped_database_url(
"postgres://user:secret@db.internal:5432/exochain_test?sslmode=require"
));
}
#[test]
fn test_scoped_database_url_guard_rejects_shared_databases() {
assert!(!is_test_scoped_database_url(
"postgres://user:secret@db.prod.internal:5432/exochain"
));
assert!(!is_test_scoped_database_url(
"postgres://db.prod.internal/exochain"
));
assert!(!is_test_scoped_database_url(
"postgres://user:secret@staging.example.com/exochain_testing" ));
assert!(!is_test_scoped_database_url("not-a-url"));
}
async fn cleanup_identity_erasure_fixture(pool: &PgPool, did: &str) -> Result<(), sqlx::Error> {
for statement in [
"DELETE FROM did_documents WHERE did = $1",
"DELETE FROM sessions WHERE actor_did = $1",
"DELETE FROM users WHERE did = $1",
"DELETE FROM agents WHERE did = $1 OR owner_did = $1",
"DELETE FROM identity_scores WHERE did = $1",
"DELETE FROM enrollment_log WHERE did = $1",
"DELETE FROM livesafe_identities WHERE did = $1",
"DELETE FROM scan_receipts WHERE subscriber_did = $1 OR responder_did = $1",
"DELETE FROM consent_anchors WHERE subscriber_did = $1 OR provider_did = $1",
"DELETE FROM trustee_shard_status WHERE subscriber_did = $1 OR trustee_did = $1",
"DELETE FROM agent_roles WHERE agent_did = $1 OR granted_by = $1",
"DELETE FROM consent_records WHERE subject_did = $1 OR actor_did = $1",
"DELETE FROM authority_chains WHERE actor_did = $1",
"DELETE FROM delegations WHERE delegator = $1 OR delegatee = $1",
"DELETE FROM layout_templates WHERE user_did = $1",
"DELETE FROM feedback_issues WHERE reporter_did = $1",
"DELETE FROM conflict_declarations WHERE declarant_did = $1 OR related_dids @> jsonb_build_array($1::text)",
] {
sqlx::query(statement).bind(did).execute(pool).await?;
}
Ok(())
}
async fn count_rows_by_did(
pool: &PgPool,
statement: &str,
did: &str,
) -> Result<i64, sqlx::Error> {
sqlx::query_scalar(statement)
.bind(did)
.fetch_one(pool)
.await
}
fn minimal_doc(did_str: &str) -> DidDocument {
DidDocument {
id: Did::new(did_str).expect("valid DID"),
public_keys: vec![],
authentication: vec![],
verification_methods: vec![],
hybrid_verification_methods: vec![],
service_endpoints: vec![],
created: Timestamp::ZERO,
updated: Timestamp::ZERO,
revoked: false,
}
}
#[test]
fn init_pool_returns_result_without_panic_paths() {
let source = production_source();
let init_pool_source = function_source(source, "init_pool");
assert!(
source.contains("pub enum DbInitError"),
"database initialization failures must use a typed error"
);
assert!(
init_pool_source.contains("-> Result<PgPool, DbInitError>"),
"init_pool must return a typed Result instead of panicking"
);
assert!(
!init_pool_source.contains(".expect("),
"init_pool must not panic on connection or migration failure"
);
assert!(
!init_pool_source.contains("#[allow(clippy::expect_used)]"),
"init_pool must not suppress panic linting"
);
}
#[test]
fn init_pool_uses_structured_tracing_not_stdout() {
let source = production_source();
let init_pool_source = function_source(source, "init_pool");
assert!(
init_pool_source.contains("tracing::info!"),
"database initialization events must flow through structured tracing"
);
assert!(
!init_pool_source.contains("println!("),
"database initialization must not bypass structured tracing with stdout logging"
);
assert!(
!init_pool_source.contains("eprintln!("),
"database initialization must not bypass structured tracing with stderr logging"
);
}
#[test]
fn db_init_error_display_redacts_driver_sources() {
let source = production_source();
assert!(
!source.contains("failed to connect to PostgreSQL: {source}"),
"DbInitError Display must not include driver connection details"
);
assert!(
!source.contains("failed to run database migrations: {source}"),
"DbInitError Display must not include migration driver details"
);
assert!(
source.contains("#[source]"),
"DbInitError must retain underlying sources for internal diagnostics"
);
}
#[test]
fn fetch_all_database_helpers_have_explicit_row_limits() {
let source = production_source();
assert!(
source.contains("pub const MAX_DB_LIST_ROWS: i64"),
"database list limits must be centralized"
);
for (name, expected_limit_clauses) in [
("list_users_db", 1),
("list_agents_db", 1),
("list_decisions_db", 1),
("list_conflict_declaration_payloads_db", 1),
("list_delegations_db", 1),
("list_audit_entries", 1),
("list_audit_entries_for_decision", 1),
("list_scan_receipts", 1),
("list_consent_anchors", 1),
("list_trustee_shards", 1),
("load_agent_roles", 1),
("load_consent_records", 1),
("list_layout_templates", 2),
("list_feedback_issues", 4),
] {
let body = function_source(source, name);
assert!(
body.matches(".fetch_all(pool)").count() >= expected_limit_clauses,
"{name} must keep using reviewed pool fetch paths"
);
assert_eq!(
body.matches("LIMIT $").count(),
expected_limit_clauses,
"{name} must apply an explicit SQL LIMIT to every fetch_all query"
);
assert_eq!(
body.matches(".bind(MAX_DB_LIST_ROWS)").count(),
expected_limit_clauses,
"{name} must bind the centralized row limit for every fetch_all query"
);
}
}
#[test]
fn conflict_recusal_enforcement_uses_scoped_blocking_lookup_not_ui_list_cap() {
let source = production_source();
let recusal_lookup = function_source(
source,
"list_blocking_conflict_declaration_recusal_candidates_db",
);
assert!(
recusal_lookup.contains("SELECT declarant_did, nature, related_dids, payload"),
"recusal enforcement must return scalar fields with the payload so the server can verify canonical consistency"
);
assert!(
recusal_lookup.contains("related_dids ?|"),
"recusal enforcement must scope the DB lookup to affected DIDs"
);
assert!(
recusal_lookup.contains("nature LIKE ANY"),
"recusal enforcement must query only blocking conflict natures"
);
assert!(
recusal_lookup.contains("LIMIT 1"),
"recusal enforcement only needs one matching blocking declaration to fail closed"
);
assert!(
!recusal_lookup.contains("MAX_DB_LIST_ROWS"),
"vote recusal enforcement must not reuse the UI/list cap"
);
}
#[tokio::test]
async fn conflict_recusal_lookup_finds_blocking_declaration_beyond_ui_list_cap() {
let Some(pool) = gateway_test_pool().await else {
return;
};
let actor = Did::new("did:exo:recusal-cap-voter").expect("valid DID");
let unrelated = Did::new("did:exo:recusal-cap-unrelated").expect("valid DID");
let affected = Did::new("did:exo:recusal-cap-affected").expect("valid DID");
sqlx::query("DELETE FROM conflict_declarations WHERE declarant_did = $1")
.bind(actor.as_str())
.execute(&pool)
.await
.expect("clean recusal cap fixture before test");
for idx in 0..MAX_DB_LIST_ROWS {
let timestamp = 10_000_i64 + idx;
sqlx::query(
"INSERT INTO conflict_declarations (id_hash, declarant_did, nature, related_dids, timestamp_physical_ms, timestamp_logical, payload) \
VALUES ($1, $2, $3, $4, $5, $6, $7)",
)
.bind(format!("recusal-cap-unrelated-{idx}"))
.bind(actor.as_str())
.bind("advisory")
.bind(serde_json::json!([unrelated.as_str()]))
.bind(timestamp)
.bind(0_i32)
.bind(serde_json::json!({
"declarant_did": actor.as_str(),
"nature": "advisory",
"related_dids": [unrelated.as_str()],
"timestamp": {
"physical_ms": timestamp,
"logical": 0
}
}))
.execute(&pool)
.await
.expect("insert unrelated advisory conflict declaration");
}
let blocking_timestamp = 20_000_i64 + MAX_DB_LIST_ROWS;
sqlx::query(
"INSERT INTO conflict_declarations (id_hash, declarant_did, nature, related_dids, timestamp_physical_ms, timestamp_logical, payload) \
VALUES ($1, $2, $3, $4, $5, $6, $7)",
)
.bind("recusal-cap-blocking")
.bind(actor.as_str())
.bind("financial ownership")
.bind(serde_json::json!([affected.as_str()]))
.bind(blocking_timestamp)
.bind(0_i32)
.bind(serde_json::json!({
"declarant_did": actor.as_str(),
"nature": "financial ownership",
"related_dids": [affected.as_str()],
"timestamp": {
"physical_ms": blocking_timestamp,
"logical": 0
}
}))
.execute(&pool)
.await
.expect("insert blocking conflict declaration after capped rows");
let affected_did_strings = vec![affected.as_str().to_owned()];
let candidates = list_blocking_conflict_declaration_recusal_candidates_db(
&pool,
actor.as_str(),
&affected_did_strings,
)
.await
.expect("load conflict declarations");
let declarations = candidates
.into_iter()
.map(|candidate| candidate.payload)
.map(serde_json::from_value)
.collect::<Result<Vec<_>, _>>()
.expect("decode conflict declarations");
let action = ActionRequest {
action_id: "recusal-cap-decision".to_owned(),
actor_did: actor.clone(),
affected_dids: vec![affected],
description: "vote on affected decision".to_owned(),
};
let conflicts = check_conflicts(&actor, &action, &declarations);
assert!(
check_and_block(&actor, &conflicts).is_err(),
"recusal enforcement must see blocking conflicts newer than the generic UI list cap"
);
sqlx::query("DELETE FROM conflict_declarations WHERE declarant_did = $1")
.bind(actor.as_str())
.execute(&pool)
.await
.expect("clean recusal cap fixture after test");
}
#[tokio::test]
async fn load_agent_roles_fails_closed_when_row_cap_is_reached() {
let Some(pool) = gateway_test_pool().await else {
return;
};
let actor = "did:exo:role-cap-actor";
sqlx::query("DELETE FROM agent_roles WHERE agent_did = $1")
.bind(actor)
.execute(&pool)
.await
.expect("clean role cap fixture before test");
sqlx::query(
"INSERT INTO agent_roles (agent_did, role, branch, granted_by, valid_from, expires_at) \
SELECT $1, 'role-cap-' || g, 'executive', 'did:exo:role-cap-granter', 0, NULL \
FROM generate_series(1, $2) AS g",
)
.bind(actor)
.bind(MAX_DB_LIST_ROWS)
.execute(&pool)
.await
.expect("seed capped active roles");
let capped = load_agent_roles(&pool, actor, 1_000).await;
assert!(
matches!(capped, Err(AgentRoleLoadError::TruncatedEvidence)),
"reaching the row cap must fail closed instead of returning a truncated role subset"
);
sqlx::query("DELETE FROM agent_roles WHERE agent_did = $1 AND role <> 'role-cap-1'")
.bind(actor)
.execute(&pool)
.await
.expect("trim role cap fixture below the cap");
let below_cap = load_agent_roles(&pool, actor, 1_000)
.await
.expect("below-cap role load must succeed");
assert_eq!(below_cap.len(), 1);
sqlx::query("DELETE FROM agent_roles WHERE agent_did = $1")
.bind(actor)
.execute(&pool)
.await
.expect("clean role cap fixture after test");
}
#[test]
fn load_consent_records_orders_active_rows_deterministically() {
let body = function_source(production_source(), "load_consent_records");
assert!(
body.contains(
"ORDER BY created_at DESC, subject_did ASC, scope ASC, bailment_type ASC, expires_at ASC NULLS LAST"
),
"active consent rows must have a deterministic order before adapter selection"
);
}
#[test]
fn gateway_runtime_query_filters_have_migration_indexes() {
let migrations = compact_sql(&migration_sources());
for index_sql in [
"CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at);",
"CREATE INDEX IF NOT EXISTS idx_users_tenant_created_at ON users(tenant_id, created_at);",
"CREATE INDEX IF NOT EXISTS idx_agents_tenant_created_at ON agents(tenant_id, created_at);",
"CREATE INDEX IF NOT EXISTS idx_agents_created_at ON agents(created_at);",
"CREATE INDEX IF NOT EXISTS idx_decisions_tenant_created_at_ms ON decisions(tenant_id, created_at_ms);",
"CREATE INDEX IF NOT EXISTS idx_decisions_created_at_ms ON decisions(created_at_ms);",
"CREATE INDEX IF NOT EXISTS idx_delegations_created_at_ms ON delegations(created_at_ms);",
"CREATE INDEX IF NOT EXISTS idx_delegations_active_delegatee ON delegations(delegatee) WHERE revoked_at IS NULL;",
"CREATE INDEX IF NOT EXISTS idx_delegations_active_delegator ON delegations(delegator) WHERE revoked_at IS NULL;",
"CREATE INDEX IF NOT EXISTS idx_audit_entries_actor_event_type ON audit_entries(actor, event_type);",
"CREATE INDEX IF NOT EXISTS idx_audit_entries_decision_tenant_sequence ON audit_entries(decision_id, tenant_id, sequence);",
] {
assert!(
migrations.contains(index_sql),
"gateway migration set must include runtime query index: {index_sql}"
);
}
}
#[test]
fn production_gateway_state_has_no_explicit_public_schema_writes() {
let production = production_source().to_ascii_lowercase();
for table in [
"users",
"agents",
"decisions",
"delegations",
"audit_entries",
"constitutions",
"identity_scores",
"enrollment_log",
"hlc_state",
"livesafe_identities",
"scan_receipts",
"consent_anchors",
"trustee_shard_status",
"sessions",
"agent_roles",
"consent_records",
"authority_chains",
"layout_templates",
"feedback_issues",
"conflict_declarations",
"did_documents",
"avc_registry_state",
] {
for verb in ["insert into", "update", "delete from"] {
let forbidden = format!("{verb} public.{table}");
assert!(
!production.contains(&forbidden),
"production gateway state must not schema-qualify public legacy table writes: {forbidden}"
);
}
}
}
#[test]
fn production_gateway_state_resolves_legacy_tables_in_dagdb_schema() {
let production = production_source();
assert!(
production.contains("DAGDB_RUNTIME_SEARCH_PATH"),
"production gateway pool must name a DAG DB-first runtime search path"
);
assert!(
production.contains("\"dagdb,public\""),
"production gateway pool must prefer DAG DB table contracts over public rollback tables"
);
assert!(
production.contains(
"connect_options.options([(\"search_path\", DAGDB_RUNTIME_SEARCH_PATH.to_owned())])"
),
"returned production runtime pool must use the DAG DB-first search path"
);
let dagdb_gateway_contracts = include_str!(
"../../exo-dag-db-postgres/migrations/20260623000005_create_gateway_legacy_table_contracts.sql"
)
.to_ascii_lowercase();
for table in [
"users",
"agents",
"decisions",
"delegations",
"audit_entries",
"constitutions",
"identity_scores",
"enrollment_log",
"hlc_state",
"livesafe_identities",
"scan_receipts",
"consent_anchors",
"trustee_shard_status",
"sessions",
"agent_roles",
"consent_records",
"authority_chains",
"layout_templates",
"feedback_issues",
"conflict_declarations",
"did_documents",
"avc_registry_state",
] {
assert!(
dagdb_gateway_contracts.contains(&format!("create table if not exists {table}")),
"DAG DB schema migration must own gateway legacy table contract {table}"
);
}
}
#[test]
fn did_documents_have_durable_schema_and_persistence_helpers() {
let migrations = compact_sql(&migration_sources_from_disk());
assert!(
migrations.contains("CREATE TABLE IF NOT EXISTS did_documents ("),
"DB-backed gateway identity must persist DID documents instead of relying on LocalDidRegistry memory"
);
assert!(
migrations.contains("did TEXT PRIMARY KEY"),
"persisted DID documents must be keyed by DID"
);
assert!(
migrations.contains("document JSONB NOT NULL"),
"persisted DID documents must retain the canonical serialized document payload"
);
let source = production_source();
let insert_source = function_source(source, "insert_did_document");
assert!(
source.contains("MAX_DB_DID_DOCUMENTS"),
"DB-backed DID registration must define a durable registry capacity limit"
);
assert!(
insert_source.contains("insert_did_document_with_capacity"),
"public DID persistence must route through the capacity-enforcing insert helper"
);
assert!(
source.contains("pg_advisory_xact_lock"),
"DID capacity checks must be serialized to avoid concurrent over-capacity inserts"
);
assert!(
source.contains("SELECT COUNT(*) AS document_count FROM did_documents"),
"DB-backed DID registration must check the durable did_documents row count before inserting"
);
assert!(insert_source.contains("INSERT INTO did_documents"));
assert!(
source.contains("DidDocumentPersistenceError::RegistryCapacityExceeded"),
"durable DID capacity exhaustion must be a typed error, not an unbounded insert"
);
let lookup_source = function_source(source, "find_did_document");
assert!(lookup_source.contains("FROM did_documents"));
assert!(lookup_source.contains("serde_json::from_value"));
let list_source = function_source(source, "list_did_document_ids");
assert!(list_source.contains("FROM did_documents"));
assert!(list_source.contains("LIMIT $"));
assert!(list_source.contains(".bind(MAX_DB_LIST_ROWS)"));
}
#[test]
fn did_document_persistence_errors_do_not_display_raw_dids() {
let sensitive_did = "did:exo:persistence-sensitive-subject";
let payload_did = "did:exo:persistence-payload-subject";
let serde_source =
serde_json::from_str::<serde_json::Value>("{").expect_err("invalid JSON source");
let errors = [
DidDocumentPersistenceError::TimestampOutOfRange {
did: sensitive_did.to_owned(),
field: "created",
value: 42,
}
.to_string(),
DidDocumentPersistenceError::Serialize {
did: sensitive_did.to_owned(),
source: serde_source,
}
.to_string(),
DidDocumentPersistenceError::Deserialize {
did: sensitive_did.to_owned(),
source: serde_json::from_str::<serde_json::Value>("{")
.expect_err("invalid JSON source"),
}
.to_string(),
DidDocumentPersistenceError::DocumentDidMismatch {
row_did: sensitive_did.to_owned(),
document_did: payload_did.to_owned(),
}
.to_string(),
];
for error in errors {
assert!(
!error.contains(sensitive_did) && !error.contains(payload_did),
"persistence error display must not expose raw DID identifiers: {error}"
);
}
}
#[test]
fn gateway_identity_erasure_has_durable_tombstone_schema_and_helper() {
let migrations = compact_sql(&migration_sources_from_disk());
assert!(
migrations
.contains("ALTER TABLE did_documents ADD COLUMN IF NOT EXISTS erased_at_ms BIGINT"),
"gateway DID erasure must persist a tombstone timestamp so erased DIDs cannot be re-registered after process restart"
);
let source = production_source();
let helper = function_source(source, "erase_gateway_identity_records");
for table in [
"did_documents",
"users",
"agents",
"sessions",
"identity_scores",
"enrollment_log",
"livesafe_identities",
"scan_receipts",
"consent_anchors",
"trustee_shard_status",
"agent_roles",
"consent_records",
"authority_chains",
"delegations",
"layout_templates",
"feedback_issues",
"conflict_declarations",
] {
assert!(
helper.contains(table),
"gateway identity erasure helper must cover durable DID-linked table {table}"
);
}
assert!(
helper.contains("revoked = true") && helper.contains("erased_at_ms"),
"DID document erasure must tombstone the DID instead of deleting the reuse guard"
);
assert!(
helper.contains("DELETE FROM conflict_declarations")
&& helper.contains("WHERE declarant_did = $1"),
"identity erasure may remove only conflict declarations authored by the erased DID"
);
assert!(
!helper.contains("related_dids @> jsonb_build_array($1::text)"),
"identity erasure must not delete third-party conflict declarations that merely reference the erased DID"
);
}
#[test]
fn scan_receipt_location_writes_require_active_location_consent() {
let source = production_source();
let insert = function_source(source, "insert_scan_receipt");
assert!(
source.contains("pub enum ScanReceiptInsertError"),
"scan receipt writes must distinguish consent denial from SQL failure"
);
assert!(
source.contains("const LOCATION_CONSENT_SCOPE: &str = \"location\";"),
"location consent scope must be explicit and centrally named"
);
assert!(
insert.contains("location.is_some()"),
"scan receipts without location may be stored, but location-bearing receipts need consent"
);
assert!(
insert.contains("scan_receipt_location_consent_exists"),
"location-bearing scan receipts must check active consent before insert"
);
assert!(
contains_in_order(
insert,
"scan_receipt_location_consent_exists",
"INSERT INTO scan_receipts"
),
"location consent must be checked before writing the scan_receipts row"
);
assert!(
insert.contains("ScanReceiptInsertError::LocationConsentRequired"),
"missing active location consent must fail closed with a typed error"
);
}
#[tokio::test]
async fn insert_scan_receipt_rejects_location_without_active_location_consent()
-> std::result::Result<(), Box<dyn std::error::Error>> {
let Some(pool) = gateway_test_pool().await else {
return Ok(());
};
let subscriber = "did:exo:scan-location-subscriber-denied";
let responder = "did:exo:scan-location-responder-denied";
cleanup_identity_erasure_fixture(&pool, subscriber).await?;
cleanup_identity_erasure_fixture(&pool, responder).await?;
let err = insert_scan_receipt(
&pool,
"scan-location-denied",
subscriber,
responder,
Some("40.7128,-74.0060"),
1_000,
2_000,
"audit-location-denied",
None,
)
.await
.expect_err("location-bearing scan receipt must require active location consent");
assert!(
err.to_string().contains("active location consent"),
"missing location consent should produce a typed consent error: {err}"
);
let count: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM scan_receipts WHERE scan_id = $1")
.bind("scan-location-denied")
.fetch_one(&pool)
.await?;
assert_eq!(count, 0);
Ok(())
}
#[tokio::test]
async fn insert_did_document_enforces_durable_capacity_limit()
-> std::result::Result<(), Box<dyn std::error::Error>> {
let Some(pool) = gateway_test_pool().await else {
return Ok(());
};
let first_did = "did:exo:durable-capacity-first";
let second_did = "did:exo:durable-capacity-second";
cleanup_identity_erasure_fixture(&pool, first_did).await?;
cleanup_identity_erasure_fixture(&pool, second_did).await?;
assert!(
insert_did_document_with_capacity(&pool, &minimal_doc(first_did), 1).await?,
"first DID document should fit inside the durable capacity budget"
);
let err = insert_did_document_with_capacity(&pool, &minimal_doc(second_did), 1)
.await
.expect_err("second distinct DID document must be rejected at the durable cap");
assert!(
matches!(
err,
DidDocumentPersistenceError::RegistryCapacityExceeded {
max_documents: 1,
attempted_documents: 2
}
),
"expected typed durable capacity error, got {err}"
);
let stored_second: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM did_documents WHERE did = $1")
.bind(second_did)
.fetch_one(&pool)
.await?;
assert_eq!(
stored_second, 0,
"over-capacity DID document must not be persisted"
);
cleanup_identity_erasure_fixture(&pool, first_did).await?;
cleanup_identity_erasure_fixture(&pool, second_did).await?;
Ok(())
}
#[tokio::test]
async fn insert_scan_receipt_accepts_location_with_active_location_consent()
-> std::result::Result<(), Box<dyn std::error::Error>> {
let Some(pool) = gateway_test_pool().await else {
return Ok(());
};
let subscriber = "did:exo:scan-location-subscriber-allowed";
let responder = "did:exo:scan-location-responder-allowed";
cleanup_identity_erasure_fixture(&pool, subscriber).await?;
cleanup_identity_erasure_fixture(&pool, responder).await?;
insert_consent_anchor(
&pool,
"consent-location-allowed",
subscriber,
responder,
&serde_json::json!(["location"]),
900,
Some(2_000),
"audit-location-consent",
)
.await?;
insert_scan_receipt(
&pool,
"scan-location-allowed",
subscriber,
responder,
Some("40.7128,-74.0060"),
1_000,
2_000,
"audit-location-allowed",
None,
)
.await?;
let rows = list_scan_receipts(&pool, subscriber).await?;
assert!(
rows.iter().any(|row| row.scan_id == "scan-location-allowed"
&& row.location.as_deref() == Some("40.7128,-74.0060")),
"location should persist only when active location consent exists"
);
Ok(())
}
#[tokio::test]
async fn erase_gateway_identity_records_tombstones_did_and_removes_durable_identity_rows()
-> std::result::Result<(), Box<dyn std::error::Error>> {
let Some(pool) = gateway_test_pool().await else {
return Ok(());
};
let did = "did:exo:erasure-db-subject";
let third_party_declarant = "did:exo:erasure-db-third-party-declarant";
cleanup_identity_erasure_fixture(&pool, did).await?;
cleanup_identity_erasure_fixture(&pool, third_party_declarant).await?;
let doc = minimal_doc(did);
insert_did_document(&pool, &doc).await?;
sqlx::query(
"INSERT INTO sessions (token, actor_did, created_at, expires_at, revoked) \
VALUES ($1, $2, $3, $4, false)",
)
.bind("erasure-db-session")
.bind(did)
.bind(1_000_i64)
.bind(2_000_i64)
.execute(&pool)
.await?;
insert_user(
&pool,
did,
"Erasure Subject",
"erasure-db-subject@example.invalid",
&serde_json::json!(["subject"]),
"tenant-erasure",
1_000,
"Active",
"Complete",
"hash-to-delete",
"salt-to-delete",
false,
)
.await?;
insert_agent(
&pool,
did,
"Erasure Agent",
"agent",
did,
"tenant-erasure",
&serde_json::json!(["read"]),
"Trusted",
7_500,
None,
"Complete",
1_000,
"Active",
"Routine",
)
.await?;
upsert_identity_score(
&pool,
did,
7_500,
"Trusted",
&serde_json::json!({"registered": true}),
1_000,
)
.await?;
insert_enrollment(&pool, did, "user", "pace", 1_000, did, "audit").await?;
insert_livesafe_identity(
&pool,
did,
7_500,
"Complete",
"Issued",
1_000,
Some("anchor"),
)
.await?;
insert_consent_anchor(
&pool,
"erasure-db-location-consent",
did,
"did:exo:responder",
&serde_json::json!([LOCATION_CONSENT_SCOPE]),
999,
Some(2_000),
"location-consent-audit",
)
.await?;
insert_scan_receipt(
&pool,
"erasure-db-scan",
did,
"did:exo:responder",
Some("40.0,-70.0"),
1_000,
2_000,
"scan-audit",
Some("scan-anchor"),
)
.await?;
insert_consent_anchor(
&pool,
"erasure-db-consent",
did,
"did:exo:provider",
&serde_json::json!(["location"]),
1_000,
Some(2_000),
"consent-audit",
)
.await?;
insert_trustee_shard(&pool, did, "did:exo:trustee", "guardian", true, Some(1_000)).await?;
sqlx::query(
"INSERT INTO agent_roles (agent_did, role, branch, granted_by, valid_from, expires_at) \
VALUES ($1, $2, $3, $4, $5, NULL)",
)
.bind(did)
.bind("operator")
.bind("executive")
.bind(did)
.bind(1_000_i64)
.execute(&pool)
.await?;
sqlx::query(
"INSERT INTO consent_records (subject_did, actor_did, scope, bailment_type, status, created_at, expires_at) \
VALUES ($1, $2, $3, $4, $5, $6, NULL)",
)
.bind(did)
.bind("did:exo:actor")
.bind("read")
.bind("standard")
.bind("active")
.bind(1_000_i64)
.execute(&pool)
.await?;
sqlx::query(
"INSERT INTO authority_chains (actor_did, chain_json, valid_from, expires_at) \
VALUES ($1, $2, $3, NULL)",
)
.bind(did)
.bind(serde_json::json!({"chain": []}))
.bind(1_000_i64)
.execute(&pool)
.await?;
insert_delegation(
&pool,
"erasure-db-delegation",
"tenant-erasure",
did,
"did:exo:delegatee",
1_000,
2_000,
"exochain-constitution-v1",
&serde_json::json!({"delegator": did}),
)
.await?;
upsert_layout_template(
&pool,
"erasure-db-layout",
Some(did),
"Subject layout",
&serde_json::json!([{"id": "panel", "x": 0, "y": 0}]),
&serde_json::json!(["hidden"]),
false,
1_000,
1_000,
)
.await?;
insert_feedback_issue(
&pool,
"erasure-db-feedback",
"Subject feedback",
"remove reporter DID",
"medium",
"privacy",
"identity-panel",
"dashboard",
Some(did),
Some(&serde_json::json!({"did": did})),
Some(&serde_json::json!({"userAgent": "test"})),
1_000,
)
.await?;
sqlx::query(
"INSERT INTO conflict_declarations (id_hash, declarant_did, nature, related_dids, timestamp_physical_ms, timestamp_logical, payload) \
VALUES ($1, $2, $3, $4, $5, $6, $7)",
)
.bind("erasure-db-conflict")
.bind(did)
.bind("test conflict")
.bind(serde_json::json!(["did:exo:related"]))
.bind(1_000_i64)
.bind(0_i32)
.bind(serde_json::json!({"declarant_did": did}))
.execute(&pool)
.await?;
sqlx::query(
"INSERT INTO conflict_declarations (id_hash, declarant_did, nature, related_dids, timestamp_physical_ms, timestamp_logical, payload) \
VALUES ($1, $2, $3, $4, $5, $6, $7)",
)
.bind("erasure-db-third-party-conflict")
.bind(third_party_declarant)
.bind("third-party conflict")
.bind(serde_json::json!([did]))
.bind(1_001_i64)
.bind(0_i32)
.bind(serde_json::json!({
"declarant_did": third_party_declarant,
"nature": "third-party conflict",
"related_dids": [did],
"timestamp": {
"physical_ms": 1_001_i64,
"logical": 0_i32
}
}))
.execute(&pool)
.await?;
let summary = erase_gateway_identity_records(&pool, did, 9_000).await?;
assert_eq!(summary.did_documents_tombstoned, 1);
assert_eq!(summary.sessions_deleted, 1);
assert_eq!(summary.users_deleted, 1);
assert_eq!(summary.agents_deleted, 1);
assert_eq!(summary.identity_scores_deleted, 1);
assert_eq!(summary.enrollment_log_deleted, 1);
assert_eq!(summary.livesafe_identities_deleted, 1);
assert_eq!(summary.scan_receipts_deleted, 1);
assert_eq!(summary.consent_anchors_deleted, 2);
assert_eq!(summary.trustee_shards_deleted, 1);
assert_eq!(summary.agent_roles_deleted, 1);
assert_eq!(summary.consent_records_deleted, 1);
assert_eq!(summary.authority_chains_deleted, 1);
assert_eq!(summary.delegations_deleted, 1);
assert_eq!(summary.layout_templates_deleted, 1);
assert_eq!(summary.feedback_issues_deleted, 1);
assert_eq!(summary.conflict_declarations_deleted, 1);
assert!(find_did_document(&pool, did).await?.is_none());
let tombstone =
sqlx::query("SELECT revoked, erased_at_ms, document FROM did_documents WHERE did = $1")
.bind(did)
.fetch_one(&pool)
.await?;
assert!(tombstone.get::<bool, _>("revoked"));
assert_eq!(tombstone.get::<Option<i64>, _>("erased_at_ms"), Some(9_000));
assert_eq!(
tombstone.get::<JsonValue, _>("document")["schema"],
"exo.gateway.did_document_tombstone.v1"
);
assert!(
!insert_did_document(&pool, &doc).await?,
"erased DID tombstone must block DID document re-registration"
);
for statement in [
"SELECT COUNT(*) FROM sessions WHERE actor_did = $1",
"SELECT COUNT(*) FROM users WHERE did = $1",
"SELECT COUNT(*) FROM agents WHERE did = $1 OR owner_did = $1",
"SELECT COUNT(*) FROM identity_scores WHERE did = $1",
"SELECT COUNT(*) FROM enrollment_log WHERE did = $1",
"SELECT COUNT(*) FROM livesafe_identities WHERE did = $1",
"SELECT COUNT(*) FROM scan_receipts WHERE subscriber_did = $1 OR responder_did = $1",
"SELECT COUNT(*) FROM consent_anchors WHERE subscriber_did = $1 OR provider_did = $1",
"SELECT COUNT(*) FROM trustee_shard_status WHERE subscriber_did = $1 OR trustee_did = $1",
"SELECT COUNT(*) FROM agent_roles WHERE agent_did = $1 OR granted_by = $1",
"SELECT COUNT(*) FROM consent_records WHERE subject_did = $1 OR actor_did = $1",
"SELECT COUNT(*) FROM authority_chains WHERE actor_did = $1",
"SELECT COUNT(*) FROM delegations WHERE delegator = $1 OR delegatee = $1",
"SELECT COUNT(*) FROM layout_templates WHERE user_did = $1",
"SELECT COUNT(*) FROM feedback_issues WHERE reporter_did = $1",
"SELECT COUNT(*) FROM conflict_declarations WHERE declarant_did = $1",
] {
assert_eq!(count_rows_by_did(&pool, statement, did).await?, 0);
}
let third_party_conflicts_remaining: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM conflict_declarations \
WHERE id_hash = $1 \
AND declarant_did = $2 \
AND related_dids @> jsonb_build_array($3::text)",
)
.bind("erasure-db-third-party-conflict")
.bind(third_party_declarant)
.bind(did)
.fetch_one(&pool)
.await?;
assert_eq!(
third_party_conflicts_remaining, 1,
"identity erasure must preserve third-party conflict declarations that reference the erased DID"
);
cleanup_identity_erasure_fixture(&pool, did).await?;
cleanup_identity_erasure_fixture(&pool, third_party_declarant).await?;
Ok(())
}
#[test]
fn user_and_decision_list_queries_require_tenant_scope() {
let source = production_source();
let users = function_source(source, "list_users_db");
let agent_lookup = function_source(source, "find_agent_by_did");
let agents = function_source(source, "list_agents_db");
let decisions = function_source(source, "list_decisions_db");
assert!(
users.contains("tenant_id: &str"),
"list_users_db must require an explicit tenant scope"
);
assert!(
compact_sql(users)
.contains("FROM users WHERE tenant_id = $1 ORDER BY created_at LIMIT $2"),
"list_users_db must filter by tenant_id before ordering or limiting rows"
);
assert!(
contains_in_order(users, ".bind(tenant_id)", ".bind(MAX_DB_LIST_ROWS)"),
"list_users_db must bind tenant_id before the row limit"
);
assert!(
agents.contains("tenant_id: &str"),
"list_agents_db must require an explicit tenant scope"
);
assert!(
compact_sql(agents)
.contains("FROM agents WHERE tenant_id = $1 ORDER BY created_at LIMIT $2"),
"list_agents_db must filter by tenant_id before ordering or limiting rows"
);
assert!(
contains_in_order(agents, ".bind(tenant_id)", ".bind(MAX_DB_LIST_ROWS)"),
"list_agents_db must bind tenant_id before the row limit"
);
assert!(
!compact_sql(agents).contains("FROM agents ORDER BY created_at LIMIT $1"),
"list_agents_db must not retain an unscoped global listing query"
);
assert!(
agent_lookup.contains("tenant_id: &str"),
"find_agent_by_did must require an explicit tenant scope"
);
assert!(
compact_sql(agent_lookup).contains("FROM agents WHERE did = $1 AND tenant_id = $2"),
"find_agent_by_did must constrain agent lookup by DID and tenant_id"
);
assert!(
contains_in_order(agent_lookup, ".bind(did)", ".bind(tenant_id)"),
"find_agent_by_did must bind DID and tenant_id together"
);
assert!(
decisions.contains("tenant_id: &str"),
"list_decisions_db must require an explicit tenant scope"
);
assert!(
compact_sql(decisions)
.contains("FROM decisions WHERE tenant_id = $1 ORDER BY created_at_ms LIMIT $2"),
"list_decisions_db must filter by tenant_id before ordering or limiting rows"
);
assert!(
contains_in_order(decisions, ".bind(tenant_id)", ".bind(MAX_DB_LIST_ROWS)"),
"list_decisions_db must bind tenant_id before the row limit"
);
}
#[test]
fn decision_lookup_requires_tenant_scope() {
let source = production_source();
let lookup = function_source(source, "find_decision");
assert!(
lookup.contains("tenant_id: &str"),
"find_decision must require an explicit tenant scope"
);
assert!(
compact_sql(lookup).contains("FROM decisions WHERE id_hash = $1 AND tenant_id = $2"),
"find_decision must include tenant_id in the decision lookup predicate"
);
assert!(
contains_in_order(lookup, ".bind(id_hash)", ".bind(tenant_id)"),
"find_decision must bind id_hash and tenant_id together"
);
}
#[test]
fn decision_table_primary_key_is_tenant_scoped() {
let migrations = compact_sql(&migration_sources_from_disk());
assert!(
migrations.contains("ALTER TABLE decisions DROP CONSTRAINT IF EXISTS decisions_pkey"),
"tenant-scoping the historical decisions primary key must happen through a forward migration"
);
assert!(
migrations
.contains("ALTER TABLE decisions ADD CONSTRAINT decisions_pkey PRIMARY KEY (tenant_id, id_hash)"),
"decisions must be migrated to a tenant_id + id_hash primary key so identical hashes in different tenants cannot collide"
);
}
#[test]
fn decision_write_helpers_require_tenant_scope() {
let source = production_source();
let insert = function_source(source, "insert_decision");
let create = function_source(source, "create_decision");
let update = function_source(source, "update_decision");
assert!(
compact_sql(insert).contains("ON CONFLICT (tenant_id, id_hash) DO UPDATE"),
"insert_decision upserts must conflict only inside the same tenant"
);
assert!(
!compact_sql(insert).contains("ON CONFLICT (id_hash) DO UPDATE"),
"insert_decision must not upsert through a global id_hash conflict target"
);
assert!(
compact_sql(create).contains("ON CONFLICT (tenant_id, id_hash) DO NOTHING"),
"create_decision duplicate detection must be tenant-scoped"
);
assert!(
update.contains("tenant_id: &str"),
"update_decision must require an explicit tenant scope"
);
assert!(
compact_sql(update).contains(
"UPDATE decisions SET status = $1, payload = $2 WHERE id_hash = $3 AND tenant_id = $4"
),
"update_decision must constrain mutations by id_hash and tenant_id"
);
assert!(
contains_in_order(update, ".bind(id_hash)", ".bind(tenant_id)"),
"update_decision must bind id_hash and tenant_id together"
);
}
#[test]
fn quorum_eligibility_counts_are_tenant_scoped_and_human_bounded() {
let source = production_source();
let count = function_source(source, "count_quorum_eligible_voters_with_executor");
assert!(
count.contains("tenant_id: &str"),
"quorum eligibility counting must require an explicit tenant scope"
);
assert!(
compact_sql(count).contains("FROM users WHERE tenant_id = $1 AND status = 'Active'"),
"human quorum eligibility must count only active users inside the authenticated tenant"
);
assert!(
compact_sql(count).contains("FROM agents WHERE tenant_id = $1 AND status = 'Active'"),
"agent quorum eligibility must count only active agents inside the authenticated tenant"
);
assert!(
count.contains("delegation_id IS NOT NULL"),
"AI agents must not be quorum-eligible without a delegated authority boundary"
);
assert!(
count.contains("max_decision_class"),
"AI quorum eligibility must be bounded by the agent's maximum decision class"
);
assert!(
count.contains("eligible_human_voters: active_human_users"),
"human quorum eligibility must not include agents or unrelated DIDs"
);
}
#[test]
fn active_human_user_vote_lookup_is_tenant_scoped_and_candidate_bounded() {
let source = production_source();
let lookup = function_source(source, "active_human_user_dids_for_votes");
assert!(
lookup.contains("tenant_id: &str"),
"verified human voter lookup must require an explicit tenant scope"
);
assert!(
compact_sql(lookup)
.contains("FROM users WHERE tenant_id = $1 AND status = 'Active' AND did = ANY($2) ORDER BY did"),
"verified human voter lookup must only return active users from the authenticated tenant and candidate vote set"
);
assert!(
contains_in_order(lookup, ".bind(tenant_id)", ".bind(voter_dids)"),
"verified human voter lookup must bind tenant scope before the candidate vote set"
);
}
#[tokio::test]
async fn decision_writes_allow_same_hash_across_tenants_without_overwrite()
-> std::result::Result<(), Box<dyn std::error::Error>> {
let Some(pool) = gateway_test_pool().await else {
return Ok(());
};
let id_hash = "tenant-scoped-decision-write";
for tenant_id in ["tenant-a-write", "tenant-b-write"] {
sqlx::query("DELETE FROM decisions WHERE id_hash = $1 AND tenant_id = $2")
.bind(id_hash)
.bind(tenant_id)
.execute(&pool)
.await?;
}
create_decision(
&pool,
id_hash,
"tenant-a-write",
"Open",
"Tenant A",
"Routine",
"did:exo:tenant-a-author",
10_000,
"exochain-constitution-v1",
&serde_json::json!({"tenant": "a", "status": "Open"}),
)
.await?;
create_decision(
&pool,
id_hash,
"tenant-b-write",
"Open",
"Tenant B",
"Routine",
"did:exo:tenant-b-author",
10_001,
"exochain-constitution-v1",
&serde_json::json!({"tenant": "b", "status": "Open"}),
)
.await?;
update_decision(
&pool,
id_hash,
"tenant-b-write",
"Closed",
&serde_json::json!({"tenant": "b", "status": "Closed"}),
)
.await?;
let tenant_a = find_decision(&pool, id_hash, "tenant-a-write")
.await?
.expect("tenant-a decision must remain present");
let tenant_b = find_decision(&pool, id_hash, "tenant-b-write")
.await?
.expect("tenant-b decision must remain present");
assert_eq!(tenant_a.status, "Open");
assert_eq!(tenant_a.payload["tenant"], "a");
assert_eq!(tenant_b.status, "Closed");
assert_eq!(tenant_b.payload["tenant"], "b");
for tenant_id in ["tenant-a-write", "tenant-b-write"] {
sqlx::query("DELETE FROM decisions WHERE id_hash = $1 AND tenant_id = $2")
.bind(id_hash)
.bind(tenant_id)
.execute(&pool)
.await?;
}
Ok(())
}
#[tokio::test]
async fn quorum_eligibility_counts_ignore_other_tenants_and_ineligible_agents()
-> std::result::Result<(), Box<dyn std::error::Error>> {
let Some(pool) = gateway_test_pool().await else {
return Ok(());
};
let tenant_a = "tenant-quorum-a";
let tenant_b = "tenant-quorum-b";
let user_dids = [
"did:exo:quorum-a-human-1",
"did:exo:quorum-a-human-2",
"did:exo:quorum-a-human-3",
"did:exo:quorum-a-inactive-human",
"did:exo:quorum-b-human-1",
];
let agent_dids = [
"did:exo:quorum-a-agent-routine",
"did:exo:quorum-a-agent-operational",
"did:exo:quorum-a-agent-strategic",
"did:exo:quorum-a-agent-undelegated",
"did:exo:quorum-a-agent-inactive",
"did:exo:quorum-b-agent-constitutional",
];
for did in user_dids {
sqlx::query("DELETE FROM users WHERE did = $1")
.bind(did)
.execute(&pool)
.await?;
}
for did in agent_dids {
sqlx::query("DELETE FROM agents WHERE did = $1")
.bind(did)
.execute(&pool)
.await?;
}
for (idx, did) in user_dids.iter().take(3).enumerate() {
insert_user(
&pool,
did,
"Quorum Human",
&format!("quorum-human-{idx}@example.invalid"),
&serde_json::json!(["member"]),
tenant_a,
i64::try_from(idx + 1)?,
"Active",
"Verified",
"hash",
"salt",
true,
)
.await?;
}
insert_user(
&pool,
"did:exo:quorum-a-inactive-human",
"Inactive Human",
"quorum-inactive@example.invalid",
&serde_json::json!(["member"]),
tenant_a,
10,
"Suspended",
"Verified",
"hash",
"salt",
true,
)
.await?;
insert_user(
&pool,
"did:exo:quorum-b-human-1",
"Other Tenant Human",
"quorum-other@example.invalid",
&serde_json::json!(["member"]),
tenant_b,
11,
"Active",
"Verified",
"hash",
"salt",
true,
)
.await?;
for (did, delegation_id, status, max_class) in [
(
"did:exo:quorum-a-agent-routine",
Some("delegation-routine"),
"Active",
"Routine",
),
(
"did:exo:quorum-a-agent-operational",
Some("delegation-operational"),
"Active",
"Operational",
),
(
"did:exo:quorum-a-agent-strategic",
Some("delegation-strategic"),
"Active",
"Strategic",
),
(
"did:exo:quorum-a-agent-undelegated",
None,
"Active",
"Constitutional",
),
(
"did:exo:quorum-a-agent-inactive",
Some("delegation-inactive"),
"Suspended",
"Constitutional",
),
] {
insert_agent(
&pool,
did,
"Quorum Agent",
"delegate",
"did:exo:quorum-a-human-1",
tenant_a,
&serde_json::json!(["vote"]),
"Trusted",
100,
delegation_id,
"Verified",
20,
status,
max_class,
)
.await?;
}
insert_agent(
&pool,
"did:exo:quorum-b-agent-constitutional",
"Other Tenant Agent",
"delegate",
"did:exo:quorum-b-human-1",
tenant_b,
&serde_json::json!(["vote"]),
"Trusted",
100,
Some("delegation-other"),
"Verified",
21,
"Active",
"Constitutional",
)
.await?;
let routine = count_quorum_eligible_voters(&pool, tenant_a, DecisionClass::Routine).await?;
assert_eq!(routine.eligible_human_voters, 3);
assert_eq!(routine.eligible_voters, 6);
let operational =
count_quorum_eligible_voters(&pool, tenant_a, DecisionClass::Operational).await?;
assert_eq!(operational.eligible_human_voters, 3);
assert_eq!(operational.eligible_voters, 5);
let strategic =
count_quorum_eligible_voters(&pool, tenant_a, DecisionClass::Strategic).await?;
assert_eq!(strategic.eligible_human_voters, 3);
assert_eq!(strategic.eligible_voters, 4);
for did in user_dids {
sqlx::query("DELETE FROM users WHERE did = $1")
.bind(did)
.execute(&pool)
.await?;
}
for did in agent_dids {
sqlx::query("DELETE FROM agents WHERE did = $1")
.bind(did)
.execute(&pool)
.await?;
}
Ok(())
}
#[test]
fn audit_entry_lookup_requires_decision_and_tenant_scope() {
let source = production_source();
let lookup = function_source(source, "list_audit_entries_for_decision");
assert!(
lookup.contains("tenant_id: &str"),
"list_audit_entries_for_decision must require an explicit tenant scope"
);
assert!(
compact_sql(lookup).contains(
"FROM audit_entries WHERE decision_id = $1 AND tenant_id = $2 ORDER BY sequence LIMIT $3"
),
"list_audit_entries_for_decision must constrain audit rows by decision_id and tenant_id"
);
assert!(
contains_in_order(lookup, ".bind(decision_id)", ".bind(tenant_id)")
&& contains_in_order(lookup, ".bind(tenant_id)", ".bind(MAX_DB_LIST_ROWS)"),
"list_audit_entries_for_decision must bind decision_id, tenant_id, then row limit"
);
}
#[test]
fn create_decision_inserts_once_without_upsert_overwrite() {
let source = production_source();
let create = function_source(source, "create_decision");
assert!(
source.contains("pub enum DecisionCreateError"),
"create_decision must return typed duplicate-decision errors"
);
assert!(
create.contains("-> Result<(), DecisionCreateError>"),
"create_decision must distinguish duplicate ids from SQL failures"
);
assert!(
compact_sql(create).contains("ON CONFLICT (tenant_id, id_hash) DO NOTHING"),
"decision creation must not overwrite an existing decision row and duplicate detection must be tenant-scoped"
);
assert!(
create.contains("rows_affected()"),
"decision creation must inspect PgQueryResult row count"
);
assert!(
create.contains("DecisionCreateError::AlreadyExists"),
"decision creation must report a conflict when the id already exists"
);
assert!(
!create.contains("DO UPDATE"),
"decision creation must not silently mutate a pre-existing decision"
);
}
#[test]
fn update_decision_reports_missing_rows() {
let source = production_source();
let update = function_source(source, "update_decision");
assert!(
source.contains("pub enum DecisionUpdateError"),
"update_decision must use a typed error for missing-row decisions"
);
assert!(
update.contains("-> Result<(), DecisionUpdateError>"),
"update_decision must distinguish SQL failures from missing decision rows"
);
assert!(
update.contains("tenant_id: &str"),
"update_decision must require tenant_id before mutating a decision"
);
assert!(
compact_sql(update).contains(
"UPDATE decisions SET status = $1, payload = $2 WHERE id_hash = $3 AND tenant_id = $4"
),
"update_decision must update only the authenticated tenant's decision row"
);
assert!(
update.contains("rows_affected()"),
"update_decision must inspect PgQueryResult row count"
);
assert!(
update.contains("DecisionUpdateError::MissingDecision"),
"update_decision must return a missing-row error when no decision is updated"
);
assert!(
!update.contains(".execute(pool).await?;\n Ok(())"),
"update_decision must not discard PgQueryResult and report success"
);
}
#[test]
fn pace_update_helpers_report_missing_rows() {
let source = production_source();
for name in ["update_user_pace", "update_agent_pace"] {
let update = function_source(source, name);
assert!(
update.contains("rows_affected()"),
"{name} must inspect PgQueryResult row count"
);
assert!(
update.contains("sqlx::Error::RowNotFound"),
"{name} must report a missing subject row instead of returning success"
);
assert!(
!update.contains(".execute(pool)\n .await?;\n Ok(())"),
"{name} must not discard PgQueryResult and report success"
);
}
}
#[test]
fn pool_initialization_sets_explicit_connection_acquire_timeout() {
let source = production_source();
let init_pool = function_source(source, "init_pool");
assert!(
source.contains("const DB_POOL_ACQUIRE_TIMEOUT_SECS: u64"),
"gateway DB pool timeout must be explicit and centrally named"
);
assert!(
init_pool
.contains(".acquire_timeout(Duration::from_secs(DB_POOL_ACQUIRE_TIMEOUT_SECS))"),
"gateway DB pool initialization must bound waits for pooled or newly opened connections"
);
}
#[test]
fn public_user_row_has_no_password_material() {
let row = PublicUserRow {
did: "did:exo:user".to_owned(),
display_name: "User".to_owned(),
email: "user@example.invalid".to_owned(),
roles: serde_json::json!(["member"]),
tenant_id: "tenant".to_owned(),
created_at: 1,
status: "active".to_owned(),
pace_status: "normal".to_owned(),
mfa_enabled: true,
};
let debug = format!("{row:?}");
assert!(!debug.contains("password"));
assert!(!debug.contains("salt"));
assert!(debug.contains("did:exo:user"));
}
#[test]
fn list_users_db_never_selects_password_material() {
let source = include_str!("db.rs");
let Some(fn_start) = source.find("pub async fn list_users_db") else {
panic!("list_users_db source must be present");
};
let after_list_users = &source[fn_start..];
let Some(fn_end) = after_list_users.find("/// Update a user's PACE enrollment status.")
else {
panic!("list_users_db source terminator must be present");
};
let list_users_source = &after_list_users[..fn_end];
assert!(
!list_users_source.contains("password_hash"),
"list_users_db must not select password hashes"
);
assert!(
!list_users_source.contains("salt"),
"list_users_db must not select password salts"
);
assert!(
list_users_source.contains("Result<Vec<PublicUserRow>"),
"list_users_db must return the public user projection"
);
}
#[test]
fn user_lookup_apis_never_select_password_material() {
let source = include_str!("db.rs");
for (name, terminator) in [
("find_user_by_email", "/// Look up a user by DID"),
(
"find_user_by_did",
"/// List users for a tenant ordered by creation time.",
),
] {
let Some(fn_start) = source.find(&format!("pub async fn {name}")) else {
panic!("{name} source must be present");
};
let after_start = &source[fn_start..];
let Some(fn_end) = after_start.find(terminator) else {
panic!("{name} source terminator must be present");
};
let lookup_source = &after_start[..fn_end];
assert!(
!lookup_source.contains("password_hash"),
"{name} must not select password hashes"
);
assert!(
!lookup_source.contains("salt"),
"{name} must not select password salts"
);
assert!(
lookup_source.contains("Result<Option<PublicUserRow>"),
"{name} must return the non-secret public user projection"
);
}
}
}