#![allow(missing_docs)]
use std::sync::Arc;
use chrono::{DateTime, Utc};
use mempill_types::{
AgentId, Disposition, LedgerEntry, LedgerEventKind, TransactionTime,
};
use crate::{
engine_handle::ErasedPendingStore,
error::MemError,
ports::{
pending_adjudication::{OrphanedQueuedClaim, PendingAdjudicationRow},
PersistencePort,
},
};
pub struct SweepAdjudicationsUseCase<P>
where
P: PersistencePort + Send + Sync + 'static,
{
persistence: Arc<P>,
pending_store: Arc<dyn ErasedPendingStore>,
}
impl<P> SweepAdjudicationsUseCase<P>
where
P: PersistencePort + Send + Sync + 'static,
{
pub fn new(persistence: Arc<P>, pending_store: Arc<dyn ErasedPendingStore>) -> Self {
Self { persistence, pending_store }
}
pub fn revert_expired_row(
&self,
row: &PendingAdjudicationRow,
now: DateTime<Utc>,
) -> Result<bool, MemError> {
let tx_time = TransactionTime(now);
let agent_id: AgentId = row.agent_id.clone();
let challenger_ref = row.challenger_claim_ref.clone();
let handle_id = row.handle_id;
let ledger = self.persistence
.load_ledger(&agent_id, None, 10_000)
.map_err(|e| MemError::Persistence { source: Box::new(e) })?;
let challenger_disp = latest_disposition_from_ledger(&ledger, &challenger_ref);
if challenger_disp != Some(Disposition::QueuedForAdjudication) {
return Ok(false);
}
let mut txn = self.persistence
.begin_atomic(&agent_id)
.map_err(|e| MemError::Persistence { source: Box::new(e) })?;
let contested_entry = LedgerEntry {
entry_id: uuid::Uuid::new_v4(),
agent_id: agent_id.clone(),
claim_ref: challenger_ref.clone(),
event_kind: LedgerEventKind::AdjudicationExpired,
disposition: Disposition::Contested,
rationale: Some(serde_json::json!({
"event": "adjudication_ttl_expired",
"handle_id": handle_id.to_string(),
"expired_at": now.to_rfc3339(),
"incumbent_claim_ref": row.incumbent_claim_ref.0.to_string(),
})),
recorded_at: tx_time,
};
let write_result = self.persistence
.append_ledger_entry(&mut txn, &contested_entry)
.map_err(|e| MemError::Persistence { source: Box::new(e) });
match write_result {
Ok(()) => {
self.persistence
.commit(txn)
.map_err(|e| MemError::Persistence { source: Box::new(e) })?;
self.pending_store
.mark_expired_erased(handle_id)
.map_err(|e| MemError::PendingStore { source: e })?;
Ok(true)
}
Err(e) => {
let _ = self.persistence.rollback(txn);
Err(e)
}
}
}
pub fn revert_orphan(
&self,
orphan: &OrphanedQueuedClaim,
now: DateTime<Utc>,
) -> Result<bool, MemError> {
let tx_time = TransactionTime(now);
let agent_id: AgentId = orphan.agent_id.clone();
let challenger_ref = orphan.challenger_claim_ref.clone();
let ledger = self.persistence
.load_ledger(&agent_id, None, 10_000)
.map_err(|e| MemError::Persistence { source: Box::new(e) })?;
let challenger_disp = latest_disposition_from_ledger(&ledger, &challenger_ref);
if challenger_disp != Some(Disposition::QueuedForAdjudication) {
return Ok(false);
}
let mut txn = self.persistence
.begin_atomic(&agent_id)
.map_err(|e| MemError::Persistence { source: Box::new(e) })?;
let incumbent_ref_str = orphan.incumbent_claim_ref
.as_ref()
.map(|r| r.0.to_string())
.unwrap_or_else(|| "unknown".to_string());
let contested_entry = LedgerEntry {
entry_id: uuid::Uuid::new_v4(),
agent_id: agent_id.clone(),
claim_ref: challenger_ref.clone(),
event_kind: LedgerEventKind::AdjudicationExpired,
disposition: Disposition::Contested,
rationale: Some(serde_json::json!({
"event": "orphaned_queued_claim_recovery",
"reason": "QueuedForAdjudication claim had no matching pending_adjudications row",
"recovered_at": now.to_rfc3339(),
"incumbent_claim_ref": incumbent_ref_str,
"subject": orphan.subject,
"predicate": orphan.predicate,
})),
recorded_at: tx_time,
};
let write_result = self.persistence
.append_ledger_entry(&mut txn, &contested_entry)
.map_err(|e| MemError::Persistence { source: Box::new(e) });
match write_result {
Ok(()) => {
self.persistence
.commit(txn)
.map_err(|e| MemError::Persistence { source: Box::new(e) })?;
Ok(true)
}
Err(e) => {
let _ = self.persistence.rollback(txn);
Err(e)
}
}
}
}
fn latest_disposition_from_ledger(
ledger: &[mempill_types::LedgerEntry],
target: &mempill_types::ClaimRef,
) -> Option<Disposition> {
ledger
.iter()
.filter(|e| &e.claim_ref == target)
.max_by_key(|e| e.recorded_at.0)
.map(|e| e.disposition.clone())
}