// exochain-dag-db-postgres 0.2.0-beta
//
// EXOCHAIN DAG DB PostgreSQL persistence adapters
// Documentation
#![cfg(feature = "postgres")]
#![allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]

use exo_dag_db_postgres::postgres::{DAGDB_EXPORT_SCHEMA_SQL, init_pool, migrator};
use sqlx::{PgPool, Row, postgres::PgPoolOptions};

/// Version the export persistence migration must be registered under in
/// `migrator()`; checked by `export_persistence_migration_is_registered`.
const EXPORT_MIGRATION_VERSION: i64 = 20260511000001;
/// Substrings treated as forbidden raw/private material.
///
/// They must not appear anywhere in the export migration SQL, nor in any
/// column name of the export tables. Both checks lowercase the text
/// (`to_ascii_lowercase`) before matching, so every entry here must itself
/// be lowercase to be effective.
const FORBIDDEN_COLUMN_FRAGMENTS: &[&str] = &[
    "raw_body",
    "raw_markdown",
    "raw_private_payload",
    "raw_model_output",
    "source_excerpt",
    "gateway_secret",
    "database_url",
    "db_url",
    "private_key",
    "absolute_path",
];

/// Static check on the export migration SQL text.
///
/// The migration must create the two export tables and mention the export
/// event/receipt kinds. It must not create or alter the exo-dag tables, must
/// not reference the outbox or route invalidation tables, and must not contain
/// any forbidden raw/private fragment.
#[test]
fn export_persistence_migration_source_is_narrow_and_additive() {
    let lower = DAGDB_EXPORT_SCHEMA_SQL.to_ascii_lowercase();
    // Collapse every whitespace run (newlines, tabs, repeated spaces) to a single
    // space so phrase checks do not depend on how the SQL is laid out.
    let normalized = lower.split_whitespace().collect::<Vec<_>>().join(" ");
    // For the negative DDL checks, also strip identifier quotes and the
    // IF [NOT] EXISTS guards. Otherwise `CREATE TABLE IF NOT EXISTS "dag_nodes"`
    // would slip past a check written as `create table dag_nodes`.
    let unguarded = normalized
        .replace('"', "")
        .replace("if not exists ", "")
        .replace("if exists ", "");

    for required in [
        "create table if not exists dagdb_exports",
        "create table if not exists dagdb_export_challenges",
        "export_created",
        "export_verified",
        "export_failed",
        "export_challenge_created",
        "export_challenge_verified",
    ] {
        assert!(
            normalized.contains(required),
            "export migration must contain {required:?}"
        );
    }

    for unrelated in ["dagdb_dag_outbox", "route_invalidations"] {
        assert!(
            !lower.contains(unrelated),
            "export migration must not reference {unrelated}"
        );
    }

    for forbidden_ddl in [
        "create table dag_nodes",
        "alter table dag_nodes",
        "create table dag_committed",
        "alter table dag_committed",
    ] {
        assert!(
            !unguarded.contains(forbidden_ddl),
            "export migration must not create or alter exo-dag tables ({forbidden_ddl:?})"
        );
    }

    for fragment in FORBIDDEN_COLUMN_FRAGMENTS {
        assert!(
            !lower.contains(fragment),
            "export migration must not contain forbidden raw/private fragment {fragment}"
        );
    }
}

/// The export persistence migration must be part of the crate's migrator so
/// `init_pool` applies it; looked up by `EXPORT_MIGRATION_VERSION`.
#[test]
fn export_persistence_migration_is_registered() {
    let registered_versions: Vec<i64> = migrator()
        .iter()
        .map(|migration| migration.version)
        .collect();
    assert!(
        registered_versions.contains(&EXPORT_MIGRATION_VERSION),
        "export persistence migration must be registered"
    );
}

