use exo_core::Timestamp;
use exo_dag_db_api::{ReceiptEventType, SubjectKind};
use serde_json::{json, to_value};
use sqlx::{PgPool, Postgres, Row, Transaction};
use crate::{
context_packet_persistence::{
ContextPacketAcceptanceEvidence, ContextPacketError, ContextPacketRecord,
accept_context_packet_record, validate_context_packet_record,
},
receipt::{
OperationalReceiptInsert, ReceiptStoreError, insert_operational_receipt_in_transaction,
operational_receipt_subject_id,
},
scoring::hash_event_body,
};
/// DID recorded as the `actor_did` of every operational receipt written by this module.
const CONTEXT_PACKET_AUDIT_ACTOR_DID: &str = "did:exo:dagdb-context-packet-writer";
/// Route name placed in each receipt body under `route_name` and passed to
/// `operational_receipt_subject_id` when deriving receipt subject ids.
const CONTEXT_PACKET_ROUTE_NAME: &str = "dagdb.context_packet";
/// Fixed timestamp used as the `event_hlc` of every receipt written by this module.
// NOTE(review): presumably constant so receipts stay deterministic across replays — confirm.
const CREATED_AT: Timestamp = Timestamp::new(1, 0);
/// Persists `record` inside a transaction that this function opens on `pool`.
///
/// The record is validated before the transaction is opened, so a contract
/// failure never reaches the database. The write itself is delegated to
/// [`persist_context_packet_record_in_transaction`]; the transaction is
/// committed when that call succeeds and rolled back when it fails.
///
/// Returns the row count reported by the in-transaction writer.
///
/// # Errors
///
/// * [`ContextPacketPostgresError::Contract`] when validation rejects `record`.
/// * [`ContextPacketPostgresError::Sqlx`] when beginning or committing the
///   transaction fails.
/// * Whatever error the in-transaction writer produced. If the subsequent
///   rollback also fails, that failure is only logged as a warning and the
///   writer's error is still the one returned.
pub async fn persist_context_packet_record(
    pool: &PgPool,
    record: &ContextPacketRecord,
) -> Result<u64, ContextPacketPostgresError> {
    validate_context_packet_record(record).map_err(ContextPacketPostgresError::Contract)?;

    let mut tx = pool.begin().await.map_err(ContextPacketPostgresError::Sqlx)?;

    let outcome = persist_context_packet_record_in_transaction(&mut tx, record).await;
    let persist_error = match outcome {
        Ok(rows) => {
            // Success path: make the write durable and hand back the row count.
            tx.commit().await.map_err(ContextPacketPostgresError::Sqlx)?;
            return Ok(rows);
        }
        Err(error) => error,
    };

    // Failure path: undo the transaction, but never let a rollback problem
    // mask the error that caused it.
    if let Err(rollback_error) = tx.rollback().await {
        tracing::warn!(
            operation = "persist_context_packet_record",
            tenant_id = %record.tenant_id,
            project_id = %record.project_id,
            memory_namespace = %record.memory_namespace,
            packet_id = %record.packet_id,
            route_id = %record.route_id,
            error = %rollback_error,
            "failed to rollback transaction after context packet persistence error"
        );
    }
    Err(persist_error)
}
/// Applies acceptance `evidence` to `record` and persists the result.
///
/// The record returned by `accept_context_packet_record` is what gets handed
/// to [`persist_context_packet_record`]; the caller's `record` is only read.
///
/// # Errors
///
/// * [`ContextPacketPostgresError::Contract`] when `accept_context_packet_record`
///   rejects the record/evidence pair. Nothing is written in that case.
/// * Any error returned by [`persist_context_packet_record`].
pub async fn persist_accepted_context_packet_record(
    pool: &PgPool,
    record: &ContextPacketRecord,
    evidence: &ContextPacketAcceptanceEvidence,
) -> Result<u64, ContextPacketPostgresError> {
    match accept_context_packet_record(record, evidence) {
        Ok(accepted) => persist_context_packet_record(pool, &accepted).await,
        Err(contract_error) => Err(ContextPacketPostgresError::Contract(contract_error)),
    }
}
/// Writes `record` into `dagdb_context_packet_records` using the caller's transaction.
///
/// Steps, in order:
/// 1. Validate `record`.
/// 2. Set the transaction to `SERIALIZABLE` isolation and bind the tenant
///    context for `record.tenant_id`.
/// 3. Look for an existing row with the same `packet_id` in the same
///    tenant/project/namespace scope. If every compared column matches, the
///    call is treated as a replay and returns `Ok(0)` without writing the row
///    or any receipts. Any other existing row is rejected.
/// 4. Otherwise upsert the row, then write the approval receipts and the
///    record-accepted receipt.
///
/// This function never commits or rolls back `tx`; that is left to the caller.
///
/// NOTE(review): PostgreSQL only accepts `SET TRANSACTION ISOLATION LEVEL`
/// before the first query of a transaction, so `tx` presumably has to be
/// fresh when passed in — confirm against callers.
///
/// Returns `0` for a matching replay, otherwise the `rows_affected` count of
/// the upsert statement.
///
/// # Errors
///
/// * [`ContextPacketPostgresError::Contract`] when validation rejects `record`.
/// * [`ContextPacketPostgresError::Json`] when a list or enum label cannot be
///   converted to JSON / a string label.
/// * [`ContextPacketPostgresError::UnsafeReplay`] when a row with this
///   `packet_id` already exists but does not match, or when the upsert
///   affects zero rows.
/// * [`ContextPacketPostgresError::Sqlx`] for any database failure.
/// * Receipt errors propagated from the two receipt helpers.
pub async fn persist_context_packet_record_in_transaction(
tx: &mut Transaction<'_, Postgres>,
record: &ContextPacketRecord,
) -> Result<u64, ContextPacketPostgresError> {
validate_context_packet_record(record).map_err(ContextPacketPostgresError::Contract)?;
// Raise the isolation level before any other statement runs on `tx`.
sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
.execute(&mut **tx)
.await
.map_err(ContextPacketPostgresError::Sqlx)?;
// NOTE(review): presumably scopes later statements to this tenant (e.g. for
// row-level security) — confirm in `bind_tenant_context`.
super::bind_tenant_context(tx, &record.tenant_id)
.await
.map_err(ContextPacketPostgresError::Sqlx)?;
// JSON-encode the list fields once; each value is bound by reference to the
// replay check below and then moved into the upsert.
let selected_memory_ids =
to_value(&record.selected_memory_ids).map_err(ContextPacketPostgresError::Json)?;
let selected_edge_ids =
to_value(&record.selected_edge_ids).map_err(ContextPacketPostgresError::Json)?;
let source_proof_refs =
to_value(&record.source_proof_refs).map_err(ContextPacketPostgresError::Json)?;
// Replay check: fetch the row (if any) with this packet_id inside the same
// tenant/project/namespace scope and let the database compare every stored
// column against the incoming values. `fallback_reason` is compared with
// IS NOT DISTINCT FROM, so NULL on both sides counts as equal.
if let Some(existing) = sqlx::query(
"SELECT \
(tenant_id = $2 AND project_id = $3 AND memory_namespace = $4) AS scope_matches, \
(route_id = $5 AND query_hash = $6 AND selected_memory_ids = $7 \
AND selected_edge_ids = $8 AND token_budget = $9 AND token_estimate = $10 \
AND context_quality = $11 AND citation_coverage_bp = $12 \
AND validation_coverage_bp = $13 AND freshness_status = $14 \
AND validation_status = $15 AND source_proof_refs = $16 \
AND fallback_reason IS NOT DISTINCT FROM $17 AND idempotency_key = $18 \
AND persistence_status = $19 \
AND production_default_route_approval_status = $20 \
AND packet_quality_review_status = $21 AND created_at = $22) AS body_matches \
FROM dagdb_context_packet_records \
WHERE packet_id = $1 AND tenant_id = $2 AND project_id = $3 AND memory_namespace = $4",
)
// Bind order must follow the $1..$22 placeholders above.
.bind(&record.packet_id)
.bind(&record.tenant_id)
.bind(&record.project_id)
.bind(&record.memory_namespace)
.bind(&record.route_id)
.bind(&record.query_hash)
.bind(&selected_memory_ids)
.bind(&selected_edge_ids)
// Token counts that do not fit in an i32 are bound as i32::MAX rather than rejected.
.bind(i32::try_from(record.token_budget).unwrap_or(i32::MAX))
.bind(i32::try_from(record.token_estimate).unwrap_or(i32::MAX))
.bind(serde_label(&record.context_quality)?)
.bind(i32::from(record.citation_coverage_bp))
.bind(i32::from(record.validation_coverage_bp))
.bind(serde_label(&record.freshness_status)?)
.bind(serde_label(&record.validation_status)?)
.bind(&source_proof_refs)
.bind(&record.fallback_reason)
.bind(&record.idempotency_key)
.bind(serde_label(&record.persistence_status)?)
.bind(&record.production_default_route_approval_status)
.bind(&record.packet_quality_review_status)
.bind(&record.created_at)
.fetch_optional(&mut **tx)
.await
.map_err(ContextPacketPostgresError::Sqlx)?
{
// The WHERE clause already filters on the scope columns, so `scope_matches`
// is expected to be true for any row returned; it is still read and checked.
let scope_matches: bool = existing
.try_get("scope_matches")
.map_err(ContextPacketPostgresError::Sqlx)?;
let body_matches: bool = existing
.try_get("body_matches")
.map_err(ContextPacketPostgresError::Sqlx)?;
// Identical row already stored: report zero rows written and skip receipts.
if scope_matches && body_matches {
return Ok(0);
}
// Same packet_id with different content: refuse to overwrite.
return Err(ContextPacketPostgresError::UnsafeReplay {
packet_id: record.packet_id.clone(),
});
}
// No row in this scope: insert. On a packet_id conflict the row is updated
// only when its stored tenant/project/namespace equal the incoming ones; the
// scope columns and `created_at` are not part of the SET list.
let result = sqlx::query(
r#"
INSERT INTO dagdb_context_packet_records (
packet_id,
route_id,
query_hash,
tenant_id,
project_id,
memory_namespace,
selected_memory_ids,
selected_edge_ids,
token_budget,
token_estimate,
context_quality,
citation_coverage_bp,
validation_coverage_bp,
freshness_status,
validation_status,
source_proof_refs,
fallback_reason,
idempotency_key,
persistence_status,
production_default_route_approval_status,
packet_quality_review_status,
created_at
)
VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12,
$13, $14, $15, $16, $17, $18, $19, $20, $21, $22
)
ON CONFLICT (packet_id)
DO UPDATE SET
route_id = EXCLUDED.route_id,
query_hash = EXCLUDED.query_hash,
selected_memory_ids = EXCLUDED.selected_memory_ids,
selected_edge_ids = EXCLUDED.selected_edge_ids,
token_budget = EXCLUDED.token_budget,
token_estimate = EXCLUDED.token_estimate,
context_quality = EXCLUDED.context_quality,
citation_coverage_bp = EXCLUDED.citation_coverage_bp,
validation_coverage_bp = EXCLUDED.validation_coverage_bp,
freshness_status = EXCLUDED.freshness_status,
validation_status = EXCLUDED.validation_status,
source_proof_refs = EXCLUDED.source_proof_refs,
fallback_reason = EXCLUDED.fallback_reason,
idempotency_key = EXCLUDED.idempotency_key,
persistence_status = EXCLUDED.persistence_status,
production_default_route_approval_status =
EXCLUDED.production_default_route_approval_status,
packet_quality_review_status = EXCLUDED.packet_quality_review_status
WHERE dagdb_context_packet_records.tenant_id = EXCLUDED.tenant_id
AND dagdb_context_packet_records.project_id = EXCLUDED.project_id
AND dagdb_context_packet_records.memory_namespace = EXCLUDED.memory_namespace
"#,
)
// Bind order follows the INSERT column list, which differs from the SELECT above.
.bind(&record.packet_id)
.bind(&record.route_id)
.bind(&record.query_hash)
.bind(&record.tenant_id)
.bind(&record.project_id)
.bind(&record.memory_namespace)
.bind(selected_memory_ids)
.bind(selected_edge_ids)
// Same i32::MAX saturation as in the replay check, so both statements see equal values.
.bind(i32::try_from(record.token_budget).unwrap_or(i32::MAX))
.bind(i32::try_from(record.token_estimate).unwrap_or(i32::MAX))
.bind(serde_label(&record.context_quality)?)
.bind(i32::from(record.citation_coverage_bp))
.bind(i32::from(record.validation_coverage_bp))
.bind(serde_label(&record.freshness_status)?)
.bind(serde_label(&record.validation_status)?)
.bind(source_proof_refs)
.bind(&record.fallback_reason)
.bind(&record.idempotency_key)
.bind(serde_label(&record.persistence_status)?)
.bind(&record.production_default_route_approval_status)
.bind(&record.packet_quality_review_status)
.bind(&record.created_at)
.execute(&mut **tx)
.await
.map_err(ContextPacketPostgresError::Sqlx)?;
// Zero affected rows means the packet_id conflicted with a row whose scope
// did not satisfy the DO UPDATE ... WHERE filter, so nothing was written.
if result.rows_affected() == 0 {
return Err(ContextPacketPostgresError::UnsafeReplay {
packet_id: record.packet_id.clone(),
});
}
// Receipts are written in the same transaction as the row itself.
insert_context_packet_approval_receipts(tx, record).await?;
insert_context_packet_record_accepted_receipt(tx, record).await?;
Ok(result.rows_affected())
}
/// Writes the two approval receipts for `record` on the given transaction.
///
/// One operational receipt is inserted for `DagdbApprovalRequestSubmitted`
/// and one for `DagdbApprovalGranted`, in that order. Both carry the same
/// body; they differ only in event type and in the subject id derived from it.
///
/// Returns the saturating sum of the counts reported by the receipt inserts.
///
/// # Errors
///
/// * [`ContextPacketPostgresError::ReceiptHash`] when hashing a receipt body fails.
/// * [`ContextPacketPostgresError::Receipt`] when a receipt insert fails.
async fn insert_context_packet_approval_receipts(
    tx: &mut Transaction<'_, Postgres>,
    record: &ContextPacketRecord,
) -> Result<u64, ContextPacketPostgresError> {
    let approval_events = [
        ReceiptEventType::DagdbApprovalRequestSubmitted,
        ReceiptEventType::DagdbApprovalGranted,
    ];

    let mut receipt_count: u64 = 0;
    for event_type in approval_events {
        let receipt_body = json!({
            "route_name": CONTEXT_PACKET_ROUTE_NAME,
            "packet_id": record.packet_id,
            "route_id": record.route_id,
            "idempotency_key": record.idempotency_key,
            "source": "context_packet_persistence_adapter",
        });
        // Hash first: a hashing failure aborts before anything else is derived.
        let event_body_hash = match hash_event_body(&receipt_body) {
            Ok(hash) => hash,
            Err(hash_error) => return Err(ContextPacketPostgresError::ReceiptHash(hash_error)),
        };
        let subject_id = operational_receipt_subject_id(
            CONTEXT_PACKET_ROUTE_NAME,
            &record.packet_id,
            event_type,
        );
        let receipt = OperationalReceiptInsert {
            tenant_id: &record.tenant_id,
            namespace: &record.memory_namespace,
            subject_kind: SubjectKind::ContextPacket,
            subject_id,
            event_type,
            actor_did: CONTEXT_PACKET_AUDIT_ACTOR_DID,
            event_hlc: CREATED_AT,
            event_body_hash,
            receipt_body,
        };
        let written = insert_operational_receipt_in_transaction(tx, receipt).await?;
        receipt_count = receipt_count.saturating_add(written);
    }
    Ok(receipt_count)
}
/// Writes the `DagdbRecordAccepted` receipt for `record`, if it qualifies.
///
/// A receipt is written only when both
/// `production_default_route_approval_status` and
/// `packet_quality_review_status` equal `"accepted"`. Otherwise nothing is
/// written and `Ok(0)` is returned.
///
/// Returns the count reported by the receipt insert.
///
/// # Errors
///
/// * [`ContextPacketPostgresError::ReceiptHash`] when hashing the receipt body fails.
/// * [`ContextPacketPostgresError::Receipt`] when the receipt insert fails.
async fn insert_context_packet_record_accepted_receipt(
    tx: &mut Transaction<'_, Postgres>,
    record: &ContextPacketRecord,
) -> Result<u64, ContextPacketPostgresError> {
    let route_approved = record.production_default_route_approval_status == "accepted";
    let quality_approved = record.packet_quality_review_status == "accepted";
    if !(route_approved && quality_approved) {
        return Ok(0);
    }

    let event_type = ReceiptEventType::DagdbRecordAccepted;
    let receipt_body = json!({
        "route_name": CONTEXT_PACKET_ROUTE_NAME,
        "packet_id": record.packet_id,
        "idempotency_key": record.idempotency_key,
        "source": "context_packet_persistence_adapter",
    });
    let event_body_hash =
        hash_event_body(&receipt_body).map_err(ContextPacketPostgresError::ReceiptHash)?;
    let subject_id = operational_receipt_subject_id(
        CONTEXT_PACKET_ROUTE_NAME,
        &record.packet_id,
        event_type,
    );
    let receipt = OperationalReceiptInsert {
        tenant_id: &record.tenant_id,
        namespace: &record.memory_namespace,
        subject_kind: SubjectKind::ContextPacket,
        subject_id,
        event_type,
        actor_did: CONTEXT_PACKET_AUDIT_ACTOR_DID,
        event_hlc: CREATED_AT,
        event_body_hash,
        receipt_body,
    };
    // `?` converts a `ReceiptStoreError` into the `Receipt` variant.
    let written = insert_operational_receipt_in_transaction(tx, receipt).await?;
    Ok(written)
}
/// Errors returned by the Postgres persistence functions in this module.
///
/// Each variant's display text is a fixed identifier; where a variant wraps
/// another error, that error is attached as the source.
#[derive(Debug, thiserror::Error)]
pub enum ContextPacketPostgresError {
/// The record was rejected by `validate_context_packet_record` or
/// `accept_context_packet_record`.
#[error("context_packet_contract_failed")]
Contract(#[source] ContextPacketError),
/// A field could not be converted to a JSON value, or an enum label did
/// not serialize to a JSON string.
#[error("context_packet_json_failed")]
Json(#[source] serde_json::Error),
/// `hash_event_body` failed for a receipt body.
#[error("context_packet_receipt_hash_failed")]
ReceiptHash(#[source] crate::scoring::DomainError),
/// Inserting an operational receipt failed. `#[from]` lets `?` convert a
/// `ReceiptStoreError` into this variant.
#[error("context_packet_receipt_failed")]
Receipt(#[from] ReceiptStoreError),
/// A row with this `packet_id` already exists and does not match the
/// incoming record, or the upsert affected no rows.
#[error("context_packet_unsafe_replay: {packet_id}")]
UnsafeReplay {
/// Identifier of the packet whose write was refused.
packet_id: String,
},
/// A database operation failed (begin/commit, a query, or reading a column).
#[error("context_packet_sql_failed")]
Sqlx(#[source] sqlx::Error),
}
/// Returns the string that `value` serializes to.
///
/// `value` is converted to a `serde_json::Value`; only a JSON string is
/// accepted as a label.
///
/// # Errors
///
/// [`ContextPacketPostgresError::Json`] when serialization fails or when the
/// serialized form is anything other than a JSON string.
fn serde_label<T: serde::Serialize>(value: &T) -> Result<String, ContextPacketPostgresError> {
    match serde_json::to_value(value) {
        Ok(serde_json::Value::String(label)) => Ok(label),
        Ok(_) => Err(ContextPacketPostgresError::Json(label_to_string_error())),
        Err(json_error) => Err(ContextPacketPostgresError::Json(json_error)),
    }
}
fn label_to_string_error() -> serde_json::Error {
serde::de::Error::custom("enum label did not serialize to a string")
}