exochain-dag-db-postgres 0.2.0-beta

EXOCHAIN DAG DB PostgreSQL persistence adapters
Documentation
#![cfg(feature = "postgres")]
#![allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]

use std::process;

use exo_dag_db_postgres::postgres::DAGDB_SCHEMA_SQL;
use serde_json::json;
use sqlx::{Connection, PgConnection};

/// Checks that a second memory row built from the same fixture hashes is
/// rejected with an error naming `uq_dagdb_memory_active_duplicate`, and that
/// the same insert succeeds once the first row has been marked `revoked`.
///
/// The test returns early (without failing) when `TestDb::maybe_new` yields
/// `None`.
#[tokio::test]
async fn active_duplicate_uniqueness_allows_only_non_active_replacements() {
    let mut db = match TestDb::maybe_new("persistence_duplicate").await {
        Some(db) => db,
        None => return,
    };
    db.apply_schema().await;

    // Fixture values shared by every memory row inserted below; only the
    // memory id changes between attempts.
    let receipt_hash = bytes(1);
    let payload_hash = bytes(21);
    let source_hash = bytes(22);

    insert_receipt(&mut db.conn, receipt_hash.clone(), bytes(41), "memory").await;

    // First row: expected to be accepted.
    let first_id = bytes(11);
    insert_memory(
        &mut db.conn,
        first_id.clone(),
        receipt_hash.clone(),
        payload_hash.clone(),
        source_hash.clone(),
        None,
    )
    .await
    .expect("first active memory insert succeeds");

    // Second row with a different id but the same hashes: must be rejected.
    let duplicate = insert_memory(
        &mut db.conn,
        bytes(12),
        receipt_hash.clone(),
        payload_hash.clone(),
        source_hash.clone(),
        None,
    )
    .await
    .expect_err("duplicate active memory must fail");
    let rendered = duplicate.to_string();
    assert!(
        rendered.contains("uq_dagdb_memory_active_duplicate"),
        "unexpected duplicate error: {duplicate}"
    );

    // Flip the first row to `revoked` so it no longer counts as active.
    let revoke_sql = "UPDATE dagdb_memory_objects \
         SET status = 'revoked', revoked_at_physical_ms = 11, revoked_at_logical = 0 \
         WHERE memory_id = $1";
    sqlx::query(revoke_sql)
        .bind(first_id)
        .execute(&mut db.conn)
        .await
        .expect("mark first memory revoked");

    // Third row, same hashes again: now expected to be accepted.
    insert_memory(
        &mut db.conn,
        bytes(13),
        receipt_hash,
        payload_hash,
        source_hash,
        None,
    )
    .await
    .expect("revoked memory no longer blocks replacement");
}

/// Inserts a receipt, a memory object and an idempotency key inside one
/// transaction, rolls the transaction back, and asserts that all three tables
/// are empty afterwards.
///
/// The test returns early (without failing) when `TestDb::maybe_new` yields
/// `None`.
#[tokio::test]
async fn transaction_failure_rolls_back_domain_rows() {
    let mut db = match TestDb::maybe_new("persistence_rollback").await {
        Some(db) => db,
        None => return,
    };
    db.apply_schema().await;

    let mut tx = db.conn.begin().await.expect("begin rollback transaction");

    let receipt_hash = bytes(2);
    insert_receipt(&mut *tx, receipt_hash.clone(), bytes(42), "memory").await;

    let memory_insert =
        insert_memory(&mut *tx, bytes(14), receipt_hash, bytes(23), bytes(24), None).await;
    memory_insert.expect("insert memory inside transaction");

    let idempotency_sql = "INSERT INTO dagdb_idempotency_keys \
         (tenant_id, namespace, route_name, idempotency_key, request_hash, response_hash, response_body, status_code, created_at_physical_ms, created_at_logical, expires_at_physical_ms, expires_at_logical) \
         VALUES ('tenant-a', 'default', 'dagdb.intake', 'idem-rollback', $1, $2, '{}'::jsonb, 201, 1, 0, 2, 0)";
    sqlx::query(idempotency_sql)
        .bind(bytes(31))
        .bind(bytes(32))
        .execute(&mut *tx)
        .await
        .expect("insert idempotency inside transaction");

    tx.rollback()
        .await
        .expect("rollback persistence transaction");

    // Run every count query before asserting, so a query failure surfaces
    // ahead of any row-count mismatch.
    let count_queries = [
        (
            "SELECT count(*) FROM dagdb_memory_objects",
            "count memory rows",
        ),
        ("SELECT count(*) FROM dagdb_receipts", "count receipt rows"),
        (
            "SELECT count(*) FROM dagdb_idempotency_keys",
            "count idempotency rows",
        ),
    ];
    let mut counts = [0_i64; 3];
    for (slot, (sql, failure)) in counts.iter_mut().zip(count_queries) {
        let rows: i64 = sqlx::query_scalar(sql)
            .fetch_one(&mut db.conn)
            .await
            .expect(failure);
        *slot = rows;
    }
    let [memory_count, receipt_count, idempotency_count] = counts;

    assert_eq!(memory_count, 0);
    assert_eq!(receipt_count, 0);
    assert_eq!(idempotency_count, 0);
}