/// Live contract test against a real Postgres.
///
/// Applies all migrations through `init_pool` inside an isolated schema, then
/// checks the export tables, columns, constraints and indexes, and that no
/// exo-dag tables were created. Returns early (skips) when `TestDb::new`
/// reports that `EXO_DAGDB_TEST_DATABASE_URL` is unset.
///
/// The isolated schema is dropped even when an assertion fails. The original
/// panic is then re-raised so the failure message is preserved.
#[tokio::test]
async fn export_persistence_migration_live_schema_contract() {
    let Some(db) = TestDb::new("export_persistence_migration").await else {
        return;
    };
    let scoped_url = db.scoped_url.clone();

    // Run the checks in a spawned task so a panic surfaces as a `JoinError`
    // instead of unwinding past `cleanup`. Otherwise every failed run would leave
    // its per-process schema behind in the shared test database.
    let outcome = tokio::spawn(async move {
        let pool = init_pool(&scoped_url)
            .await
            .expect("init_pool must apply export persistence migration");

        assert_export_tables(&pool).await;
        assert_export_columns(&pool).await;
        assert_export_constraints(&pool).await;
        assert_export_indexes(&pool).await;
        assert_no_exo_dag_tables(&pool).await;

        pool.close().await;
    })
    .await;

    db.cleanup().await;

    if let Err(err) = outcome {
        if err.is_panic() {
            // Re-raise the assertion panic unchanged so the test report shows it.
            std::panic::resume_unwind(err.into_panic());
        }
        panic!("live schema contract task did not complete: {err}");
    }
}

/// Asserts that the only tables in the current schema whose names start with
/// the literal prefix `dagdb_export` are `dagdb_export_challenges` and
/// `dagdb_exports`, in that order.
async fn assert_export_tables(pool: &PgPool) {
    // `_` is a single-character wildcard in LIKE. It is escaped (with `!`) so the
    // filter is a true literal-prefix match rather than also matching names like
    // `dagdbXexport...`.
    let tables = sqlx::query_scalar::<_, String>(
        "SELECT table_name FROM information_schema.tables \
         WHERE table_schema = current_schema() \
           AND table_name LIKE 'dagdb!_export%' ESCAPE '!' \
         ORDER BY table_name",
    )
    .fetch_all(pool)
    .await
    .expect("query export tables");
    assert_eq!(
        tables,
        vec![
            "dagdb_export_challenges".to_owned(),
            "dagdb_exports".to_owned(),
        ],
        "tables with the dagdb_export prefix"
    );
}

/// For each table in `expected_columns()`, asserts that the live column list
/// (in ordinal order) matches exactly. It then re-checks each column name
/// against `FORBIDDEN_COLUMN_FRAGMENTS`, case-insensitively.
async fn assert_export_columns(pool: &PgPool) {
    for (table, expected) in expected_columns() {
        let found = sqlx::query_scalar::<_, String>(
            "SELECT column_name FROM information_schema.columns \
             WHERE table_schema = current_schema() AND table_name = $1 \
             ORDER BY ordinal_position",
        )
        .bind(table)
        .fetch_all(pool)
        .await
        .unwrap_or_else(|err| panic!("query columns for {table}: {err}"));

        assert_eq!(found, expected, "column names for {table}");

        // Belt and braces: even if `expected_columns` is edited, no persisted
        // column may carry a forbidden raw/private fragment.
        for column in &found {
            let lowered = column.to_ascii_lowercase();
            let leaks_forbidden = FORBIDDEN_COLUMN_FRAGMENTS
                .iter()
                .any(|fragment| lowered.contains(fragment));
            assert!(
                !leaks_forbidden,
                "{table}.{column} must not persist forbidden raw/private material"
            );
        }
    }
}

