pub mod consumer;
pub mod dispatcher;
pub mod router;
pub mod tracker;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use nostr_sdk::prelude::{Client, Keys};
use tokio::sync::Mutex as AsyncMutex;
use tracing::{debug, error, info, warn};
use crate::db::disputes::get_dispute;
use crate::db::escalation_dispatches::PendingHandoff;
use crate::mediation::escalation::HandoffPackage;
use crate::models::{EscalationConfig, LifecycleState, SolverConfig};
use self::dispatcher::{build_dm_body, send_to_recipients};
use self::router::{resolve_recipients, Recipients};
const SCAN_BATCH_LIMIT: i64 = 128;
pub async fn run_dispatcher(
conn: Arc<AsyncMutex<rusqlite::Connection>>,
client: Client,
serbero_keys: Keys,
solvers: Vec<SolverConfig>,
cfg: EscalationConfig,
) {
let interval_secs = cfg.dispatch_interval_seconds.max(1);
info!(
dispatch_interval_seconds = interval_secs,
fallback_to_all_solvers = cfg.fallback_to_all_solvers,
"phase4_dispatcher_loop_started"
);
let mut ticker = tokio::time::interval(Duration::from_secs(interval_secs));
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
ticker.tick().await;
loop {
ticker.tick().await;
debug!("phase4_dispatcher_tick");
if let Err(e) = run_once(&conn, &client, &serbero_keys, &solvers, &cfg).await {
error!(error = %e, "phase4_dispatcher_cycle_failed");
}
}
}
pub async fn run_once(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
client: &Client,
serbero_keys: &Keys,
solvers: &[SolverConfig],
cfg: &EscalationConfig,
) -> crate::error::Result<()> {
let pending = consumer::scan_pending(conn, SCAN_BATCH_LIMIT).await?;
if pending.is_empty() {
return Ok(());
}
debug!(count = pending.len(), "phase4_cycle_pending");
for handoff in pending {
process_one(conn, client, serbero_keys, solvers, cfg, handoff).await;
}
Ok(())
}
async fn process_one(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
client: &Client,
serbero_keys: &Keys,
solvers: &[SolverConfig],
cfg: &EscalationConfig,
handoff: PendingHandoff,
) {
let pkg: HandoffPackage = match serde_json::from_str(&handoff.payload_json) {
Ok(p) => p,
Err(e) => {
warn!(
handoff_event_id = handoff.handoff_event_id,
error = %e,
"phase4_dispatch: handoff payload deserialize failed (T028 handler not yet live)"
);
return;
}
};
let metadata = match dispute_metadata(conn, &pkg.dispute_id).await {
Ok(md) => md,
Err(e) => {
warn!(
dispute_id = %pkg.dispute_id,
error = %e,
"phase4_dispatch: dispute lookup failed (T028 orphan handler not yet live)"
);
return;
}
};
if metadata.lifecycle_state == LifecycleState::Resolved {
info!(
dispute_id = %pkg.dispute_id,
handoff_event_id = handoff.handoff_event_id,
"phase4_superseded — dispute already resolved; skipping dispatch"
);
if let Err(e) =
tracker::record_supersession(conn, &handoff, &pkg.dispute_id, current_unix_seconds())
.await
{
error!(
dispute_id = %pkg.dispute_id,
handoff_event_id = handoff.handoff_event_id,
error = %e,
"phase4_dispatch: record_supersession failed; handoff remains unconsumed"
);
}
return;
}
let assigned_solver: Option<String> = metadata.assigned_solver;
let recipients = resolve_recipients(
solvers,
assigned_solver.as_deref(),
cfg.fallback_to_all_solvers,
);
let (pubkeys, via_fallback) = match recipients {
Recipients::Targeted(pk) => (vec![pk], false),
Recipients::Broadcast {
pubkeys,
via_fallback,
} => (pubkeys, via_fallback),
Recipients::Unroutable => {
error!(
dispute_id = %pkg.dispute_id,
handoff_event_id = handoff.handoff_event_id,
"phase4_dispatch: no Write-permission solvers configured and \
fallback_to_all_solvers = false; handoff remains unconsumed \
(T022 handler will add escalation_dispatch_unroutable audit row)"
);
return;
}
};
let body = build_dm_body(&pkg);
let now = current_unix_seconds();
let outcome = match send_to_recipients(
conn,
client,
serbero_keys,
&pkg.dispute_id,
&pubkeys,
&body,
now,
)
.await
{
Ok(o) => o,
Err(e) => {
error!(
dispute_id = %pkg.dispute_id,
handoff_event_id = handoff.handoff_event_id,
error = %e,
"phase4_dispatch: send loop errored; handoff remains unconsumed"
);
return;
}
};
if let Err(e) = tracker::record_successful_dispatch(
conn,
&handoff,
&pkg.dispute_id,
&outcome,
via_fallback,
now,
)
.await
{
error!(
dispute_id = %pkg.dispute_id,
handoff_event_id = handoff.handoff_event_id,
error = %e,
"phase4_dispatch: record_successful_dispatch failed AFTER send; \
next cycle will re-dispatch per the at-least-once semantics"
);
} else {
info!(
dispute_id = %pkg.dispute_id,
handoff_event_id = handoff.handoff_event_id,
recipients = pubkeys.len(),
via_fallback,
"phase4_dispatched"
);
}
}
#[derive(Debug)]
struct DisputeMetadata {
assigned_solver: Option<String>,
lifecycle_state: LifecycleState,
}
async fn dispute_metadata(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
dispute_id: &str,
) -> crate::error::Result<DisputeMetadata> {
let guard = conn.lock().await;
let d = get_dispute(&guard, dispute_id)?.ok_or_else(|| {
crate::error::Error::InvalidEvent(format!(
"phase4_dispatch: handoff references unknown dispute {dispute_id} \
(T028 orphan_dispute_reference handler not yet live)"
))
})?;
Ok(DisputeMetadata {
assigned_solver: d.assigned_solver,
lifecycle_state: d.lifecycle_state,
})
}
fn current_unix_seconds() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::migrations::run_migrations;
use crate::db::open_in_memory;
use crate::mediation::escalation::HandoffPackage;
use crate::models::SolverPermission;
use nostr_sdk::Keys;
use rusqlite::params;
async fn fresh_conn() -> Arc<AsyncMutex<rusqlite::Connection>> {
let mut c = open_in_memory().unwrap();
run_migrations(&mut c).unwrap();
Arc::new(AsyncMutex::new(c))
}
fn solver(pk: &str, perm: SolverPermission) -> SolverConfig {
SolverConfig {
pubkey: pk.to_string(),
permission: perm,
}
}
fn sample_cfg(interval_secs: u64, fallback: bool) -> EscalationConfig {
EscalationConfig {
enabled: true,
dispatch_interval_seconds: interval_secs,
fallback_to_all_solvers: fallback,
}
}
async fn seed_handoff_for_dispute(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
dispute_id: &str,
assigned_solver: Option<&str>,
pkg: &HandoffPackage,
) -> i64 {
let c = conn.lock().await;
c.execute(
"INSERT INTO disputes (
dispute_id, event_id, mostro_pubkey, initiator_role,
dispute_status, event_timestamp, detected_at, lifecycle_state,
assigned_solver
) VALUES (?1, ?2, 'mostro', 'buyer', 'initiated', 10, 11, 'notified', ?3)",
params![dispute_id, format!("evt-{dispute_id}"), assigned_solver],
)
.unwrap();
let payload = serde_json::to_string(pkg).unwrap();
c.query_row(
"INSERT INTO mediation_events (
session_id, kind, payload_json,
prompt_bundle_id, policy_hash, occurred_at
) VALUES (NULL, 'handoff_prepared', ?1,
'phase3-default', 'hash-1', 100)
RETURNING id",
params![payload],
|r| r.get::<_, i64>(0),
)
.unwrap()
}
fn sample_package(dispute_id: &str) -> HandoffPackage {
HandoffPackage {
dispute_id: dispute_id.to_string(),
session_id: None,
trigger: "conflicting_claims".to_string(),
evidence_refs: Vec::new(),
prompt_bundle_id: "phase3-default".to_string(),
policy_hash: "hash-1".to_string(),
rationale_refs: Vec::new(),
assembled_at: 100,
}
}
#[tokio::test]
async fn empty_pending_set_is_cycle_noop() {
let conn = fresh_conn().await;
let keys = Keys::generate();
let client = nostr_sdk::Client::new(keys.clone());
let cfg = sample_cfg(1, false);
run_once(&conn, &client, &keys, &[], &cfg).await.unwrap();
let count: i64 = {
let c = conn.lock().await;
c.query_row("SELECT COUNT(*) FROM escalation_dispatches", [], |r| {
r.get(0)
})
.unwrap()
};
assert_eq!(count, 0);
}
#[tokio::test]
async fn handoff_with_no_write_solvers_and_fallback_off_stays_unconsumed() {
let conn = fresh_conn().await;
let keys = Keys::generate();
let client = nostr_sdk::Client::new(keys.clone());
let pkg = sample_package("d-us3");
seed_handoff_for_dispute(&conn, "d-us3", None, &pkg).await;
let solvers = vec![solver("pk-r1", SolverPermission::Read)];
let cfg = sample_cfg(1, false);
run_once(&conn, &client, &keys, &solvers, &cfg)
.await
.unwrap();
let dispatches: i64 = {
let c = conn.lock().await;
c.query_row("SELECT COUNT(*) FROM escalation_dispatches", [], |r| {
r.get(0)
})
.unwrap()
};
assert_eq!(
dispatches, 0,
"unroutable handoff must not create a dispatch row"
);
let events: i64 = {
let c = conn.lock().await;
c.query_row(
"SELECT COUNT(*) FROM mediation_events
WHERE kind IN ('escalation_dispatched',
'escalation_dispatch_unroutable')",
[],
|r| r.get(0),
)
.unwrap()
};
assert_eq!(
events, 0,
"T022 handler not live yet; no unroutable audit row should fire — \
handoff stays in the pending set for a future cycle to pick up"
);
}
}