/// Inserts one fixture row into `dagdb_receipts` using `executor`.
///
/// `receipt_hash`, `kind` (stored as `subject_kind`) and `subject_id` come
/// from the caller; every other column is a fixed fixture value, including
/// `prev_receipt_hash = bytes(0)` and `event_hash = bytes(9)`.
///
/// # Panics
///
/// Panics with "insert fixture receipt" if the insert fails.
async fn insert_receipt<'e, E>(executor: E, receipt_hash: Vec<u8>, subject_id: Vec<u8>, kind: &str)
where
    E: sqlx::Executor<'e, Database = sqlx::Postgres>,
{
    const INSERT_RECEIPT_SQL: &str = "INSERT INTO dagdb_receipts \
         (receipt_hash, tenant_id, namespace, subject_kind, subject_id, prev_receipt_hash, seq, event_type, actor_did, event_hlc_physical_ms, event_hlc_logical, event_hash, receipt_body, created_at_physical_ms, created_at_logical) \
         VALUES ($1, 'tenant-a', 'default', $2, $3, $4, 1, 'intake_created', 'did:example:actor', 1, 0, $5, '{}'::jsonb, 1, 0)";

    // Fixed hashes bound to $4 and $5 respectively.
    let prev_receipt_hash = bytes(0);
    let event_hash = bytes(9);

    let outcome = sqlx::query(INSERT_RECEIPT_SQL)
        .bind(receipt_hash)
        .bind(kind)
        .bind(subject_id)
        .bind(prev_receipt_hash)
        .bind(event_hash)
        .execute(executor)
        .await;
    outcome.expect("insert fixture receipt");
}

/// Inserts one fixture row into `dagdb_memory_objects` using `executor`.
///
/// The caller supplies the memory id, the receipt hash stored in
/// `latest_receipt_hash`, the payload and source hashes, and an optional
/// `superseded_by_memory_id`. All remaining columns are fixed fixture values.
/// A single JSON document is bound once and written to both `title` and
/// `summary`.
///
/// # Errors
///
/// Returns the `sqlx::Error` from the insert unchanged, so callers can assert
/// on constraint violations.
async fn insert_memory<'e, E>(
    executor: E,
    memory_id: Vec<u8>,
    latest_receipt_hash: Vec<u8>,
    payload_hash: Vec<u8>,
    source_hash: Vec<u8>,
    superseded_by: Option<Vec<u8>>,
) -> Result<(), sqlx::Error>
where
    E: sqlx::Executor<'e, Database = sqlx::Postgres>,
{
    const INSERT_MEMORY_SQL: &str = "INSERT INTO dagdb_memory_objects \
         (memory_id, tenant_id, namespace, node_type, source_type, consent_purpose, payload_hash, source_hash, owner_did, controller_did, submitted_by_did, title, summary, keywords, risk_class, risk_bp, latest_receipt_hash, created_at_physical_ms, created_at_logical, updated_at_physical_ms, updated_at_logical, superseded_by_memory_id) \
         VALUES ($1, 'tenant-a', 'default', 'source', 'public_web', 'retrieval', $2, $3, 'did:example:owner', 'did:example:controller', 'did:example:submitter', $4, $4, '[]'::jsonb, 'R0', 0, $5, 1, 0, 1, 0, $6)";

    // Bound as $4, which the statement uses for both `title` and `summary`.
    let title_and_summary = json!({
        "decision": "allow",
        "text": "safe",
        "redaction_codes": [],
        "original_hash": "caac13844969e521bb8bfcf8bc706ad54bcce3e3f260368eda31bdb0542d00e1",
        "truncated": false,
        "byte_len": 4
    });

    let statement = sqlx::query(INSERT_MEMORY_SQL)
        .bind(memory_id)
        .bind(payload_hash)
        .bind(source_hash)
        .bind(title_and_summary)
        .bind(latest_receipt_hash)
        .bind(superseded_by);

    // Only success or failure matters to callers; discard the query result.
    statement.execute(executor).await.map(|_| ())
}

/// Builds a 32-byte vector in which every element equals `byte`.
fn bytes(byte: u8) -> Vec<u8> {
    [byte; 32].to_vec()
}

