use std::{collections::BTreeSet, future::Future, path::Path};
use exo_core::types::{Did, Hash256, Signature, Timestamp, TrustReceipt};
use exo_dag::{
consensus::{CommitCertificate, Vote},
dag::DagNode,
error::{DagError, Result as DagResult},
};
use exo_economy::{EconomyObjectKind, EconomyRecordAnchor};
use serde::{Serialize, de::DeserializeOwned};
use sqlx::{PgPool, Postgres, Row, Transaction};
fn store_err(e: impl std::fmt::Display) -> DagError {
DagError::StoreError(e.to_string())
}
use rusqlite::{Connection, params};
fn sqlite_u64_to_i64(value: u64, field: &str) -> DagResult<i64> {
i64::try_from(value)
.map_err(|_| store_err(format!("{field} value {value} exceeds SQLite INTEGER max")))
}
fn sqlite_i64_to_u64(value: i64, field: &str) -> DagResult<u64> {
u64::try_from(value).map_err(|_| store_err(format!("{field} value {value} is negative")))
}
fn decode_hash_bytes(bytes: &[u8], field: &str) -> DagResult<Hash256> {
let arr: [u8; 32] = bytes
.try_into()
.map_err(|_| store_err(format!("{field} must be 32 bytes, got {}", bytes.len())))?;
Ok(Hash256::from_bytes(arr))
}
fn decode_signature_bytes(bytes: &[u8], field: &str) -> DagResult<Signature> {
let arr: [u8; 64] = bytes
.try_into()
.map_err(|_| store_err(format!("{field} must be 64 bytes, got {}", bytes.len())))?;
let signature = Signature::from_bytes(arr);
validate_signature(&signature, field)?;
Ok(signature)
}
fn validate_signature(signature: &Signature, field: &str) -> DagResult<()> {
if signature.is_empty() {
return Err(store_err(format!("{field} must not be empty or all-zero")));
}
Ok(())
}
fn encode_cbor<T: Serialize>(value: &T, field: &str) -> DagResult<Vec<u8>> {
let mut buf = Vec::new();
ciborium::into_writer(value, &mut buf)
.map_err(|e| store_err(format!("{field} CBOR encode: {e}")))?;
Ok(buf)
}
fn decode_cbor<T: DeserializeOwned>(bytes: &[u8], field: &str) -> DagResult<T> {
ciborium::from_reader(bytes).map_err(|e| store_err(format!("{field} CBOR decode: {e}")))
}
fn validate_ed25519_signature<'a>(
signature: &'a Signature,
field: &str,
) -> DagResult<&'a [u8; 64]> {
let Signature::Ed25519(bytes) = signature else {
return Err(store_err(format!(
"{field} must be an Ed25519 signature for consensus persistence"
)));
};
if bytes.iter().all(|b| *b == 0) {
return Err(store_err(format!("{field} must not be empty or all-zero")));
}
Ok(bytes)
}
fn decode_did(value: &str, field: &str) -> DagResult<Did> {
Did::new(value).map_err(|e| store_err(format!("{field} is invalid: {e}")))
}
fn validate_vote(vote: &Vote, context: &str) -> DagResult<()> {
validate_ed25519_signature(&vote.signature, &format!("{context}.signature"))?;
Ok(())
}
fn validate_commit_certificate(cert: &CommitCertificate) -> DagResult<()> {
if cert.votes.is_empty() {
return Err(store_err("commit_certificates.votes must not be empty"));
}
for (idx, vote) in cert.votes.iter().enumerate() {
let context = format!("commit_certificates.votes[{idx}]");
if vote.round != cert.round {
return Err(store_err(format!(
"{context}.round {} does not match certificate round {}",
vote.round, cert.round
)));
}
if vote.node_hash != cert.node_hash {
return Err(store_err(format!(
"{context}.node_hash does not match certificate node_hash"
)));
}
validate_vote(vote, &context)?;
}
Ok(())
}
pub struct SqliteDagStore {
backend: NodeStoreBackend,
}
enum NodeStoreBackend {
#[allow(dead_code)]
LegacySqlite(Connection),
DagDb(PostgresDagNodeStore),
}
#[derive(Clone)]
struct PostgresDagNodeStore {
pool: PgPool,
tenant_id: String,
namespace: String,
}
pub struct DagDbNodeStore;
impl DagDbNodeStore {
pub async fn open(
pool: PgPool,
tenant_id: String,
namespace: String,
) -> anyhow::Result<SqliteDagStore> {
validate_scope_component("tenant_id", &tenant_id)?;
validate_scope_component("namespace", &namespace)?;
let store = PostgresDagNodeStore {
pool,
tenant_id,
namespace,
};
store.verify_schema().await?;
Ok(SqliteDagStore {
backend: NodeStoreBackend::DagDb(store),
})
}
}
fn validate_scope_component(field: &str, value: &str) -> anyhow::Result<()> {
if value.trim().is_empty() {
anyhow::bail!("DAG DB node store {field} must not be empty");
}
Ok(())
}
fn block_on_dagdb<T, F>(future: F) -> DagResult<T>
where
T: Send + 'static,
F: Future<Output = DagResult<T>> + Send + 'static,
{
match tokio::runtime::Handle::try_current() {
Ok(_) => std::thread::spawn(move || {
let runtime = tokio::runtime::Runtime::new()
.map_err(|error| store_err(format!("DAG DB node store runtime: {error}")))?;
runtime.block_on(future)
})
.join()
.map_err(|_| store_err("DAG DB node store worker panicked"))?,
Err(_) => {
let runtime = tokio::runtime::Runtime::new()
.map_err(|error| store_err(format!("DAG DB node store runtime: {error}")))?;
runtime.block_on(future)
}
}
}
impl SqliteDagStore {
fn dagdb(&self) -> Option<&PostgresDagNodeStore> {
match &self.backend {
NodeStoreBackend::LegacySqlite(_) => None,
NodeStoreBackend::DagDb(store) => Some(store),
}
}
fn sqlite_conn(&self) -> DagResult<&Connection> {
match &self.backend {
NodeStoreBackend::LegacySqlite(conn) => Ok(conn),
NodeStoreBackend::DagDb(_) => Err(store_err(
"legacy SQLite connection is unavailable for DAG DB-backed node store",
)),
}
}
fn sqlite_conn_mut(&mut self) -> DagResult<&mut Connection> {
match &mut self.backend {
NodeStoreBackend::LegacySqlite(conn) => Ok(conn),
NodeStoreBackend::DagDb(_) => Err(store_err(
"legacy SQLite connection is unavailable for DAG DB-backed node store",
)),
}
}
}
impl PostgresDagNodeStore {
async fn verify_schema(&self) -> anyhow::Result<()> {
let mut tx =
self.pool.begin().await.map_err(|error| {
anyhow::anyhow!("DAG DB node store schema check failed: {error}")
})?;
self.bind_tenant(&mut tx)
.await
.map_err(|error| anyhow::anyhow!("DAG DB node store tenant binding failed: {error}"))?;
let present: bool =
sqlx::query_scalar("SELECT to_regclass('dagdb_node_dag_nodes') IS NOT NULL")
.fetch_one(&mut *tx)
.await
.map_err(|error| {
anyhow::anyhow!("DAG DB node store schema lookup failed: {error}")
})?;
tx.commit().await.map_err(|error| {
anyhow::anyhow!("DAG DB node store schema check commit failed: {error}")
})?;
if !present {
anyhow::bail!("DAG DB node store schema is missing dagdb_node_dag_nodes");
}
Ok(())
}
async fn bind_tenant(
&self,
tx: &mut Transaction<'_, Postgres>,
) -> std::result::Result<(), sqlx::Error> {
sqlx::query("SELECT set_config('exo.tenant_id', $1, true)")
.bind(&self.tenant_id)
.execute(&mut **tx)
.await?;
Ok(())
}
async fn begin(&self) -> DagResult<Transaction<'_, Postgres>> {
let mut tx = self.pool.begin().await.map_err(store_err)?;
self.bind_tenant(&mut tx).await.map_err(store_err)?;
Ok(tx)
}
async fn insert_node_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
node: &DagNode,
) -> DagResult<()> {
let cbor = SqliteDagStore::encode_node(node)?;
sqlx::query(
"INSERT INTO dagdb_node_dag_nodes (tenant_id, namespace, hash, cbor_payload) \
VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(node.hash.as_bytes().to_vec())
.bind(cbor)
.execute(&mut **tx)
.await
.map_err(store_err)?;
for parent in &node.parents {
sqlx::query(
"INSERT INTO dagdb_node_dag_parents \
(tenant_id, namespace, child_hash, parent_hash) \
VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(node.hash.as_bytes().to_vec())
.bind(parent.as_bytes().to_vec())
.execute(&mut **tx)
.await
.map_err(store_err)?;
}
Ok(())
}
async fn ensure_node_exists_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
hash: &Hash256,
) -> DagResult<()> {
let present: bool = sqlx::query_scalar(
"SELECT EXISTS (
SELECT 1 FROM dagdb_node_dag_nodes
WHERE tenant_id = $1 AND namespace = $2 AND hash = $3
)",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(hash.as_bytes().to_vec())
.fetch_one(&mut **tx)
.await
.map_err(store_err)?;
if present {
Ok(())
} else {
Err(DagError::NodeNotFound(*hash))
}
}
async fn insert_committed_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
hash: &Hash256,
height: u64,
) -> DagResult<()> {
let height = sqlite_u64_to_i64(height, "dagdb_node_committed.height")?;
sqlx::query(
"INSERT INTO dagdb_node_committed (tenant_id, namespace, hash, height) \
VALUES ($1, $2, $3, $4) \
ON CONFLICT (tenant_id, namespace, hash) DO UPDATE SET height = EXCLUDED.height",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(hash.as_bytes().to_vec())
.bind(height)
.execute(&mut **tx)
.await
.map_err(store_err)?;
Ok(())
}
async fn insert_certificate_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
cert: &CommitCertificate,
) -> DagResult<()> {
let round = sqlite_u64_to_i64(cert.round, "dagdb_node_commit_certificates.round")?;
validate_commit_certificate(cert)?;
let cbor_buf = encode_cbor(cert, "dagdb_node_commit_certificates.cbor_data")?;
sqlx::query(
"INSERT INTO dagdb_node_commit_certificates \
(tenant_id, namespace, node_hash, round, cbor_data) \
VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(cert.node_hash.as_bytes().to_vec())
.bind(round)
.bind(cbor_buf)
.execute(&mut **tx)
.await
.map_err(store_err)?;
Ok(())
}
async fn insert_receipt_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
receipt: &TrustReceipt,
) -> DagResult<()> {
validate_signature(&receipt.signature, "dagdb_node_trust_receipts.signature")?;
let timestamp_ms = sqlite_u64_to_i64(
receipt.timestamp.physical_ms,
"dagdb_node_trust_receipts.timestamp_ms",
)?;
let buf = encode_cbor(receipt, "dagdb_node_trust_receipts.cbor_data")?;
sqlx::query(
"INSERT INTO dagdb_node_trust_receipts \
(tenant_id, namespace, receipt_hash, actor_did, action_type, outcome, timestamp_ms, cbor_data) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT DO NOTHING",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(receipt.receipt_hash.as_bytes().to_vec())
.bind(receipt.actor_did.to_string())
.bind(receipt.action_type.as_str())
.bind(receipt.outcome.to_string())
.bind(timestamp_ms)
.bind(buf)
.execute(&mut **tx)
.await
.map_err(store_err)?;
Ok(())
}
async fn get_sync_async(&self, hash: &Hash256) -> DagResult<Option<DagNode>> {
let mut tx = self.begin().await?;
let result: Option<Vec<u8>> = sqlx::query_scalar(
"SELECT cbor_payload FROM dagdb_node_dag_nodes \
WHERE tenant_id = $1 AND namespace = $2 AND hash = $3",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(hash.as_bytes().to_vec())
.fetch_optional(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
result
.map(|bytes| SqliteDagStore::decode_node(&bytes))
.transpose()
}
async fn put_many_sync_async(&self, nodes: Vec<DagNode>) -> DagResult<()> {
let mut tx = self.begin().await?;
for node in &nodes {
self.insert_node_tx(&mut tx, node).await?;
}
tx.commit().await.map_err(store_err)?;
Ok(())
}
async fn put_committed_node_with_receipt_sync_async(
&self,
node: DagNode,
height: u64,
receipt: TrustReceipt,
) -> DagResult<()> {
if receipt.action_hash != node.hash {
return Err(store_err(
"dagdb_node_trust_receipts.action_hash must match committed node hash",
));
}
let mut tx = self.begin().await?;
self.insert_node_tx(&mut tx, &node).await?;
self.insert_committed_tx(&mut tx, &node.hash, height)
.await?;
self.insert_receipt_tx(&mut tx, &receipt).await?;
tx.commit().await.map_err(store_err)?;
Ok(())
}
async fn contains_sync_async(&self, hash: &Hash256) -> DagResult<bool> {
let mut tx = self.begin().await?;
let present: bool = sqlx::query_scalar(
"SELECT EXISTS (
SELECT 1 FROM dagdb_node_dag_nodes
WHERE tenant_id = $1 AND namespace = $2 AND hash = $3
)",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(hash.as_bytes().to_vec())
.fetch_one(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
Ok(present)
}
async fn tips_sync_async(&self) -> DagResult<Vec<Hash256>> {
let mut tx = self.begin().await?;
let rows: Vec<Vec<u8>> = sqlx::query_scalar(
"SELECT node.hash FROM dagdb_node_dag_nodes node
WHERE node.tenant_id = $1
AND node.namespace = $2
AND NOT EXISTS (
SELECT 1 FROM dagdb_node_dag_parents parent
WHERE parent.tenant_id = node.tenant_id
AND parent.namespace = node.namespace
AND parent.parent_hash = node.hash
)
ORDER BY node.hash ASC",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.fetch_all(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
rows.into_iter()
.map(|bytes| decode_hash_bytes(&bytes, "dagdb_node_dag_nodes.hash"))
.collect()
}
async fn committed_height_sync_async(&self) -> DagResult<u64> {
let mut tx = self.begin().await?;
let height: i64 = sqlx::query_scalar(
"SELECT COALESCE(MAX(height), 0) FROM dagdb_node_committed \
WHERE tenant_id = $1 AND namespace = $2",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.fetch_one(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
sqlite_i64_to_u64(height, "dagdb_node_committed.height")
}
async fn mark_committed_sync_async(&self, hash: Hash256, height: u64) -> DagResult<()> {
let mut tx = self.begin().await?;
self.ensure_node_exists_tx(&mut tx, &hash).await?;
self.insert_committed_tx(&mut tx, &hash, height).await?;
tx.commit().await.map_err(store_err)?;
Ok(())
}
async fn committed_nodes_in_range_async(
&self,
from_height: u64,
to_height: u64,
) -> DagResult<Vec<(Hash256, u64)>> {
let from_height = sqlite_u64_to_i64(from_height, "dagdb_node_committed.from_height")?;
let to_height = sqlite_u64_to_i64(to_height, "dagdb_node_committed.to_height")?;
let mut tx = self.begin().await?;
let rows = sqlx::query(
"SELECT hash, height FROM dagdb_node_committed
WHERE tenant_id = $1 AND namespace = $2 AND height >= $3 AND height <= $4
ORDER BY height ASC",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(from_height)
.bind(to_height)
.fetch_all(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
rows.into_iter()
.map(|row| {
let hash: Vec<u8> = row.get("hash");
let height: i64 = row.get("height");
Ok((
decode_hash_bytes(&hash, "dagdb_node_committed.hash")?,
sqlite_i64_to_u64(height, "dagdb_node_committed.height")?,
))
})
.collect()
}
async fn save_consensus_round_async(&self, round: u64) -> DagResult<()> {
let mut tx = self.begin().await?;
sqlx::query(
"INSERT INTO dagdb_node_consensus_meta (tenant_id, namespace, key, value) \
VALUES ($1, $2, 'round', $3) \
ON CONFLICT (tenant_id, namespace, key) DO UPDATE SET value = EXCLUDED.value",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(round.to_string())
.execute(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
Ok(())
}
async fn load_consensus_round_async(&self) -> DagResult<u64> {
let mut tx = self.begin().await?;
let value: Option<String> = sqlx::query_scalar(
"SELECT value FROM dagdb_node_consensus_meta \
WHERE tenant_id = $1 AND namespace = $2 AND key = 'round'",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.fetch_optional(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
value.map_or(Ok(0), |text| text.parse::<u64>().map_err(store_err))
}
async fn save_vote_async(&self, vote: Vote) -> DagResult<()> {
let round = sqlite_u64_to_i64(vote.round, "dagdb_node_consensus_votes.round")?;
validate_vote(&vote, "dagdb_node_consensus_votes")?;
let signature =
validate_ed25519_signature(&vote.signature, "dagdb_node_consensus_votes.signature")?;
let mut tx = self.begin().await?;
sqlx::query(
"INSERT INTO dagdb_node_consensus_votes \
(tenant_id, namespace, round, node_hash, voter_did, signature) \
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(round)
.bind(vote.node_hash.as_bytes().to_vec())
.bind(vote.voter.to_string())
.bind(signature.to_vec())
.execute(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
Ok(())
}
async fn load_votes_for_round_async(&self, round: u64) -> DagResult<Vec<Vote>> {
let round_i64 = sqlite_u64_to_i64(round, "dagdb_node_consensus_votes.round")?;
let mut tx = self.begin().await?;
let rows = sqlx::query(
"SELECT node_hash, voter_did, signature FROM dagdb_node_consensus_votes \
WHERE tenant_id = $1 AND namespace = $2 AND round = $3 \
ORDER BY node_hash ASC, voter_did ASC",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(round_i64)
.fetch_all(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
rows.into_iter()
.map(|row| {
let hash_bytes: Vec<u8> = row.get("node_hash");
let voter_str: String = row.get("voter_did");
let sig_bytes: Vec<u8> = row.get("signature");
Ok(Vote {
voter: decode_did(&voter_str, "dagdb_node_consensus_votes.voter_did")?,
round,
node_hash: decode_hash_bytes(
&hash_bytes,
"dagdb_node_consensus_votes.node_hash",
)?,
signature: decode_signature_bytes(
&sig_bytes,
"dagdb_node_consensus_votes.signature",
)?,
})
})
.collect()
}
async fn save_certificate_async(&self, cert: CommitCertificate) -> DagResult<()> {
let mut tx = self.begin().await?;
self.insert_certificate_tx(&mut tx, &cert).await?;
tx.commit().await.map_err(store_err)?;
Ok(())
}
async fn load_certificates_async(&self) -> DagResult<Vec<CommitCertificate>> {
let mut tx = self.begin().await?;
let rows: Vec<Vec<u8>> = sqlx::query_scalar(
"SELECT cbor_data FROM dagdb_node_commit_certificates \
WHERE tenant_id = $1 AND namespace = $2 ORDER BY round ASC, node_hash ASC",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.fetch_all(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
rows.into_iter()
.map(|bytes| {
let cert: CommitCertificate = ciborium::from_reader(bytes.as_slice())
.map_err(|e| store_err(format!("CBOR decode certificate: {e}")))?;
validate_commit_certificate(&cert)?;
Ok(cert)
})
.collect()
}
async fn load_certificate_for_hash_async(
&self,
hash: Hash256,
) -> DagResult<Option<CommitCertificate>> {
let mut tx = self.begin().await?;
let result: Option<Vec<u8>> = sqlx::query_scalar(
"SELECT cbor_data FROM dagdb_node_commit_certificates \
WHERE tenant_id = $1 AND namespace = $2 AND node_hash = $3",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(hash.as_bytes().to_vec())
.fetch_optional(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
result
.map(|bytes| {
let certificate: CommitCertificate = ciborium::from_reader(bytes.as_slice())
.map_err(|e| store_err(format!("CBOR decode certificate: {e}")))?;
if certificate.node_hash != hash {
return Err(store_err(
"dagdb_node_commit_certificates.node_hash does not match CBOR certificate node_hash",
));
}
validate_commit_certificate(&certificate)?;
Ok(certificate)
})
.transpose()
}
async fn save_validator_set_async(&self, validators: BTreeSet<Did>) -> DagResult<()> {
let mut tx = self.begin().await?;
sqlx::query("DELETE FROM dagdb_node_validators WHERE tenant_id = $1 AND namespace = $2")
.bind(&self.tenant_id)
.bind(&self.namespace)
.execute(&mut *tx)
.await
.map_err(store_err)?;
for did in validators {
sqlx::query(
"INSERT INTO dagdb_node_validators (tenant_id, namespace, did) VALUES ($1, $2, $3)",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(did.to_string())
.execute(&mut *tx)
.await
.map_err(store_err)?;
}
tx.commit().await.map_err(store_err)?;
Ok(())
}
async fn load_validator_set_async(&self) -> DagResult<BTreeSet<Did>> {
let mut tx = self.begin().await?;
let rows: Vec<String> = sqlx::query_scalar(
"SELECT did FROM dagdb_node_validators \
WHERE tenant_id = $1 AND namespace = $2 ORDER BY did ASC",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.fetch_all(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
rows.into_iter()
.map(|did| decode_did(&did, "dagdb_node_validators.did"))
.collect()
}
async fn mark_committed_with_receipt_sync_async(
&self,
hash: Hash256,
height: u64,
receipt: TrustReceipt,
) -> DagResult<()> {
if receipt.action_hash != hash {
return Err(store_err(
"dagdb_node_trust_receipts.action_hash must match committed node hash",
));
}
let mut tx = self.begin().await?;
self.ensure_node_exists_tx(&mut tx, &hash).await?;
self.insert_committed_tx(&mut tx, &hash, height).await?;
self.insert_receipt_tx(&mut tx, &receipt).await?;
tx.commit().await.map_err(store_err)?;
Ok(())
}
async fn persist_commit_certificate_with_receipt_sync_async(
&self,
hash: Hash256,
height: u64,
cert: CommitCertificate,
receipt: TrustReceipt,
) -> DagResult<()> {
if cert.node_hash != hash {
return Err(store_err(
"dagdb_node_commit_certificates.node_hash must match committed node hash",
));
}
if receipt.action_hash != hash {
return Err(store_err(
"dagdb_node_trust_receipts.action_hash must match committed node hash",
));
}
let mut tx = self.begin().await?;
self.ensure_node_exists_tx(&mut tx, &hash).await?;
self.insert_committed_tx(&mut tx, &hash, height).await?;
self.insert_certificate_tx(&mut tx, &cert).await?;
self.insert_receipt_tx(&mut tx, &receipt).await?;
tx.commit().await.map_err(store_err)?;
Ok(())
}
async fn load_receipt_async(&self, receipt_hash: Hash256) -> DagResult<Option<TrustReceipt>> {
let mut tx = self.begin().await?;
let result: Option<Vec<u8>> = sqlx::query_scalar(
"SELECT cbor_data FROM dagdb_node_trust_receipts \
WHERE tenant_id = $1 AND namespace = $2 AND receipt_hash = $3",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(receipt_hash.as_bytes().to_vec())
.fetch_optional(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
result
.map(|data| {
ciborium::from_reader(&data[..])
.map_err(|e| store_err(format!("CBOR decode receipt: {e}")))
})
.transpose()
}
async fn load_receipts_by_actor_async(
&self,
actor_did: String,
limit: u32,
) -> DagResult<Vec<TrustReceipt>> {
let mut tx = self.begin().await?;
let rows: Vec<Vec<u8>> = sqlx::query_scalar(
"SELECT cbor_data FROM dagdb_node_trust_receipts \
WHERE tenant_id = $1 AND namespace = $2 AND actor_did = $3 \
ORDER BY timestamp_ms DESC, receipt_hash ASC LIMIT $4",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(actor_did)
.bind(i64::from(limit))
.fetch_all(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
rows.into_iter()
.map(|data| {
ciborium::from_reader(&data[..])
.map_err(|e| store_err(format!("CBOR decode receipt: {e}")))
})
.collect()
}
async fn load_recent_receipts_async(&self, limit: u32) -> DagResult<Vec<TrustReceipt>> {
let mut tx = self.begin().await?;
let rows: Vec<Vec<u8>> = sqlx::query_scalar(
"SELECT cbor_data FROM dagdb_node_trust_receipts \
WHERE tenant_id = $1 AND namespace = $2 \
ORDER BY timestamp_ms DESC, receipt_hash ASC LIMIT $3",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(i64::from(limit))
.fetch_all(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
rows.into_iter()
.map(|data| {
ciborium::from_reader(&data[..])
.map_err(|e| store_err(format!("CBOR decode receipt: {e}")))
})
.collect()
}
async fn children_async(&self, parent_hash: Hash256) -> DagResult<Vec<Hash256>> {
let mut tx = self.begin().await?;
let rows: Vec<Vec<u8>> = sqlx::query_scalar(
"SELECT child_hash FROM dagdb_node_dag_parents \
WHERE tenant_id = $1 AND namespace = $2 AND parent_hash = $3 \
ORDER BY child_hash ASC",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(parent_hash.as_bytes().to_vec())
.fetch_all(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
rows.into_iter()
.map(|bytes| decode_hash_bytes(&bytes, "dagdb_node_dag_parents.child_hash"))
.collect()
}
async fn committed_height_for_async(&self, hash: Hash256) -> DagResult<Option<u64>> {
let mut tx = self.begin().await?;
let height: Option<i64> = sqlx::query_scalar(
"SELECT height FROM dagdb_node_committed \
WHERE tenant_id = $1 AND namespace = $2 AND hash = $3",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(hash.as_bytes().to_vec())
.fetch_optional(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
height
.map(|value| sqlite_i64_to_u64(value, "dagdb_node_committed.height"))
.transpose()
}
async fn put_committed_many_with_certificates_sync_async(
&self,
nodes: Vec<(DagNode, u64)>,
certificates: Vec<CommitCertificate>,
) -> DagResult<()> {
if nodes.len() != certificates.len() {
return Err(store_err(format!(
"committed batch must include one certificate per node: got {} certificates for {} nodes",
certificates.len(),
nodes.len()
)));
}
let mut tx = self.begin().await?;
for ((node, height), certificate) in nodes.iter().zip(certificates.iter()) {
if certificate.node_hash != node.hash {
return Err(store_err(format!(
"commit certificate node_hash {} does not match DAG node hash {}",
certificate.node_hash, node.hash
)));
}
self.insert_node_tx(&mut tx, node).await?;
self.insert_committed_tx(&mut tx, &node.hash, *height)
.await?;
self.insert_certificate_tx(&mut tx, certificate).await?;
}
tx.commit().await.map_err(store_err)?;
Ok(())
}
async fn latest_economy_anchor_hash_sync_async(&self) -> DagResult<Hash256> {
let mut tx = self.begin().await?;
let value: Option<Vec<u8>> = sqlx::query_scalar(
"SELECT value FROM dagdb_node_economy_meta \
WHERE tenant_id = $1 AND namespace = $2 AND key = 'latest_anchor_hash'",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.fetch_optional(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
match value {
Some(bytes) => decode_hash_bytes(&bytes, "dagdb_node_economy_meta.latest_anchor_hash"),
None => Ok(Hash256::ZERO),
}
}
async fn latest_economy_anchor_hash_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
) -> DagResult<Hash256> {
let value: Option<Vec<u8>> = sqlx::query_scalar(
"SELECT value FROM dagdb_node_economy_meta \
WHERE tenant_id = $1 AND namespace = $2 AND key = 'latest_anchor_hash'",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.fetch_optional(&mut **tx)
.await
.map_err(store_err)?;
match value {
Some(bytes) => decode_hash_bytes(&bytes, "dagdb_node_economy_meta.latest_anchor_hash"),
None => Ok(Hash256::ZERO),
}
}
async fn put_economy_object_sync_async(
&self,
object_kind: EconomyObjectKind,
object_id: Hash256,
content_hash: Hash256,
created_at: Timestamp,
object_cbor: Vec<u8>,
) -> DagResult<EconomyRecordAnchor> {
if object_id == Hash256::ZERO {
return Err(store_err("economy object_id must not be Hash256::ZERO"));
}
if content_hash == Hash256::ZERO {
return Err(store_err("economy content_hash must not be Hash256::ZERO"));
}
if created_at == Timestamp::ZERO {
return Err(store_err("economy created_at must not be Timestamp::ZERO"));
}
let mut tx = self.begin().await?;
let previous_anchor_hash = self.latest_economy_anchor_hash_tx(&mut tx).await?;
let anchor = EconomyRecordAnchor {
anchor_hash: Hash256::ZERO,
previous_anchor_hash,
object_kind,
object_id,
object_hash: content_hash,
created_at,
}
.anchor()
.map_err(store_err)?;
let anchor_cbor = encode_cbor(&anchor, "dagdb_node_economy_anchors.cbor_data")?;
let created_physical_ms = sqlite_u64_to_i64(
created_at.physical_ms,
"dagdb_node_economy.created_physical_ms",
)?;
let created_logical = sqlite_u64_to_i64(
u64::from(created_at.logical),
"dagdb_node_economy.created_logical",
)?;
sqlx::query(
"INSERT INTO dagdb_node_economy_objects (
tenant_id, namespace, object_kind, object_id, content_hash,
created_physical_ms, created_logical, cbor_data
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(object_kind.label())
.bind(object_id.as_bytes().to_vec())
.bind(content_hash.as_bytes().to_vec())
.bind(created_physical_ms)
.bind(created_logical)
.bind(object_cbor)
.execute(&mut *tx)
.await
.map_err(|e| store_err(format!("insert DAG DB economy object: {e}")))?;
sqlx::query(
"INSERT INTO dagdb_node_economy_anchors (
tenant_id, namespace, anchor_hash, previous_anchor_hash, object_kind,
object_id, object_hash, created_physical_ms, created_logical, cbor_data
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(anchor.anchor_hash.as_bytes().to_vec())
.bind(anchor.previous_anchor_hash.as_bytes().to_vec())
.bind(object_kind.label())
.bind(object_id.as_bytes().to_vec())
.bind(content_hash.as_bytes().to_vec())
.bind(created_physical_ms)
.bind(created_logical)
.bind(anchor_cbor)
.execute(&mut *tx)
.await
.map_err(|e| store_err(format!("insert DAG DB economy anchor: {e}")))?;
sqlx::query(
"INSERT INTO dagdb_node_economy_meta (tenant_id, namespace, key, value) \
VALUES ($1, $2, 'latest_anchor_hash', $3) \
ON CONFLICT (tenant_id, namespace, key) DO UPDATE SET value = EXCLUDED.value",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(anchor.anchor_hash.as_bytes().to_vec())
.execute(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
Ok(anchor)
}
async fn get_economy_object_sync_async<T>(
&self,
object_kind: EconomyObjectKind,
object_id: Hash256,
) -> DagResult<Option<T>>
where
T: DeserializeOwned,
{
let mut tx = self.begin().await?;
let result: Option<Vec<u8>> = sqlx::query_scalar(
"SELECT cbor_data FROM dagdb_node_economy_objects \
WHERE tenant_id = $1 AND namespace = $2 AND object_kind = $3 AND object_id = $4",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(object_kind.label())
.bind(object_id.as_bytes().to_vec())
.fetch_optional(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
result
.map(|bytes| decode_cbor(&bytes, "dagdb_node_economy_objects.cbor_data"))
.transpose()
}
async fn get_economy_anchor_sync_async(
&self,
anchor_hash: Hash256,
) -> DagResult<Option<EconomyRecordAnchor>> {
let mut tx = self.begin().await?;
let result: Option<Vec<u8>> = sqlx::query_scalar(
"SELECT cbor_data FROM dagdb_node_economy_anchors \
WHERE tenant_id = $1 AND namespace = $2 AND anchor_hash = $3",
)
.bind(&self.tenant_id)
.bind(&self.namespace)
.bind(anchor_hash.as_bytes().to_vec())
.fetch_optional(&mut *tx)
.await
.map_err(store_err)?;
tx.commit().await.map_err(store_err)?;
result
.map(|bytes| decode_cbor(&bytes, "dagdb_node_economy_anchors.cbor_data"))
.transpose()
}
}
impl SqliteDagStore {
#[allow(dead_code)]
pub fn open(data_dir: &Path) -> anyhow::Result<Self> {
let db_path = data_dir.join("dag.db");
let conn = Connection::open(&db_path)?;
conn.execute_batch("PRAGMA journal_mode=WAL;")?;
conn.execute_batch("PRAGMA synchronous=NORMAL;")?;
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS dag_nodes (
hash BLOB PRIMARY KEY NOT NULL,
cbor_payload BLOB NOT NULL
);
CREATE TABLE IF NOT EXISTS dag_parents (
child_hash BLOB NOT NULL,
parent_hash BLOB NOT NULL,
PRIMARY KEY (child_hash, parent_hash)
);
CREATE TABLE IF NOT EXISTS committed (
hash BLOB PRIMARY KEY NOT NULL,
height INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_parents_parent ON dag_parents(parent_hash);
CREATE INDEX IF NOT EXISTS idx_committed_height ON committed(height);
-- Persistent consensus state: survives restarts.
CREATE TABLE IF NOT EXISTS consensus_meta (
key TEXT PRIMARY KEY NOT NULL,
value TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS consensus_votes (
round INTEGER NOT NULL,
node_hash BLOB NOT NULL,
voter_did TEXT NOT NULL,
signature BLOB NOT NULL,
PRIMARY KEY (round, node_hash, voter_did)
);
CREATE TABLE IF NOT EXISTS commit_certificates (
node_hash BLOB PRIMARY KEY NOT NULL,
round INTEGER NOT NULL,
cbor_data BLOB NOT NULL
);
CREATE TABLE IF NOT EXISTS validators (
did TEXT PRIMARY KEY NOT NULL
);
CREATE TABLE IF NOT EXISTS trust_receipts (
receipt_hash BLOB PRIMARY KEY NOT NULL,
actor_did TEXT NOT NULL,
action_type TEXT NOT NULL,
outcome TEXT NOT NULL,
timestamp_ms INTEGER NOT NULL,
cbor_data BLOB NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_receipts_actor
ON trust_receipts(actor_did);
CREATE INDEX IF NOT EXISTS idx_receipts_ts
ON trust_receipts(timestamp_ms);
CREATE TABLE IF NOT EXISTS economy_objects (
object_kind TEXT NOT NULL,
object_id BLOB NOT NULL,
content_hash BLOB NOT NULL,
created_physical_ms INTEGER NOT NULL,
created_logical INTEGER NOT NULL,
cbor_data BLOB NOT NULL,
PRIMARY KEY (object_kind, object_id)
);
CREATE INDEX IF NOT EXISTS idx_economy_objects_hash
ON economy_objects(content_hash);
CREATE TABLE IF NOT EXISTS economy_anchors (
anchor_hash BLOB PRIMARY KEY NOT NULL,
previous_anchor_hash BLOB NOT NULL,
object_kind TEXT NOT NULL,
object_id BLOB NOT NULL,
object_hash BLOB NOT NULL,
created_physical_ms INTEGER NOT NULL,
created_logical INTEGER NOT NULL,
cbor_data BLOB NOT NULL
);
CREATE TABLE IF NOT EXISTS economy_meta (
key TEXT PRIMARY KEY NOT NULL,
value BLOB NOT NULL
);",
)?;
Ok(Self {
backend: NodeStoreBackend::LegacySqlite(conn),
})
}
pub fn committed_height_value(&self) -> DagResult<u64> {
if let Some(store) = self.dagdb() {
let store = store.clone();
return block_on_dagdb(async move { store.committed_height_sync_async().await });
}
self.committed_height_sync()
}
fn encode_node(node: &DagNode) -> DagResult<Vec<u8>> {
let mut buf = Vec::new();
ciborium::into_writer(node, &mut buf)
.map_err(|e| store_err(format!("CBOR encode: {e}")))?;
Ok(buf)
}
fn decode_node(bytes: &[u8]) -> DagResult<DagNode> {
ciborium::from_reader(bytes).map_err(|e| store_err(format!("CBOR decode: {e}")))
}
pub fn committed_nodes_in_range(
&self,
from_height: u64,
to_height: u64,
) -> DagResult<Vec<(Hash256, u64)>> {
if let Some(store) = self.dagdb() {
let store = store.clone();
return block_on_dagdb(async move {
store
.committed_nodes_in_range_async(from_height, to_height)
.await
});
}
let from_height = sqlite_u64_to_i64(from_height, "committed.from_height")?;
let to_height = sqlite_u64_to_i64(to_height, "committed.to_height")?;
let mut stmt = self
.sqlite_conn()?
.prepare_cached(
"SELECT hash, height FROM committed
WHERE height >= ?1 AND height <= ?2
ORDER BY height ASC",
)
.map_err(store_err)?;
let rows = stmt
.query_map(params![from_height, to_height], |row| {
let bytes: Vec<u8> = row.get(0)?;
let height: i64 = row.get(1)?;
Ok((bytes, height))
})
.map_err(store_err)?;
let mut result = Vec::new();
for row in rows {
let (bytes, height) = row.map_err(store_err)?;
result.push((
decode_hash_bytes(&bytes, "committed.hash")?,
sqlite_i64_to_u64(height, "committed.height")?,
));
}
Ok(result)
}
pub fn save_consensus_round(&mut self, round: u64) -> DagResult<()> {
if let Some(store) = self.dagdb() {
let store = store.clone();
return block_on_dagdb(async move { store.save_consensus_round_async(round).await });
}
self.sqlite_conn_mut()?
.execute(
"INSERT OR REPLACE INTO consensus_meta (key, value) VALUES ('round', ?1)",
params![round.to_string()],
)
.map_err(store_err)?;
Ok(())
}
pub fn load_consensus_round(&self) -> DagResult<u64> {
if let Some(store) = self.dagdb() {
let store = store.clone();
return block_on_dagdb(async move { store.load_consensus_round_async().await });
}
let result: Result<String, _> = self.sqlite_conn()?.query_row(
"SELECT value FROM consensus_meta WHERE key = 'round'",
[],
|row| row.get(0),
);
match result {
Ok(s) => s.parse::<u64>().map_err(store_err),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
Err(e) => Err(store_err(e)),
}
}
pub fn save_vote(&mut self, vote: &Vote) -> DagResult<()> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let vote = vote.clone();
return block_on_dagdb(async move { store.save_vote_async(vote).await });
}
let round = sqlite_u64_to_i64(vote.round, "consensus_votes.round")?;
validate_vote(vote, "consensus_votes")?;
let signature = validate_ed25519_signature(&vote.signature, "consensus_votes.signature")?;
self.sqlite_conn_mut()?
.execute(
"INSERT OR IGNORE INTO consensus_votes (round, node_hash, voter_did, signature) VALUES (?1, ?2, ?3, ?4)",
params![
round,
vote.node_hash.0.as_slice(),
vote.voter.to_string(),
signature,
],
)
.map_err(store_err)?;
Ok(())
}
pub fn load_votes_for_round(&self, round: u64) -> DagResult<Vec<Vote>> {
if let Some(store) = self.dagdb() {
let store = store.clone();
return block_on_dagdb(async move { store.load_votes_for_round_async(round).await });
}
let round_i64 = sqlite_u64_to_i64(round, "consensus_votes.round")?;
let mut stmt = self
.sqlite_conn()?
.prepare_cached(
"SELECT node_hash, voter_did, signature FROM consensus_votes WHERE round = ?1",
)
.map_err(store_err)?;
let rows = stmt
.query_map(params![round_i64], |row| {
let hash_bytes: Vec<u8> = row.get(0)?;
let voter_str: String = row.get(1)?;
let sig_bytes: Vec<u8> = row.get(2)?;
Ok((hash_bytes, voter_str, sig_bytes))
})
.map_err(store_err)?;
let mut votes = Vec::new();
for row in rows {
let (hash_bytes, voter_str, sig_bytes) = row.map_err(store_err)?;
votes.push(Vote {
voter: decode_did(&voter_str, "consensus_votes.voter_did")?,
round,
node_hash: decode_hash_bytes(&hash_bytes, "consensus_votes.node_hash")?,
signature: decode_signature_bytes(&sig_bytes, "consensus_votes.signature")?,
});
}
Ok(votes)
}
#[allow(dead_code)]
pub fn save_certificate(&mut self, cert: &CommitCertificate) -> DagResult<()> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let cert = cert.clone();
return block_on_dagdb(async move { store.save_certificate_async(cert).await });
}
let round = sqlite_u64_to_i64(cert.round, "commit_certificates.round")?;
validate_commit_certificate(cert)?;
let mut cbor_buf = Vec::new();
ciborium::into_writer(cert, &mut cbor_buf)
.map_err(|e| store_err(format!("CBOR encode certificate: {e}")))?;
self.sqlite_conn_mut()?
.execute(
"INSERT OR IGNORE INTO commit_certificates (node_hash, round, cbor_data) VALUES (?1, ?2, ?3)",
params![
cert.node_hash.0.as_slice(),
round,
cbor_buf,
],
)
.map_err(store_err)?;
Ok(())
}
fn ensure_node_exists_tx(tx: &rusqlite::Transaction<'_>, hash: &Hash256) -> DagResult<()> {
match tx.query_row(
"SELECT 1 FROM dag_nodes WHERE hash = ?1",
params![hash.0.as_slice()],
|_| Ok(()),
) {
Ok(()) => Ok(()),
Err(rusqlite::Error::QueryReturnedNoRows) => Err(DagError::NodeNotFound(*hash)),
Err(e) => Err(store_err(format!("dag_nodes.hash presence query: {e}"))),
}
}
fn insert_committed_tx(
tx: &rusqlite::Transaction<'_>,
hash: &Hash256,
height: u64,
) -> DagResult<()> {
let height = sqlite_u64_to_i64(height, "committed.height")?;
tx.execute(
"INSERT OR REPLACE INTO committed (hash, height) VALUES (?1, ?2)",
params![hash.0.as_slice(), height],
)
.map_err(store_err)?;
Ok(())
}
fn insert_certificate_tx(
tx: &rusqlite::Transaction<'_>,
cert: &CommitCertificate,
) -> DagResult<()> {
let round = sqlite_u64_to_i64(cert.round, "commit_certificates.round")?;
validate_commit_certificate(cert)?;
let cbor_buf = encode_cbor(cert, "commit_certificates.cbor_data")?;
tx.execute(
"INSERT OR IGNORE INTO commit_certificates (node_hash, round, cbor_data) VALUES (?1, ?2, ?3)",
params![cert.node_hash.0.as_slice(), round, cbor_buf],
)
.map_err(store_err)?;
Ok(())
}
fn insert_receipt_tx(tx: &rusqlite::Transaction<'_>, receipt: &TrustReceipt) -> DagResult<()> {
validate_signature(&receipt.signature, "trust_receipts.signature")?;
let timestamp_ms =
sqlite_u64_to_i64(receipt.timestamp.physical_ms, "trust_receipts.timestamp_ms")?;
let buf = encode_cbor(receipt, "trust_receipts.cbor_data")?;
tx.execute(
"INSERT OR IGNORE INTO trust_receipts (receipt_hash, actor_did, action_type, outcome, timestamp_ms, cbor_data) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![
receipt.receipt_hash.0.as_slice(),
receipt.actor_did.to_string(),
receipt.action_type.as_str(),
receipt.outcome.to_string(),
timestamp_ms,
buf,
],
)
.map_err(store_err)?;
Ok(())
}
pub fn load_certificates(&self) -> DagResult<Vec<CommitCertificate>> {
if let Some(store) = self.dagdb() {
let store = store.clone();
return block_on_dagdb(async move { store.load_certificates_async().await });
}
let mut stmt = self
.sqlite_conn()?
.prepare_cached("SELECT cbor_data FROM commit_certificates ORDER BY round ASC")
.map_err(store_err)?;
let rows = stmt
.query_map([], |row| {
let bytes: Vec<u8> = row.get(0)?;
Ok(bytes)
})
.map_err(store_err)?;
let mut certs = Vec::new();
for row in rows {
let bytes = row.map_err(store_err)?;
let cert: CommitCertificate = ciborium::from_reader(bytes.as_slice())
.map_err(|e| store_err(format!("CBOR decode certificate: {e}")))?;
validate_commit_certificate(&cert)?;
certs.push(cert);
}
Ok(certs)
}
pub fn load_certificate_for_hash(
&self,
hash: &Hash256,
) -> DagResult<Option<CommitCertificate>> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let hash = *hash;
return block_on_dagdb(
async move { store.load_certificate_for_hash_async(hash).await },
);
}
let mut stmt = self
.sqlite_conn()?
.prepare_cached("SELECT cbor_data FROM commit_certificates WHERE node_hash = ?1")
.map_err(store_err)?;
let result: Result<Vec<u8>, rusqlite::Error> =
stmt.query_row(params![hash.0.as_slice()], |row| row.get(0));
match result {
Ok(bytes) => {
let certificate: CommitCertificate = ciborium::from_reader(bytes.as_slice())
.map_err(|e| store_err(format!("CBOR decode certificate: {e}")))?;
if certificate.node_hash != *hash {
return Err(store_err(
"commit_certificates.node_hash does not match CBOR certificate node_hash",
));
}
validate_commit_certificate(&certificate)?;
Ok(Some(certificate))
}
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(store_err(format!("commit_certificates.cbor_data: {e}"))),
}
}
#[allow(dead_code)]
pub fn save_validator_set(&mut self, validators: &BTreeSet<Did>) -> DagResult<()> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let validators = validators.clone();
return block_on_dagdb(async move { store.save_validator_set_async(validators).await });
}
let tx = self.sqlite_conn_mut()?.transaction().map_err(store_err)?;
tx.execute("DELETE FROM validators", [])
.map_err(store_err)?;
for did in validators {
tx.execute(
"INSERT INTO validators (did) VALUES (?1)",
params![did.to_string()],
)
.map_err(store_err)?;
}
tx.commit().map_err(store_err)?;
Ok(())
}
pub fn load_validator_set(&self) -> DagResult<BTreeSet<Did>> {
if let Some(store) = self.dagdb() {
let store = store.clone();
return block_on_dagdb(async move { store.load_validator_set_async().await });
}
let mut stmt = self
.sqlite_conn()?
.prepare_cached("SELECT did FROM validators ORDER BY did ASC")
.map_err(store_err)?;
let rows = stmt
.query_map([], |row| {
let did_str: String = row.get(0)?;
Ok(did_str)
})
.map_err(store_err)?;
let mut set = BTreeSet::new();
for row in rows {
let did_str = row.map_err(store_err)?;
let did = decode_did(&did_str, "validators.did")?;
set.insert(did);
}
Ok(set)
}
#[cfg(test)]
pub fn save_receipt(&mut self, receipt: &TrustReceipt) -> DagResult<()> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let receipt = receipt.clone();
return block_on_dagdb(async move {
let mut tx = store.begin().await?;
store.insert_receipt_tx(&mut tx, &receipt).await?;
tx.commit().await.map_err(store_err)?;
Ok(())
});
}
validate_signature(&receipt.signature, "trust_receipts.signature")?;
let timestamp_ms =
sqlite_u64_to_i64(receipt.timestamp.physical_ms, "trust_receipts.timestamp_ms")?;
let mut buf = Vec::new();
ciborium::into_writer(receipt, &mut buf)
.map_err(|e| store_err(format!("CBOR encode receipt: {e}")))?;
self.sqlite_conn_mut()?
.execute(
"INSERT OR IGNORE INTO trust_receipts (receipt_hash, actor_did, action_type, outcome, timestamp_ms, cbor_data) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![
receipt.receipt_hash.0.as_slice(),
receipt.actor_did.to_string(),
receipt.action_type,
receipt.outcome.to_string(),
timestamp_ms,
buf,
],
)
.map_err(store_err)?;
Ok(())
}
pub fn mark_committed_with_receipt_sync(
&mut self,
hash: &Hash256,
height: u64,
receipt: &TrustReceipt,
) -> DagResult<()> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let hash = *hash;
let receipt = receipt.clone();
return block_on_dagdb(async move {
store
.mark_committed_with_receipt_sync_async(hash, height, receipt)
.await
});
}
if receipt.action_hash != *hash {
return Err(store_err(
"trust_receipts.action_hash must match committed node hash",
));
}
let tx = self.sqlite_conn_mut()?.transaction().map_err(store_err)?;
Self::ensure_node_exists_tx(&tx, hash)?;
Self::insert_committed_tx(&tx, hash, height)?;
Self::insert_receipt_tx(&tx, receipt)?;
tx.commit().map_err(store_err)?;
Ok(())
}
pub fn persist_commit_certificate_with_receipt_sync(
&mut self,
hash: &Hash256,
height: u64,
cert: &CommitCertificate,
receipt: &TrustReceipt,
) -> DagResult<()> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let hash = *hash;
let cert = cert.clone();
let receipt = receipt.clone();
return block_on_dagdb(async move {
store
.persist_commit_certificate_with_receipt_sync_async(hash, height, cert, receipt)
.await
});
}
if cert.node_hash != *hash {
return Err(store_err(
"commit_certificates.node_hash must match committed node hash",
));
}
if receipt.action_hash != *hash {
return Err(store_err(
"trust_receipts.action_hash must match committed node hash",
));
}
let tx = self.sqlite_conn_mut()?.transaction().map_err(store_err)?;
Self::ensure_node_exists_tx(&tx, hash)?;
Self::insert_committed_tx(&tx, hash, height)?;
Self::insert_certificate_tx(&tx, cert)?;
Self::insert_receipt_tx(&tx, receipt)?;
tx.commit().map_err(store_err)?;
Ok(())
}
pub fn load_receipt(
&self,
receipt_hash: &Hash256,
) -> DagResult<Option<exo_core::types::TrustReceipt>> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let receipt_hash = *receipt_hash;
return block_on_dagdb(async move { store.load_receipt_async(receipt_hash).await });
}
let mut stmt = self
.sqlite_conn()?
.prepare_cached("SELECT cbor_data FROM trust_receipts WHERE receipt_hash = ?1")
.map_err(store_err)?;
let mut rows = stmt
.query_map(params![receipt_hash.0.as_slice()], |row| {
let data: Vec<u8> = row.get(0)?;
Ok(data)
})
.map_err(store_err)?;
match rows.next() {
Some(row) => {
let data = row.map_err(store_err)?;
let receipt: exo_core::types::TrustReceipt = ciborium::from_reader(&data[..])
.map_err(|e| store_err(format!("CBOR decode receipt: {e}")))?;
Ok(Some(receipt))
}
None => Ok(None),
}
}
pub fn load_receipts_by_actor(
&self,
actor_did: &str,
limit: u32,
) -> DagResult<Vec<exo_core::types::TrustReceipt>> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let actor_did = actor_did.to_owned();
return block_on_dagdb(async move {
store.load_receipts_by_actor_async(actor_did, limit).await
});
}
let mut stmt = self
.sqlite_conn()?
.prepare_cached(
"SELECT cbor_data FROM trust_receipts WHERE actor_did = ?1 ORDER BY timestamp_ms DESC LIMIT ?2",
)
.map_err(store_err)?;
let rows = stmt
.query_map(params![actor_did, limit], |row| {
let data: Vec<u8> = row.get(0)?;
Ok(data)
})
.map_err(store_err)?;
let mut receipts = Vec::new();
for row in rows {
let data = row.map_err(store_err)?;
let receipt: exo_core::types::TrustReceipt = ciborium::from_reader(&data[..])
.map_err(|e| store_err(format!("CBOR decode receipt: {e}")))?;
receipts.push(receipt);
}
Ok(receipts)
}
pub fn load_recent_receipts(
&self,
limit: u32,
) -> DagResult<Vec<exo_core::types::TrustReceipt>> {
if let Some(store) = self.dagdb() {
let store = store.clone();
return block_on_dagdb(async move { store.load_recent_receipts_async(limit).await });
}
let mut stmt = self
.sqlite_conn()?
.prepare_cached(
"SELECT cbor_data FROM trust_receipts
ORDER BY timestamp_ms DESC, receipt_hash ASC
LIMIT ?1",
)
.map_err(store_err)?;
let rows = stmt
.query_map(params![limit], |row| {
let data: Vec<u8> = row.get(0)?;
Ok(data)
})
.map_err(store_err)?;
let mut receipts = Vec::new();
for row in rows {
let data = row.map_err(store_err)?;
let receipt: exo_core::types::TrustReceipt = ciborium::from_reader(&data[..])
.map_err(|e| store_err(format!("CBOR decode receipt: {e}")))?;
receipts.push(receipt);
}
Ok(receipts)
}
pub fn children(&self, parent_hash: &Hash256) -> DagResult<Vec<Hash256>> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let parent_hash = *parent_hash;
return block_on_dagdb(async move { store.children_async(parent_hash).await });
}
let mut stmt = self
.sqlite_conn()?
.prepare_cached("SELECT child_hash FROM dag_parents WHERE parent_hash = ?1")
.map_err(store_err)?;
let rows = stmt
.query_map(params![parent_hash.0.as_slice()], |row| {
let bytes: Vec<u8> = row.get(0)?;
Ok(bytes)
})
.map_err(store_err)?;
let mut result = Vec::new();
for row in rows {
let bytes = row.map_err(store_err)?;
result.push(decode_hash_bytes(&bytes, "dag_parents.child_hash")?);
}
Ok(result)
}
#[allow(dead_code)]
pub fn is_committed(&self, hash: &Hash256) -> DagResult<bool> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let hash = *hash;
return block_on_dagdb(async move {
Ok(store.committed_height_for_async(hash).await?.is_some())
});
}
let mut stmt = self
.sqlite_conn()?
.prepare_cached("SELECT 1 FROM committed WHERE hash = ?1")
.map_err(store_err)?;
match stmt.query_row(params![hash.0.as_slice()], |_| Ok(())) {
Ok(()) => Ok(true),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(false),
Err(e) => Err(store_err(format!("committed.hash presence query: {e}"))),
}
}
pub fn committed_height_for(&self, hash: &Hash256) -> DagResult<Option<u64>> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let hash = *hash;
return block_on_dagdb(async move { store.committed_height_for_async(hash).await });
}
let mut stmt = self
.sqlite_conn()?
.prepare_cached("SELECT height FROM committed WHERE hash = ?1")
.map_err(store_err)?;
match stmt.query_row(params![hash.0.as_slice()], |row| {
let h: i64 = row.get(0)?;
Ok(h)
}) {
Ok(h) => Ok(Some(sqlite_i64_to_u64(h, "committed.height")?)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(store_err(e)),
}
}
pub fn committed_dag_nodes_in_range(
&self,
from_height: u64,
to_height: u64,
) -> DagResult<Vec<DagNode>> {
let committed = self.committed_nodes_in_range(from_height, to_height)?;
let mut nodes = Vec::with_capacity(committed.len());
for (hash, _height) in committed {
if let Some(node) = self.get_sync(&hash)? {
nodes.push(node);
}
}
Ok(nodes)
}
}
impl SqliteDagStore {
fn insert_node_tx(tx: &rusqlite::Transaction<'_>, node: &DagNode) -> DagResult<()> {
let cbor = Self::encode_node(node)?;
tx.execute(
"INSERT OR IGNORE INTO dag_nodes (hash, cbor_payload) VALUES (?1, ?2)",
params![node.hash.0.as_slice(), cbor],
)
.map_err(store_err)?;
for parent in &node.parents {
tx.execute(
"INSERT OR IGNORE INTO dag_parents (child_hash, parent_hash) VALUES (?1, ?2)",
params![node.hash.0.as_slice(), parent.0.as_slice()],
)
.map_err(store_err)?;
}
Ok(())
}
pub fn get_sync(&self, hash: &Hash256) -> DagResult<Option<DagNode>> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let hash = *hash;
return block_on_dagdb(async move { store.get_sync_async(&hash).await });
}
let mut stmt = self
.sqlite_conn()?
.prepare_cached("SELECT cbor_payload FROM dag_nodes WHERE hash = ?1")
.map_err(store_err)?;
let result: Result<Vec<u8>, rusqlite::Error> =
stmt.query_row(params![hash.0.as_slice()], |row| row.get(0));
match result {
Ok(bytes) => Ok(Some(Self::decode_node(&bytes)?)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(store_err(format!("dag_nodes.cbor_payload: {e}"))),
}
}
pub fn put_sync(&mut self, node: DagNode) -> DagResult<()> {
self.put_many_sync(&[node])
}
pub fn put_many_sync(&mut self, nodes: &[DagNode]) -> DagResult<()> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let nodes = nodes.to_vec();
return block_on_dagdb(async move { store.put_many_sync_async(nodes).await });
}
let tx = self.sqlite_conn_mut()?.transaction().map_err(store_err)?;
for node in nodes {
Self::insert_node_tx(&tx, node)?;
}
tx.commit().map_err(store_err)?;
Ok(())
}
pub fn put_committed_node_with_receipt_sync(
&mut self,
node: &DagNode,
height: u64,
receipt: &TrustReceipt,
) -> DagResult<()> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let node = node.clone();
let receipt = receipt.clone();
return block_on_dagdb(async move {
store
.put_committed_node_with_receipt_sync_async(node, height, receipt)
.await
});
}
if receipt.action_hash != node.hash {
return Err(store_err(
"trust_receipts.action_hash must match committed node hash",
));
}
let tx = self.sqlite_conn_mut()?.transaction().map_err(store_err)?;
Self::insert_node_tx(&tx, node)?;
Self::insert_committed_tx(&tx, &node.hash, height)?;
Self::insert_receipt_tx(&tx, receipt)?;
tx.commit().map_err(store_err)?;
Ok(())
}
pub fn put_committed_many_with_certificates_sync(
&mut self,
nodes: &[(DagNode, u64)],
certificates: &[CommitCertificate],
) -> DagResult<()> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let nodes = nodes.to_vec();
let certificates = certificates.to_vec();
return block_on_dagdb(async move {
store
.put_committed_many_with_certificates_sync_async(nodes, certificates)
.await
});
}
if nodes.len() != certificates.len() {
return Err(store_err(format!(
"committed batch must include one certificate per node: got {} certificates for {} nodes",
certificates.len(),
nodes.len()
)));
}
let tx = self.sqlite_conn_mut()?.transaction().map_err(store_err)?;
for ((node, height), certificate) in nodes.iter().zip(certificates) {
if certificate.node_hash != node.hash {
return Err(store_err(format!(
"commit certificate node_hash {} does not match DAG node hash {}",
certificate.node_hash, node.hash
)));
}
Self::insert_node_tx(&tx, node)?;
Self::insert_committed_tx(&tx, &node.hash, *height)?;
Self::insert_certificate_tx(&tx, certificate)?;
}
tx.commit().map_err(store_err)?;
Ok(())
}
#[allow(dead_code)]
pub fn contains_sync(&self, hash: &Hash256) -> DagResult<bool> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let hash = *hash;
return block_on_dagdb(async move { store.contains_sync_async(&hash).await });
}
let mut stmt = self
.sqlite_conn()?
.prepare_cached("SELECT 1 FROM dag_nodes WHERE hash = ?1")
.map_err(store_err)?;
match stmt.query_row(params![hash.0.as_slice()], |_| Ok(())) {
Ok(()) => Ok(true),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(false),
Err(e) => Err(store_err(format!("dag_nodes.hash presence query: {e}"))),
}
}
pub fn tips_sync(&self) -> DagResult<Vec<Hash256>> {
if let Some(store) = self.dagdb() {
let store = store.clone();
return block_on_dagdb(async move { store.tips_sync_async().await });
}
let mut stmt = self
.sqlite_conn()?
.prepare_cached(
"SELECT hash FROM dag_nodes
WHERE hash NOT IN (SELECT parent_hash FROM dag_parents)
ORDER BY hash",
)
.map_err(store_err)?;
let rows = stmt
.query_map([], |row| {
let bytes: Vec<u8> = row.get(0)?;
Ok(bytes)
})
.map_err(store_err)?;
let mut tips = Vec::new();
for row in rows {
let bytes = row.map_err(store_err)?;
tips.push(decode_hash_bytes(&bytes, "dag_nodes.hash")?);
}
Ok(tips)
}
pub fn committed_height_sync(&self) -> DagResult<u64> {
if let Some(store) = self.dagdb() {
let store = store.clone();
return block_on_dagdb(async move { store.committed_height_sync_async().await });
}
let mut stmt = self
.sqlite_conn()?
.prepare_cached("SELECT COALESCE(MAX(height), 0) FROM committed")
.map_err(store_err)?;
let height: i64 = stmt.query_row([], |row| row.get(0)).map_err(store_err)?;
sqlite_i64_to_u64(height, "committed.height")
}
#[allow(dead_code)]
pub fn mark_committed_sync(&mut self, hash: &Hash256, height: u64) -> DagResult<()> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let hash = *hash;
return block_on_dagdb(
async move { store.mark_committed_sync_async(hash, height).await },
);
}
if !self.contains_sync(hash)? {
return Err(DagError::NodeNotFound(*hash));
}
let height = sqlite_u64_to_i64(height, "committed.height")?;
self.sqlite_conn_mut()?
.execute(
"INSERT OR REPLACE INTO committed (hash, height) VALUES (?1, ?2)",
params![hash.0.as_slice(), height],
)
.map_err(store_err)?;
Ok(())
}
fn latest_economy_anchor_hash_tx(tx: &rusqlite::Transaction<'_>) -> DagResult<Hash256> {
let result: Result<Vec<u8>, rusqlite::Error> = tx.query_row(
"SELECT value FROM economy_meta WHERE key = 'latest_anchor_hash'",
[],
|row| row.get(0),
);
match result {
Ok(bytes) => decode_hash_bytes(&bytes, "economy_meta.latest_anchor_hash"),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(Hash256::ZERO),
Err(e) => Err(store_err(format!("economy_meta.latest_anchor_hash: {e}"))),
}
}
pub fn latest_economy_anchor_hash_sync(&self) -> DagResult<Hash256> {
if let Some(store) = self.dagdb() {
let store = store.clone();
return block_on_dagdb(
async move { store.latest_economy_anchor_hash_sync_async().await },
);
}
let result: Result<Vec<u8>, rusqlite::Error> = self.sqlite_conn()?.query_row(
"SELECT value FROM economy_meta WHERE key = 'latest_anchor_hash'",
[],
|row| row.get(0),
);
match result {
Ok(bytes) => decode_hash_bytes(&bytes, "economy_meta.latest_anchor_hash"),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(Hash256::ZERO),
Err(e) => Err(store_err(format!("economy_meta.latest_anchor_hash: {e}"))),
}
}
pub fn put_economy_object_sync<T: Serialize>(
&mut self,
object_kind: EconomyObjectKind,
object_id: &Hash256,
content_hash: &Hash256,
created_at: Timestamp,
object: &T,
) -> DagResult<EconomyRecordAnchor> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let object_id = *object_id;
let content_hash = *content_hash;
let object_cbor = encode_cbor(object, "dagdb_node_economy_objects.cbor_data")?;
return block_on_dagdb(async move {
store
.put_economy_object_sync_async(
object_kind,
object_id,
content_hash,
created_at,
object_cbor,
)
.await
});
}
if *object_id == Hash256::ZERO {
return Err(store_err("economy object_id must not be Hash256::ZERO"));
}
if *content_hash == Hash256::ZERO {
return Err(store_err("economy content_hash must not be Hash256::ZERO"));
}
if created_at == Timestamp::ZERO {
return Err(store_err("economy created_at must not be Timestamp::ZERO"));
}
let object_cbor = encode_cbor(object, "economy_objects.cbor_data")?;
let tx = self.sqlite_conn_mut()?.transaction().map_err(store_err)?;
let previous_anchor_hash = Self::latest_economy_anchor_hash_tx(&tx)?;
let anchor = EconomyRecordAnchor {
anchor_hash: Hash256::ZERO,
previous_anchor_hash,
object_kind,
object_id: *object_id,
object_hash: *content_hash,
created_at,
}
.anchor()
.map_err(store_err)?;
let anchor_cbor = encode_cbor(&anchor, "economy_anchors.cbor_data")?;
let created_physical_ms =
sqlite_u64_to_i64(created_at.physical_ms, "economy.created_physical_ms")?;
let created_logical =
sqlite_u64_to_i64(u64::from(created_at.logical), "economy.created_logical")?;
tx.execute(
"INSERT INTO economy_objects (
object_kind, object_id, content_hash, created_physical_ms,
created_logical, cbor_data
) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![
object_kind.label(),
object_id.0.as_slice(),
content_hash.0.as_slice(),
created_physical_ms,
created_logical,
object_cbor
],
)
.map_err(|e| store_err(format!("insert economy object: {e}")))?;
tx.execute(
"INSERT INTO economy_anchors (
anchor_hash, previous_anchor_hash, object_kind, object_id,
object_hash, created_physical_ms, created_logical, cbor_data
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
params![
anchor.anchor_hash.0.as_slice(),
anchor.previous_anchor_hash.0.as_slice(),
object_kind.label(),
object_id.0.as_slice(),
content_hash.0.as_slice(),
created_physical_ms,
created_logical,
anchor_cbor
],
)
.map_err(|e| store_err(format!("insert economy anchor: {e}")))?;
tx.execute(
"INSERT OR REPLACE INTO economy_meta (key, value)
VALUES ('latest_anchor_hash', ?1)",
params![anchor.anchor_hash.0.as_slice()],
)
.map_err(store_err)?;
tx.commit().map_err(store_err)?;
Ok(anchor)
}
pub fn get_economy_object_sync<T: DeserializeOwned + Send + 'static>(
&self,
object_kind: EconomyObjectKind,
object_id: &Hash256,
) -> DagResult<Option<T>> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let object_id = *object_id;
return block_on_dagdb(async move {
store
.get_economy_object_sync_async(object_kind, object_id)
.await
});
}
let result: Result<Vec<u8>, rusqlite::Error> = self.sqlite_conn()?.query_row(
"SELECT cbor_data FROM economy_objects
WHERE object_kind = ?1 AND object_id = ?2",
params![object_kind.label(), object_id.0.as_slice()],
|row| row.get(0),
);
match result {
Ok(bytes) => Ok(Some(decode_cbor(&bytes, "economy_objects.cbor_data")?)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(store_err(format!("economy_objects.cbor_data: {e}"))),
}
}
pub fn get_economy_anchor_sync(
&self,
anchor_hash: &Hash256,
) -> DagResult<Option<EconomyRecordAnchor>> {
if let Some(store) = self.dagdb() {
let store = store.clone();
let anchor_hash = *anchor_hash;
return block_on_dagdb(async move {
store.get_economy_anchor_sync_async(anchor_hash).await
});
}
let result: Result<Vec<u8>, rusqlite::Error> = self.sqlite_conn()?.query_row(
"SELECT cbor_data FROM economy_anchors WHERE anchor_hash = ?1",
params![anchor_hash.0.as_slice()],
|row| row.get(0),
);
match result {
Ok(bytes) => Ok(Some(decode_cbor(&bytes, "economy_anchors.cbor_data")?)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(store_err(format!("economy_anchors.cbor_data: {e}"))),
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use std::collections::BTreeSet;
use exo_core::types::{Did, Signature};
use exo_dag::dag::{Dag, DeterministicDagClock, append};
use exo_economy::{
EconomyObjectKind, LegacyReceipt, Mission, apex_velocity_catalyst_client_services_mission,
archon_exoforge_legacy_receipt,
};
use super::*;
type SignFn = Box<dyn Fn(&[u8]) -> Signature>;
fn make_sign_fn() -> SignFn {
Box::new(|data: &[u8]| {
let h = blake3::hash(data);
let mut sig = [0u8; 64];
sig[..32].copy_from_slice(h.as_bytes());
Signature::from_bytes(sig)
})
}
fn make_test_node() -> DagNode {
let mut dag = Dag::new();
let mut clock = DeterministicDagClock::new();
let creator = Did::new("did:exo:test").expect("valid");
let sign_fn = make_sign_fn();
append(&mut dag, &[], b"genesis", &creator, &*sign_fn, &mut clock).unwrap()
}
fn commit_certificate_for(hash: Hash256, round: u64) -> CommitCertificate {
CommitCertificate {
node_hash: hash,
round,
votes: vec![Vote {
voter: Did::new("did:exo:v0").unwrap(),
round,
node_hash: hash,
signature: Signature::from_bytes([7u8; 64]),
}],
}
}
fn temp_store() -> SqliteDagStore {
let dir = tempfile::tempdir().unwrap();
SqliteDagStore::open(dir.path()).unwrap()
}
#[test]
fn production_store_source_does_not_suppress_or_use_truncating_sqlite_integer_casts() {
let source = include_str!("store.rs");
let production = source
.split("\n#[cfg(test)]")
.next()
.expect("production section");
assert!(
!production.contains("clippy::as_conversions"),
"production store source must not suppress integer conversion lints"
);
assert!(
!production.contains("value as u64"),
"SQLite INTEGER conversion must use checked conversion, not an as cast"
);
}
#[test]
fn production_store_presence_checks_do_not_squash_sqlite_errors() {
let source = include_str!("store.rs");
let production = source
.split("\n#[cfg(test)]")
.next()
.expect("production section");
assert!(
!production.contains(
".query_row(params![hash.0.as_slice()], |_| Ok(()))\n .is_ok()"
),
"presence checks must distinguish missing rows from SQLite read errors"
);
assert!(
production.contains("Err(rusqlite::Error::QueryReturnedNoRows) => Ok(false)"),
"missing rows may map to false, but other SQLite errors must propagate"
);
}
#[test]
fn new_store_is_empty() {
let store = temp_store();
assert_eq!(store.committed_height_sync().unwrap(), 0);
assert!(store.tips_sync().unwrap().is_empty());
}
#[test]
fn put_and_get() {
let mut store = temp_store();
let node = make_test_node();
store.put_sync(node.clone()).unwrap();
let retrieved = store.get_sync(&node.hash).unwrap();
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap().hash, node.hash);
}
#[test]
fn economy_object_persistence_round_trips_and_hash_links_anchors() {
let mut store = temp_store();
let mission = apex_velocity_catalyst_client_services_mission(Some(1_000_000)).unwrap();
let mission_anchor = store
.put_economy_object_sync(
EconomyObjectKind::Mission,
&mission.mission_id,
&mission.content_hash,
mission.created_at,
&mission,
)
.unwrap();
let legacy = archon_exoforge_legacy_receipt().unwrap();
let legacy_anchor = store
.put_economy_object_sync(
EconomyObjectKind::LegacyReceipt,
&legacy.legacy_receipt_id,
&legacy.content_hash,
legacy.created_at,
&legacy,
)
.unwrap();
let loaded_mission: Mission = store
.get_economy_object_sync(EconomyObjectKind::Mission, &mission.mission_id)
.unwrap()
.unwrap();
let loaded_legacy: LegacyReceipt = store
.get_economy_object_sync(EconomyObjectKind::LegacyReceipt, &legacy.legacy_receipt_id)
.unwrap()
.unwrap();
assert_eq!(loaded_mission, mission);
assert_eq!(loaded_legacy, legacy);
assert_eq!(mission_anchor.previous_anchor_hash, Hash256::ZERO);
assert_eq!(
legacy_anchor.previous_anchor_hash,
mission_anchor.anchor_hash
);
assert_eq!(
store.latest_economy_anchor_hash_sync().unwrap(),
legacy_anchor.anchor_hash
);
assert_eq!(
store
.get_economy_anchor_sync(&legacy_anchor.anchor_hash)
.unwrap(),
Some(legacy_anchor)
);
}
#[test]
fn economy_object_persistence_rejects_duplicate_or_zero_ids() {
let mut store = temp_store();
let mission = apex_velocity_catalyst_client_services_mission(None).unwrap();
store
.put_economy_object_sync(
EconomyObjectKind::Mission,
&mission.mission_id,
&mission.content_hash,
mission.created_at,
&mission,
)
.unwrap();
assert!(
store
.put_economy_object_sync(
EconomyObjectKind::Mission,
&mission.mission_id,
&mission.content_hash,
mission.created_at,
&mission,
)
.is_err()
);
assert!(
store
.put_economy_object_sync(
EconomyObjectKind::Mission,
&Hash256::ZERO,
&mission.content_hash,
mission.created_at,
&mission,
)
.is_err()
);
}
#[test]
fn get_nonexistent() {
let store = temp_store();
let result = store.get_sync(&Hash256::ZERO).unwrap();
assert!(result.is_none());
}
#[test]
fn get_sync_propagates_payload_read_errors() {
let store = temp_store();
let mut hash = [0u8; 32];
hash[0] = 0xD0;
let hash = Hash256::from_bytes(hash);
store
.sqlite_conn()
.unwrap()
.execute(
"INSERT INTO dag_nodes (hash, cbor_payload) VALUES (?1, ?2)",
rusqlite::params![hash.0.as_slice(), 7_i64],
)
.unwrap();
let err = store.get_sync(&hash).unwrap_err();
assert!(
err.to_string().contains("dag_nodes.cbor_payload"),
"malformed persisted payload must surface as a store error"
);
}
#[test]
fn contains() {
let mut store = temp_store();
let node = make_test_node();
assert!(!store.contains_sync(&node.hash).unwrap());
store.put_sync(node.clone()).unwrap();
assert!(store.contains_sync(&node.hash).unwrap());
}
#[test]
fn tips_single_node() {
let mut store = temp_store();
let node = make_test_node();
store.put_sync(node.clone()).unwrap();
let t = store.tips_sync().unwrap();
assert_eq!(t, vec![node.hash]);
}
#[test]
fn tips_with_children() {
let mut dag = Dag::new();
let mut clock = DeterministicDagClock::new();
let creator = Did::new("did:exo:test").expect("valid");
let sign_fn = make_sign_fn();
let genesis = append(&mut dag, &[], b"genesis", &creator, &*sign_fn, &mut clock).unwrap();
let child = append(
&mut dag,
&[genesis.hash],
b"child",
&creator,
&*sign_fn,
&mut clock,
)
.unwrap();
let mut store = temp_store();
store.put_sync(genesis).unwrap();
store.put_sync(child.clone()).unwrap();
let t = store.tips_sync().unwrap();
assert_eq!(t, vec![child.hash]);
}
#[test]
fn committed_height_tracking() {
let mut store = temp_store();
let node = make_test_node();
store.put_sync(node.clone()).unwrap();
assert_eq!(store.committed_height_sync().unwrap(), 0);
store.mark_committed_sync(&node.hash, 1).unwrap();
assert_eq!(store.committed_height_sync().unwrap(), 1);
}
#[test]
fn mark_committed_nonexistent_fails() {
let mut store = temp_store();
let err = store.mark_committed_sync(&Hash256::ZERO, 1).unwrap_err();
assert!(matches!(err, DagError::NodeNotFound(_)));
}
#[test]
fn consensus_round_persistence() {
let mut store = temp_store();
assert_eq!(store.load_consensus_round().unwrap(), 0);
store.save_consensus_round(42).unwrap();
assert_eq!(store.load_consensus_round().unwrap(), 42);
store.save_consensus_round(100).unwrap();
assert_eq!(store.load_consensus_round().unwrap(), 100);
}
#[test]
fn load_consensus_round_propagates_store_errors() {
let store = temp_store();
store
.sqlite_conn()
.unwrap()
.execute("DROP TABLE consensus_meta", [])
.unwrap();
let err = store.load_consensus_round().unwrap_err();
assert!(err.to_string().contains("consensus_meta"));
}
#[test]
fn vote_persistence_roundtrip() {
use exo_core::types::Signature;
let mut store = temp_store();
let did = Did::new("did:exo:voter1").unwrap();
let mut hash = [0u8; 32];
hash[0] = 0xAB;
let vote = Vote {
voter: did.clone(),
round: 5,
node_hash: Hash256::from_bytes(hash),
signature: Signature::from_bytes([7u8; 64]),
};
store.save_vote(&vote).unwrap();
let loaded = store.load_votes_for_round(5).unwrap();
assert_eq!(loaded.len(), 1);
assert_eq!(loaded[0].voter, did);
assert_eq!(loaded[0].round, 5);
assert_eq!(loaded[0].node_hash, Hash256::from_bytes(hash));
let empty = store.load_votes_for_round(6).unwrap();
assert!(empty.is_empty());
}
#[test]
fn load_votes_for_round_rejects_short_hash() {
let store = temp_store();
store
.sqlite_conn()
.unwrap()
.execute(
"INSERT INTO consensus_votes (round, node_hash, voter_did, signature)
VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![7_i64, vec![0xABu8; 31], "did:exo:voter1", vec![7u8; 64]],
)
.unwrap();
let err = store.load_votes_for_round(7).unwrap_err();
assert!(err.to_string().contains("consensus_votes.node_hash"));
}
#[test]
fn load_votes_for_round_rejects_short_signature() {
let store = temp_store();
let mut hash = [0u8; 32];
hash[0] = 0xAB;
store
.sqlite_conn()
.unwrap()
.execute(
"INSERT INTO consensus_votes (round, node_hash, voter_did, signature)
VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![7_i64, hash.as_slice(), "did:exo:voter1", vec![7u8; 63]],
)
.unwrap();
let err = store.load_votes_for_round(7).unwrap_err();
assert!(err.to_string().contains("consensus_votes.signature"));
}
#[test]
fn load_votes_for_round_rejects_zero_signature() {
let store = temp_store();
let mut hash = [0u8; 32];
hash[0] = 0xAB;
store
.sqlite_conn()
.unwrap()
.execute(
"INSERT INTO consensus_votes (round, node_hash, voter_did, signature)
VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![7_i64, hash.as_slice(), "did:exo:voter1", vec![0u8; 64]],
)
.unwrap();
let err = store.load_votes_for_round(7).unwrap_err();
assert!(err.to_string().contains("consensus_votes.signature"));
}
#[test]
fn load_votes_for_round_rejects_invalid_voter_did() {
let store = temp_store();
let mut hash = [0u8; 32];
hash[0] = 0xAB;
store
.sqlite_conn()
.unwrap()
.execute(
"INSERT INTO consensus_votes (round, node_hash, voter_did, signature)
VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![7_i64, hash.as_slice(), "not-a-did", vec![7u8; 64]],
)
.unwrap();
let err = store.load_votes_for_round(7).unwrap_err();
assert!(err.to_string().contains("consensus_votes.voter_did"));
}
#[test]
fn save_vote_rejects_empty_signature() {
let mut store = temp_store();
let vote = Vote {
voter: Did::new("did:exo:voter1").unwrap(),
round: 5,
node_hash: Hash256::digest(b"vote-target"),
signature: Signature::from_bytes([0u8; 64]),
};
let err = store.save_vote(&vote).unwrap_err();
assert!(err.to_string().contains("consensus_votes.signature"));
}
#[test]
fn save_vote_rejects_signature_variants_that_cannot_roundtrip() {
let mut store = temp_store();
let vote = Vote {
voter: Did::new("did:exo:voter1").unwrap(),
round: 5,
node_hash: Hash256::digest(b"vote-target"),
signature: Signature::PostQuantum(vec![7u8; 64]),
};
let err = store.save_vote(&vote).unwrap_err();
assert!(err.to_string().contains("consensus_votes.signature"));
}
#[test]
fn save_vote_rejects_rounds_that_do_not_fit_sqlite_integer() {
let mut store = temp_store();
let vote = Vote {
voter: Did::new("did:exo:voter1").unwrap(),
round: u64::MAX,
node_hash: Hash256::digest(b"vote-target"),
signature: Signature::from_bytes([7u8; 64]),
};
let err = store.save_vote(&vote).unwrap_err();
assert!(err.to_string().contains("consensus_votes.round"));
}
#[test]
fn certificate_persistence_roundtrip() {
let mut store = temp_store();
let mut hash = [0u8; 32];
hash[0] = 0xCD;
let cert = commit_certificate_for(Hash256::from_bytes(hash), 3);
store.save_certificate(&cert).unwrap();
let loaded = store.load_certificates().unwrap();
assert_eq!(loaded.len(), 1);
assert_eq!(loaded[0].round, 3);
assert_eq!(loaded[0].node_hash, Hash256::from_bytes(hash));
assert_eq!(loaded[0].votes.len(), 1);
}
#[test]
fn load_certificate_for_hash_returns_matching_certificate_only() {
let mut store = temp_store();
let hash = Hash256::digest(b"cert-target");
let cert = commit_certificate_for(hash, 3);
store.save_certificate(&cert).unwrap();
let loaded = store
.load_certificate_for_hash(&hash)
.unwrap()
.expect("certificate should exist");
assert_eq!(loaded, cert);
assert!(
store
.load_certificate_for_hash(&Hash256::digest(b"missing-cert"))
.unwrap()
.is_none()
);
}
#[test]
fn load_certificate_for_hash_rejects_cbor_node_hash_mismatch() {
let store = temp_store();
let row_hash = Hash256::digest(b"row-node");
let cert = commit_certificate_for(Hash256::digest(b"cbor-node"), 3);
let mut cbor = Vec::new();
ciborium::into_writer(&cert, &mut cbor).unwrap();
store
.sqlite_conn()
.unwrap()
.execute(
"INSERT INTO commit_certificates (node_hash, round, cbor_data)
VALUES (?1, ?2, ?3)",
rusqlite::params![row_hash.0.as_slice(), 3_i64, cbor],
)
.unwrap();
let err = store.load_certificate_for_hash(&row_hash).unwrap_err();
assert!(err.to_string().contains("CBOR certificate node_hash"));
}
#[test]
fn put_committed_many_with_certificates_persists_finality_rows() {
let mut store = temp_store();
let node = make_test_node();
let certificate = commit_certificate_for(node.hash, 1);
store
.put_committed_many_with_certificates_sync(
&[(node.clone(), 1)],
std::slice::from_ref(&certificate),
)
.unwrap();
assert!(store.contains_sync(&node.hash).unwrap());
assert!(store.is_committed(&node.hash).unwrap());
assert_eq!(
store.load_certificate_for_hash(&node.hash).unwrap(),
Some(certificate)
);
}
#[test]
fn put_committed_many_with_certificates_rejects_mismatched_certificate_without_partial_rows() {
let mut store = temp_store();
let node = make_test_node();
let certificate = commit_certificate_for(Hash256::digest(b"wrong-node"), 1);
let err = store
.put_committed_many_with_certificates_sync(&[(node.clone(), 1)], &[certificate])
.unwrap_err();
assert!(err.to_string().contains("does not match DAG node hash"));
assert!(!store.contains_sync(&node.hash).unwrap());
assert!(!store.is_committed(&node.hash).unwrap());
assert!(store.load_certificates().unwrap().is_empty());
}
#[test]
fn put_committed_many_with_certificates_rolls_back_when_certificate_is_rejected() {
let mut store = temp_store();
let node = make_test_node();
let mut certificate = commit_certificate_for(node.hash, 1);
certificate.votes[0].signature = Signature::Empty;
let err = store
.put_committed_many_with_certificates_sync(&[(node.clone(), 1)], &[certificate])
.unwrap_err();
assert!(
err.to_string()
.contains("commit_certificates.votes[0].signature")
);
assert!(!store.contains_sync(&node.hash).unwrap());
assert!(!store.is_committed(&node.hash).unwrap());
assert!(store.load_certificates().unwrap().is_empty());
}
#[test]
fn save_certificate_rejects_empty_vote_signature() {
let mut store = temp_store();
let hash = Hash256::digest(b"cert-target");
let cert = CommitCertificate {
node_hash: hash,
round: 3,
votes: vec![Vote {
voter: Did::new("did:exo:v0").unwrap(),
round: 3,
node_hash: hash,
signature: Signature::Empty,
}],
};
let err = store.save_certificate(&cert).unwrap_err();
assert!(
err.to_string()
.contains("commit_certificates.votes[0].signature")
);
}
#[test]
fn save_certificate_rejects_signature_variants_that_cannot_verify() {
let mut store = temp_store();
let hash = Hash256::digest(b"cert-target");
let cert = CommitCertificate {
node_hash: hash,
round: 3,
votes: vec![Vote {
voter: Did::new("did:exo:v0").unwrap(),
round: 3,
node_hash: hash,
signature: Signature::PostQuantum(vec![7u8; 64]),
}],
};
let err = store.save_certificate(&cert).unwrap_err();
assert!(
err.to_string()
.contains("commit_certificates.votes[0].signature")
);
}
#[test]
fn load_certificates_rejects_empty_vote_signature() {
let store = temp_store();
let hash = Hash256::digest(b"cert-target");
let cert = CommitCertificate {
node_hash: hash,
round: 3,
votes: vec![Vote {
voter: Did::new("did:exo:v0").unwrap(),
round: 3,
node_hash: hash,
signature: Signature::Empty,
}],
};
let mut cbor = Vec::new();
ciborium::into_writer(&cert, &mut cbor).unwrap();
store
.sqlite_conn()
.unwrap()
.execute(
"INSERT INTO commit_certificates (node_hash, round, cbor_data)
VALUES (?1, ?2, ?3)",
rusqlite::params![hash.0.as_slice(), 3_i64, cbor],
)
.unwrap();
let err = store.load_certificates().unwrap_err();
assert!(
err.to_string()
.contains("commit_certificates.votes[0].signature")
);
}
#[test]
fn save_certificate_rejects_rounds_that_do_not_fit_sqlite_integer() {
let mut store = temp_store();
let hash = Hash256::digest(b"cert-target");
let cert = CommitCertificate {
node_hash: hash,
round: u64::MAX,
votes: vec![Vote {
voter: Did::new("did:exo:v0").unwrap(),
round: u64::MAX,
node_hash: hash,
signature: Signature::from_bytes([1u8; 64]),
}],
};
let err = store.save_certificate(&cert).unwrap_err();
assert!(err.to_string().contains("commit_certificates.round"));
}
#[test]
fn validator_set_persistence() {
let mut store = temp_store();
let empty = store.load_validator_set().unwrap();
assert!(empty.is_empty());
let mut set = BTreeSet::new();
set.insert(Did::new("did:exo:v0").unwrap());
set.insert(Did::new("did:exo:v1").unwrap());
set.insert(Did::new("did:exo:v2").unwrap());
store.save_validator_set(&set).unwrap();
let loaded = store.load_validator_set().unwrap();
assert_eq!(loaded.len(), 3);
assert!(loaded.contains(&Did::new("did:exo:v0").unwrap()));
assert!(loaded.contains(&Did::new("did:exo:v2").unwrap()));
let mut smaller = BTreeSet::new();
smaller.insert(Did::new("did:exo:v0").unwrap());
store.save_validator_set(&smaller).unwrap();
let loaded2 = store.load_validator_set().unwrap();
assert_eq!(loaded2.len(), 1);
}
#[test]
fn save_validator_set_preserves_existing_set_if_replacement_insert_fails() {
let mut store = temp_store();
let mut original = BTreeSet::new();
original.insert(Did::new("did:exo:v0").unwrap());
original.insert(Did::new("did:exo:v1").unwrap());
store.save_validator_set(&original).unwrap();
store
.sqlite_conn()
.unwrap()
.execute_batch(
"CREATE TEMP TRIGGER fail_validator_insert
BEFORE INSERT ON validators
WHEN NEW.did = 'did:exo:blocked'
BEGIN
SELECT RAISE(ABORT, 'injected validator insert failure');
END;",
)
.unwrap();
let mut replacement = BTreeSet::new();
replacement.insert(Did::new("did:exo:blocked").unwrap());
replacement.insert(Did::new("did:exo:z").unwrap());
let err = store.save_validator_set(&replacement).unwrap_err();
assert!(
err.to_string()
.contains("injected validator insert failure")
);
let loaded = store.load_validator_set().unwrap();
assert_eq!(loaded, original);
}
#[test]
fn load_validator_set_rejects_invalid_did() {
let store = temp_store();
store
.sqlite_conn()
.unwrap()
.execute("INSERT INTO validators (did) VALUES (?1)", ["not-a-did"])
.unwrap();
let err = store.load_validator_set().unwrap_err();
assert!(err.to_string().contains("validators.did"));
}
#[test]
fn receipt_save_and_load_by_hash() {
use exo_core::types::{ReceiptOutcome, Timestamp, TrustReceipt};
let mut store = temp_store();
let sign_fn = make_sign_fn();
let receipt = TrustReceipt::new(
Did::new("did:exo:agent-a").unwrap(),
Hash256::ZERO,
None,
"dag.commit".to_string(),
Hash256::digest(b"action-payload"),
ReceiptOutcome::Executed,
Timestamp {
physical_ms: 1_700_000_000_000,
logical: 0,
},
&*sign_fn,
)
.expect("test trust receipt should encode");
let hash = receipt.receipt_hash;
store.save_receipt(&receipt).unwrap();
let loaded = store.load_receipt(&hash).unwrap();
assert!(loaded.is_some());
let loaded = loaded.unwrap();
assert_eq!(loaded.receipt_hash, hash);
assert_eq!(loaded.actor_did.to_string(), "did:exo:agent-a");
assert_eq!(loaded.action_type, "dag.commit");
assert_eq!(loaded.outcome, ReceiptOutcome::Executed);
}
#[test]
fn save_receipt_rejects_empty_signature() {
use exo_core::types::{ReceiptOutcome, Timestamp, TrustReceipt};
let mut store = temp_store();
let receipt = TrustReceipt::new(
Did::new("did:exo:agent-a").unwrap(),
Hash256::digest(b"authority"),
None,
"dag.commit".to_string(),
Hash256::digest(b"action-payload"),
ReceiptOutcome::Executed,
Timestamp {
physical_ms: 1_700_000_000_000,
logical: 0,
},
&|_| Signature::Empty,
)
.expect("test trust receipt should encode");
let err = store.save_receipt(&receipt).unwrap_err();
assert!(err.to_string().contains("trust_receipts.signature"));
}
#[test]
fn save_receipt_rejects_timestamps_that_do_not_fit_sqlite_integer() {
use exo_core::types::{ReceiptOutcome, Timestamp, TrustReceipt};
let mut store = temp_store();
let sign_fn = make_sign_fn();
let receipt = TrustReceipt::new(
Did::new("did:exo:agent-a").unwrap(),
Hash256::digest(b"authority"),
None,
"dag.commit".to_string(),
Hash256::digest(b"action-payload"),
ReceiptOutcome::Executed,
Timestamp {
physical_ms: u64::MAX,
logical: 0,
},
&*sign_fn,
)
.expect("test trust receipt should encode");
let err = store.save_receipt(&receipt).unwrap_err();
assert!(err.to_string().contains("trust_receipts.timestamp_ms"));
}
#[test]
fn receipt_load_nonexistent() {
let store = temp_store();
let result = store.load_receipt(&Hash256::ZERO).unwrap();
assert!(result.is_none());
}
#[test]
fn receipt_load_by_actor_filters_and_limits() {
use exo_core::types::{ReceiptOutcome, Timestamp, TrustReceipt};
let mut store = temp_store();
let sign_fn = make_sign_fn();
for i in 0u64..3 {
let receipt = TrustReceipt::new(
Did::new("did:exo:actor-a").unwrap(),
Hash256::ZERO,
None,
format!("action.{i}"),
Hash256::digest(format!("payload-{i}").as_bytes()),
ReceiptOutcome::Executed,
Timestamp {
physical_ms: 1_000_000 + i * 1000,
logical: 0,
},
&*sign_fn,
)
.expect("test trust receipt should encode");
store.save_receipt(&receipt).unwrap();
}
let other = TrustReceipt::new(
Did::new("did:exo:actor-b").unwrap(),
Hash256::ZERO,
None,
"other.action".to_string(),
Hash256::digest(b"other"),
ReceiptOutcome::Denied,
Timestamp {
physical_ms: 2_000_000,
logical: 0,
},
&*sign_fn,
)
.expect("test trust receipt should encode");
store.save_receipt(&other).unwrap();
let results = store.load_receipts_by_actor("did:exo:actor-a", 10).unwrap();
assert_eq!(results.len(), 3);
let limited = store.load_receipts_by_actor("did:exo:actor-a", 2).unwrap();
assert_eq!(limited.len(), 2);
assert!(limited[0].timestamp.physical_ms >= limited[1].timestamp.physical_ms);
let b_results = store.load_receipts_by_actor("did:exo:actor-b", 10).unwrap();
assert_eq!(b_results.len(), 1);
assert_eq!(b_results[0].outcome, ReceiptOutcome::Denied);
let none = store.load_receipts_by_actor("did:exo:unknown", 10).unwrap();
assert!(none.is_empty());
}
#[test]
fn receipt_load_recent_across_actors_orders_and_limits() {
use exo_core::types::{ReceiptOutcome, Timestamp, TrustReceipt};
let mut store = temp_store();
let sign_fn = make_sign_fn();
let actors = [
"did:exo:actor-a",
"did:exo:actor-b",
"did:exo:actor-c",
"did:exo:actor-d",
];
for (idx, actor) in actors.iter().enumerate() {
let timestamp = Timestamp {
physical_ms: 1_000_000 + u64::try_from(idx).unwrap() * 1000,
logical: 0,
};
let receipt = TrustReceipt::new(
Did::new(actor).unwrap(),
Hash256::digest(format!("authority-{idx}").as_bytes()),
None,
format!("action.{idx}"),
Hash256::digest(format!("payload-{idx}").as_bytes()),
ReceiptOutcome::Executed,
timestamp,
&*sign_fn,
)
.expect("test trust receipt should encode");
store.save_receipt(&receipt).unwrap();
}
let recent = store.load_recent_receipts(3).unwrap();
assert_eq!(recent.len(), 3);
assert_eq!(recent[0].actor_did.to_string(), "did:exo:actor-d");
assert_eq!(recent[1].actor_did.to_string(), "did:exo:actor-c");
assert_eq!(recent[2].actor_did.to_string(), "did:exo:actor-b");
assert!(recent.iter().all(|receipt| receipt.verify_hash().unwrap()));
}
#[test]
fn multiple_tips() {
let mut dag = Dag::new();
let mut clock = DeterministicDagClock::new();
let creator = Did::new("did:exo:test").expect("valid");
let sign_fn = make_sign_fn();
let genesis = append(&mut dag, &[], b"genesis", &creator, &*sign_fn, &mut clock).unwrap();
let c1 = append(
&mut dag,
&[genesis.hash],
b"c1",
&creator,
&*sign_fn,
&mut clock,
)
.unwrap();
let c2 = append(
&mut dag,
&[genesis.hash],
b"c2",
&creator,
&*sign_fn,
&mut clock,
)
.unwrap();
let mut store = temp_store();
store.put_sync(genesis).unwrap();
store.put_sync(c1.clone()).unwrap();
store.put_sync(c2.clone()).unwrap();
let t = store.tips_sync().unwrap();
assert_eq!(t.len(), 2);
assert!(t.contains(&c1.hash));
assert!(t.contains(&c2.hash));
}
#[test]
fn committed_nodes_in_range_rejects_short_hash() {
let store = temp_store();
store
.sqlite_conn()
.unwrap()
.execute(
"INSERT INTO committed (hash, height) VALUES (?1, ?2)",
rusqlite::params![vec![0xCDu8; 31], 1_i64],
)
.unwrap();
let err = store.committed_nodes_in_range(0, 10).unwrap_err();
assert!(err.to_string().contains("committed.hash"));
}
#[test]
fn committed_height_for_rejects_negative_height() {
let store = temp_store();
let hash = Hash256::digest(b"committed-node");
store
.sqlite_conn()
.unwrap()
.execute(
"INSERT INTO committed (hash, height) VALUES (?1, ?2)",
rusqlite::params![hash.0.as_slice(), -1_i64],
)
.unwrap();
let err = store.committed_height_for(&hash).unwrap_err();
assert!(err.to_string().contains("committed.height"));
}
#[test]
fn committed_height_value_rejects_negative_height() {
let store = temp_store();
let hash = Hash256::digest(b"committed-node");
store
.sqlite_conn()
.unwrap()
.execute(
"INSERT INTO committed (hash, height) VALUES (?1, ?2)",
rusqlite::params![hash.0.as_slice(), -1_i64],
)
.unwrap();
let err = store.committed_height_value().unwrap_err();
assert!(err.to_string().contains("committed.height"));
}
#[test]
fn committed_height_sync_rejects_negative_height() {
let store = temp_store();
let hash = Hash256::digest(b"committed-node");
store
.sqlite_conn()
.unwrap()
.execute(
"INSERT INTO committed (hash, height) VALUES (?1, ?2)",
rusqlite::params![hash.0.as_slice(), -1_i64],
)
.unwrap();
let err = store.committed_height_sync().unwrap_err();
assert!(err.to_string().contains("committed.height"));
}
#[test]
fn mark_committed_rejects_heights_that_do_not_fit_sqlite_integer() {
let mut store = temp_store();
let node = make_test_node();
store.put_sync(node.clone()).unwrap();
let err = store.mark_committed_sync(&node.hash, u64::MAX).unwrap_err();
assert!(err.to_string().contains("committed.height"));
}
#[test]
fn mark_committed_with_receipt_rolls_back_when_receipt_is_rejected() {
use exo_core::types::{ReceiptOutcome, Timestamp, TrustReceipt};
let mut store = temp_store();
let node = make_test_node();
store.put_sync(node.clone()).unwrap();
let receipt = TrustReceipt::new(
Did::new("did:exo:test").unwrap(),
Hash256::digest(b"authority"),
None,
"dag.commit".to_string(),
node.hash,
ReceiptOutcome::Executed,
Timestamp {
physical_ms: 1_700_000_000_000,
logical: 0,
},
&|_| Signature::empty(),
)
.expect("test receipt should encode");
let err = store
.mark_committed_with_receipt_sync(&node.hash, 1, &receipt)
.unwrap_err();
assert!(err.to_string().contains("trust_receipts.signature"));
assert!(
!store.is_committed(&node.hash).unwrap(),
"commit marker must not persist when receipt insert fails"
);
assert!(
store.load_receipt(&receipt.receipt_hash).unwrap().is_none(),
"rejected receipt must not persist partial receipt data"
);
}
#[test]
fn certificate_commit_with_receipt_rolls_back_every_row_when_receipt_is_rejected() {
use exo_core::types::{ReceiptOutcome, Timestamp, TrustReceipt};
let mut store = temp_store();
let node = make_test_node();
store.put_sync(node.clone()).unwrap();
let cert = CommitCertificate {
node_hash: node.hash,
round: 0,
votes: vec![Vote {
voter: Did::new("did:exo:v0").unwrap(),
round: 0,
node_hash: node.hash,
signature: Signature::from_bytes([7u8; 64]),
}],
};
let receipt = TrustReceipt::new(
Did::new("did:exo:test").unwrap(),
Hash256::digest(b"authority"),
None,
"dag.commit".to_string(),
node.hash,
ReceiptOutcome::Executed,
Timestamp {
physical_ms: 1_700_000_000_000,
logical: 0,
},
&|_| Signature::empty(),
)
.expect("test receipt should encode");
let err = store
.persist_commit_certificate_with_receipt_sync(&node.hash, 1, &cert, &receipt)
.unwrap_err();
assert!(err.to_string().contains("trust_receipts.signature"));
assert!(
!store.is_committed(&node.hash).unwrap(),
"commit marker must not persist when receipt insert fails"
);
assert!(
store.load_certificates().unwrap().is_empty(),
"certificate must not persist without its matching receipt"
);
assert!(
store.load_receipt(&receipt.receipt_hash).unwrap().is_none(),
"rejected receipt must not persist partial receipt data"
);
}
#[test]
fn children_rejects_short_child_hash() {
let store = temp_store();
let parent = Hash256::digest(b"parent");
store
.sqlite_conn()
.unwrap()
.execute(
"INSERT INTO dag_parents (child_hash, parent_hash) VALUES (?1, ?2)",
rusqlite::params![vec![0xABu8; 31], parent.0.as_slice()],
)
.unwrap();
let err = store.children(&parent).unwrap_err();
assert!(err.to_string().contains("dag_parents.child_hash"));
}
#[test]
fn tips_rejects_short_hash() {
let store = temp_store();
store
.sqlite_conn()
.unwrap()
.execute(
"INSERT INTO dag_nodes (hash, cbor_payload) VALUES (?1, ?2)",
rusqlite::params![vec![0xABu8; 31], vec![0x00u8]],
)
.unwrap();
let err = store.tips_sync().unwrap_err();
assert!(err.to_string().contains("dag_nodes.hash"));
}
}