/// Loads every constraint definition on the export tables and on the receipt
/// tables the migration extends. It then asserts that each expected
/// `(table, snippet)` pair is present, in the order listed below.
async fn assert_export_constraints(pool: &PgPool) {
    const EXPECTED: [(&str, &str); 9] = [
        (
            "dagdb_exports",
            "schema_version = 'dagdb_kg_portable_export_v1'::text",
        ),
        ("dagdb_exports", "octet_length(export_id) = 32"),
        ("dagdb_exports", "export_status = ANY"),
        ("dagdb_export_challenges", "challenge_kind = ANY"),
        (
            "dagdb_export_challenges",
            "proof_algorithm = 'hash_commitment_v1'::text",
        ),
        ("dagdb_receipts", "export"),
        ("dagdb_receipts", "dagdb_export_completed"),
        ("dagdb_receipts", "export_challenge_verified"),
        ("dagdb_subject_receipt_heads", "export"),
    ];

    let rows = sqlx::query(
        "SELECT rel.relname AS table_name, pg_get_constraintdef(con.oid) AS definition \
         FROM pg_constraint con \
         JOIN pg_class rel ON rel.oid = con.conrelid \
         JOIN pg_namespace ns ON ns.oid = rel.relnamespace \
         WHERE ns.nspname = current_schema() \
           AND rel.relname IN ('dagdb_exports','dagdb_export_challenges','dagdb_receipts','dagdb_subject_receipt_heads')",
    )
    .fetch_all(pool)
    .await
    .expect("query export constraints");

    for (table, snippet) in EXPECTED {
        assert_constraint(&rows, table, snippet);
    }
}

/// Panics unless at least one row has `table_name == table` and a
/// `definition` containing `snippet` as a plain substring.
///
/// `definition` is only decoded for rows whose table already matches.
fn assert_constraint(rows: &[sqlx::postgres::PgRow], table: &str, snippet: &str) {
    let found = rows.iter().any(|row| {
        let row_table: String = row.get("table_name");
        if row_table != table {
            return false;
        }
        let definition: String = row.get("definition");
        definition.contains(snippet)
    });
    assert!(found, "missing constraint snippet {snippet:?} on {table}");
}

/// Asserts that each required export index exists in the current schema.
/// Additional indexes matching the `idx_dagdb_export%` filter are allowed.
async fn assert_export_indexes(pool: &PgPool) {
    const REQUIRED_INDEXES: [&str; 5] = [
        "idx_dagdb_export_challenges_export",
        "idx_dagdb_export_challenges_status",
        "idx_dagdb_exports_receipt",
        "idx_dagdb_exports_scope_hash",
        "idx_dagdb_exports_scope_status",
    ];

    let existing = sqlx::query_scalar::<_, String>(
        "SELECT indexname FROM pg_indexes \
         WHERE schemaname = current_schema() AND indexname LIKE 'idx_dagdb_export%' \
         ORDER BY indexname",
    )
    .fetch_all(pool)
    .await
    .expect("query export indexes");

    for required in REQUIRED_INDEXES {
        let present = existing
            .iter()
            .map(String::as_str)
            .any(|name| name == required);
        assert!(present, "missing {required}");
    }
}

/// Asserts the migration did not create the exo-dag tables `dag_nodes` or
/// `dag_committed` in the current schema.
async fn assert_no_exo_dag_tables(pool: &PgPool) {
    let exo_dag_table_count = sqlx::query_scalar::<_, i64>(
        "SELECT count(*) FROM information_schema.tables \
         WHERE table_schema = current_schema() \
           AND table_name IN ('dag_nodes','dag_committed')",
    )
    .fetch_one(pool)
    .await
    .expect("count exo-dag tables");

    assert_eq!(exo_dag_table_count, 0);
}

/// Exact column lists, in ordinal order, expected for each export table:
/// `dagdb_exports` first, then `dagdb_export_challenges`.
fn expected_columns() -> Vec<(&'static str, Vec<String>)> {
    const EXPORT_COLUMNS: &[&str] = &[
        "export_id",
        "tenant_id",
        "namespace",
        "schema_version",
        "export_scope_hash",
        "source_commit_or_repo_ref",
        "included_memory_ids_hash",
        "included_receipt_heads_hash",
        "section_hashes",
        "section_counts",
        "citation_index_hash",
        "provenance_index_hash",
        "redaction_summary_hash",
        "omission_summary_hash",
        "verification_hash",
        "whole_export_hash",
        "export_status",
        "authority_ref_hash",
        "consent_ref_hash",
        "approval_ref_hash",
        "requester_did",
        "latest_receipt_hash",
        "created_at_physical_ms",
        "created_at_logical",
        "updated_at_physical_ms",
        "updated_at_logical",
    ];
    const CHALLENGE_COLUMNS: &[&str] = &[
        "challenge_id",
        "tenant_id",
        "namespace",
        "export_id",
        "challenge_kind",
        "challenge_hash",
        "proof_hash",
        "proof_algorithm",
        "verifier_did",
        "verification_status",
        "verification_notes_hash",
        "created_at_physical_ms",
        "created_at_logical",
    ];

    // Owned copies so callers can compare directly against `Vec<String>` rows.
    fn owned(names: &[&str]) -> Vec<String> {
        names.iter().map(|&name| name.to_owned()).collect()
    }

    vec![
        ("dagdb_exports", owned(EXPORT_COLUMNS)),
        ("dagdb_export_challenges", owned(CHALLENGE_COLUMNS)),
    ]
}