/// A PostgreSQL connection bound to a per-test schema.
///
/// Created by `TestDb::maybe_new`; the schema is dropped again when the value
/// goes out of scope (see the `Drop` impl).
struct TestDb {
    /// Connection whose `search_path` is pointed at `schema`.
    conn: PgConnection,
    /// Name of the schema created for this test.
    schema: String,
    /// Connection URL, kept so `Drop` can open a separate cleanup connection.
    database_url: String,
}

impl TestDb {
    /// Connects to the database named by `EXO_DAGDB_TEST_DATABASE_URL` and
    /// (re)creates a schema called `dagdb_{label}_{pid}` for this test.
    ///
    /// Returns `None`, after printing a skip notice to stderr, when the
    /// environment variable cannot be read.
    ///
    /// # Panics
    ///
    /// Panics if connecting, dropping a stale schema, creating the schema, or
    /// setting the search path fails.
    async fn maybe_new(label: &str) -> Option<Self> {
        let database_url = match std::env::var("EXO_DAGDB_TEST_DATABASE_URL") {
            Ok(url) => url,
            Err(_) => {
                eprintln!(
                    "skipping persistence postgres test: EXO_DAGDB_TEST_DATABASE_URL is not set"
                );
                return None;
            }
        };

        // The process id is part of the schema name.
        let pid = process::id();
        let schema = format!("dagdb_{label}_{pid}");

        let mut conn = PgConnection::connect(&database_url)
            .await
            .expect("connect to EXO_DAGDB_TEST_DATABASE_URL");

        // Remove any schema of the same name before creating a fresh one.
        let drop_stale = format!("DROP SCHEMA IF EXISTS {schema} CASCADE");
        sqlx::raw_sql(&drop_stale)
            .execute(&mut conn)
            .await
            .expect("drop existing test schema");

        let create = format!("CREATE SCHEMA {schema}");
        sqlx::raw_sql(&create)
            .execute(&mut conn)
            .await
            .expect("create test schema");

        let mut db = Self {
            conn,
            schema,
            database_url,
        };
        db.set_search_path().await;
        Some(db)
    }

    /// Sets this connection's `search_path` to the test schema followed by
    /// `public`.
    ///
    /// # Panics
    ///
    /// Panics if the `SET` statement fails.
    async fn set_search_path(&mut self) {
        let statement = format!("SET search_path TO {}, public", self.schema);
        sqlx::raw_sql(&statement)
            .execute(&mut self.conn)
            .await
            .expect("set DAG DB test search_path");
    }

    /// Re-applies the search path and then executes `DAGDB_SCHEMA_SQL` on this
    /// connection.
    ///
    /// # Panics
    ///
    /// Panics if either step fails.
    async fn apply_schema(&mut self) {
        self.set_search_path().await;
        let applied = sqlx::raw_sql(DAGDB_SCHEMA_SQL)
            .execute(&mut self.conn)
            .await;
        applied.expect("apply DAG DB schema");
    }
}

impl Drop for TestDb {
    /// Drops the per-test schema over a fresh connection.
    ///
    /// `drop` cannot be async, so the cleanup runs on a dedicated thread with
    /// its own Tokio runtime and is joined before returning.
    ///
    /// # Panics
    ///
    /// Panics with "join cleanup thread" if cleanup fails while the current
    /// thread is *not* already unwinding. If the test is already panicking, a
    /// cleanup failure is only logged to stderr: a second panic raised from
    /// `drop` during unwinding would abort the whole test binary and hide the
    /// original failure.
    fn drop(&mut self) {
        let schema = self.schema.clone();
        let database_url = self.database_url.clone();
        // NOTE(review): `self.conn` is still open while this runs (fields are
        // dropped after `drop` returns). If it still holds locks on objects in
        // the schema, the DROP below may block — confirm.
        let joined = std::thread::spawn(move || {
            let runtime = tokio::runtime::Runtime::new().expect("create cleanup runtime");
            runtime.block_on(async move {
                let mut conn = PgConnection::connect(&database_url)
                    .await
                    .expect("connect for cleanup");
                sqlx::raw_sql(&format!("DROP SCHEMA IF EXISTS {schema} CASCADE"))
                    .execute(&mut conn)
                    .await
                    .expect("drop DAG DB test schema");
            });
        })
        .join();

        if std::thread::panicking() {
            // Already unwinding from a test failure: report, never re-panic.
            if joined.is_err() {
                eprintln!(
                    "warning: cleanup of DAG DB test schema {} failed while unwinding",
                    self.schema
                );
            }
        } else {
            joined.expect("join cleanup thread");
        }
    }
}