exochain-dag-db-postgres 0.2.0-beta

EXOCHAIN DAG DB PostgreSQL persistence adapters
Documentation
#![cfg(feature = "postgres")]
#![allow(clippy::expect_used)]

use std::process;

use exo_dag_db_postgres::{
    context_packet_persistence::{
        CONTEXT_PACKET_RECORD_SCHEMA_VERSION, ContextPacketRecord, DefaultContextQuality,
        PacketFreshnessStatus, PacketPersistenceStatus, PacketValidationStatus,
        canonical_idempotency_key,
    },
    postgres::{
        DAGDB_PRD17_CONTEXT_PACKET_SCHEMA_SQL,
        context_packet_persistence::{ContextPacketPostgresError, persist_context_packet_record},
    },
};
use sqlx::{Connection, PgConnection, PgPool, postgres::PgPoolOptions};

fn record_for_scope(
    packet_id: &str,
    tenant_id: &str,
    project_id: &str,
    memory_namespace: &str,
) -> ContextPacketRecord {
    ContextPacketRecord {
        schema_version: CONTEXT_PACKET_RECORD_SCHEMA_VERSION.to_owned(),
        packet_id: packet_id.to_owned(),
        route_id: "route-prd17b-001".to_owned(),
        query_hash: "query-hash-prd17b-001".to_owned(),
        tenant_id: tenant_id.to_owned(),
        project_id: project_id.to_owned(),
        memory_namespace: memory_namespace.to_owned(),
        selected_memory_ids: vec!["memory-prd17b-001".to_owned()],
        selected_edge_ids: Vec::new(),
        token_budget: 1_000,
        token_estimate: 200,
        context_quality: DefaultContextQuality::UsableContext,
        citation_coverage_bp: 10_000,
        validation_coverage_bp: 10_000,
        freshness_status: PacketFreshnessStatus::Current,
        validation_status: PacketValidationStatus::Passed,
        source_proof_refs: vec!["receipt-prd17b-001".to_owned()],
        fallback_reason: None,
        idempotency_key: canonical_idempotency_key(
            "route-prd17b-001",
            "query-hash-prd17b-001",
            1_000,
        ),
        persistence_status: PacketPersistenceStatus::ProofBound,
        production_default_route_approval_status: "operator_deferred".to_owned(),
        packet_quality_review_status: "operator_deferred".to_owned(),
        created_at: "2026-06-09T00:00:00Z".to_owned(),
    }
}

#[tokio::test]
async fn context_packet_record_rejects_cross_scope_and_mutated_packet_id_replays() {
    let Some(db) = TestDb::maybe_new("ctx_packet_replay_guard").await else {
        return;
    };
    let record = record_for_scope("packet-prd17b-001", "tenant-a", "project-a", "primary");
    let rows = persist_context_packet_record(&db.pool, &record)
        .await
        .expect("first persist");
    assert_eq!(rows, 1);

    let replay_rows = persist_context_packet_record(&db.pool, &record)
        .await
        .expect("exact replay");
    assert_eq!(replay_rows, 0);

    let cross_scope = record_for_scope("packet-prd17b-001", "tenant-b", "project-b", "primary");
    assert!(matches!(
        persist_context_packet_record(&db.pool, &cross_scope).await,
        Err(ContextPacketPostgresError::UnsafeReplay { .. })
    ));

    let mut mutated = record_for_scope("packet-prd17b-001", "tenant-a", "project-a", "primary");
    mutated.persistence_status = PacketPersistenceStatus::Persisted;
    assert!(matches!(
        persist_context_packet_record(&db.pool, &mutated).await,
        Err(ContextPacketPostgresError::UnsafeReplay { .. })
    ));

    let (tenant_id, persistence_status) = sqlx::query_as::<_, (String, String)>(
        "SELECT tenant_id, persistence_status FROM dagdb_context_packet_records \
         WHERE packet_id = $1",
    )
    .bind("packet-prd17b-001")
    .fetch_one(&db.pool)
    .await
    .expect("load persisted packet row");
    assert_eq!(tenant_id, "tenant-a");
    assert_eq!(persistence_status, "proof_bound");
}

struct TestDb {
    pool: PgPool,
    schema: String,
    database_url: String,
}

impl TestDb {
    async fn maybe_new(label: &str) -> Option<Self> {
        let Ok(database_url) = std::env::var("EXO_DAGDB_TEST_DATABASE_URL") else {
            eprintln!(
                "skipping context packet postgres test: EXO_DAGDB_TEST_DATABASE_URL is not set"
            );
            return None;
        };
        let schema = format!("dagdb_{label}_{}", process::id());
        let mut admin = PgConnection::connect(&database_url)
            .await
            .expect("connect to EXO_DAGDB_TEST_DATABASE_URL");
        sqlx::raw_sql(&format!("DROP SCHEMA IF EXISTS {schema} CASCADE"))
            .execute(&mut admin)
            .await
            .expect("drop existing context packet test schema");
        sqlx::raw_sql(&format!("CREATE SCHEMA {schema}"))
            .execute(&mut admin)
            .await
            .expect("create context packet test schema");

        let separator = if database_url.contains('?') { '&' } else { '?' };
        let scoped_url =
            format!("{database_url}{separator}options=-csearch_path%3D{schema}%2Cpublic");
        let pool = PgPoolOptions::new()
            .max_connections(4)
            .connect(&scoped_url)
            .await
            .expect("connect context packet test pool");
        sqlx::raw_sql(DAGDB_PRD17_CONTEXT_PACKET_SCHEMA_SQL)
            .execute(&pool)
            .await
            .expect("apply PRD17B context packet schema");
        Some(Self {
            pool,
            schema,
            database_url,
        })
    }
}

impl Drop for TestDb {
    fn drop(&mut self) {
        let schema = self.schema.clone();
        let database_url = self.database_url.clone();
        std::thread::spawn(move || {
            let runtime = tokio::runtime::Runtime::new().expect("create cleanup runtime");
            runtime.block_on(async move {
                let mut conn = PgConnection::connect(&database_url)
                    .await
                    .expect("connect for context packet cleanup");
                sqlx::raw_sql(&format!("DROP SCHEMA IF EXISTS {schema} CASCADE"))
                    .execute(&mut conn)
                    .await
                    .expect("drop context packet test schema");
            });
        })
        .join()
        .expect("join cleanup thread");
    }
}