/// An isolated Postgres schema for a single live test run, plus the admin pool
/// used to create it up front and drop it again in `cleanup`.
struct TestDb {
    /// Single-connection pool used only for the schema DDL.
    admin_pool: PgPool,
    /// The test database URL with `search_path` set to `schema`, then `public`.
    scoped_url: String,
    /// Schema name of the form `dagdb_{label}_{process id}`.
    schema: String,
}

impl TestDb {
    /// Creates a fresh isolated schema, or returns `None` (after printing a
    /// skip notice) when `EXO_DAGDB_TEST_DATABASE_URL` cannot be read.
    ///
    /// # Panics
    /// Panics if the admin connection or either DDL statement fails.
    async fn new(label: &str) -> Option<Self> {
        let database_url = match std::env::var("EXO_DAGDB_TEST_DATABASE_URL") {
            Ok(url) => url,
            Err(_) => {
                eprintln!(
                    "skipping export_persistence_migration live test: EXO_DAGDB_TEST_DATABASE_URL is not set"
                );
                return None;
            }
        };

        let schema = format!("dagdb_{label}_{}", std::process::id());
        let admin_pool = PgPoolOptions::new()
            .max_connections(1)
            .connect(&database_url)
            .await
            .expect("connect admin Postgres pool");

        // Drop first so a schema left over from an earlier run with the same
        // process id does not make CREATE SCHEMA fail.
        let drop_sql = format!(r#"DROP SCHEMA IF EXISTS "{schema}" CASCADE"#);
        Self::execute_ddl(&admin_pool, &drop_sql, "drop isolated schema").await;
        let create_sql = format!(r#"CREATE SCHEMA "{schema}""#);
        Self::execute_ddl(&admin_pool, &create_sql, "create isolated schema").await;

        let scoped_url = database_url_with_search_path(&database_url, &schema);
        Some(Self {
            admin_pool,
            scoped_url,
            schema,
        })
    }

    /// Drops the isolated schema (with CASCADE) and closes the admin pool.
    async fn cleanup(self) {
        let drop_sql = format!(r#"DROP SCHEMA IF EXISTS "{}" CASCADE"#, self.schema);
        Self::execute_ddl(&self.admin_pool, &drop_sql, "drop isolated schema after test").await;
        self.admin_pool.close().await;
    }

    /// Executes one DDL statement on `pool`, panicking with `context` on error.
    async fn execute_ddl(pool: &PgPool, sql: &str, context: &str) {
        sqlx::query(sql).execute(pool).await.expect(context);
    }
}

/// Returns `database_url` with a percent-encoded
/// `options=-csearch_path=<schema>,public` query parameter appended.
///
/// The parameter is joined with `&` if the URL already contains a `?`, and
/// with `?` otherwise. `schema` is inserted verbatim (it is not
/// percent-encoded), so callers must pass a URL-safe identifier.
fn database_url_with_search_path(database_url: &str, schema: &str) -> String {
    let separator = database_url.find('?').map_or('?', |_| '&');

    let mut url = String::with_capacity(database_url.len() + schema.len() + 40);
    url.push_str(database_url);
    url.push(separator);
    url.push_str("options=-csearch_path%3D");
    url.push_str(schema);
    url.push_str("%2Cpublic");
    url
}