use std::fmt;
use std::str::FromStr;
use rusqlite::{params, Connection, OptionalExtension, Transaction};
use crate::error::{Error, Result};
/// Status recorded on an escalation dispatch row.
///
/// The value is persisted as a lowercase text token; the `Display` and
/// `FromStr` implementations in this file define the token for each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DispatchStatus {
/// Stored as the token `"dispatched"`.
Dispatched,
/// Stored as the token `"send_failed"`.
SendFailed,
}
impl fmt::Display for DispatchStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match self {
DispatchStatus::Dispatched => "dispatched",
DispatchStatus::SendFailed => "send_failed",
};
f.write_str(s)
}
}
impl FromStr for DispatchStatus {
type Err = Error;
fn from_str(s: &str) -> Result<Self> {
match s {
"dispatched" => Ok(DispatchStatus::Dispatched),
"send_failed" => Ok(DispatchStatus::SendFailed),
other => Err(Error::InvalidEvent(format!(
"unknown dispatch status: {other}"
))),
}
}
}
/// In-memory form of one row of the `escalation_dispatches` table.
///
/// Written by `insert_dispatch` and read back by
/// `find_dispatch_by_handoff_event_id`; field names match the column names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EscalationDispatch {
/// Identifier of this dispatch row.
pub dispatch_id: String,
/// Identifier of the dispute the dispatch belongs to.
pub dispute_id: String,
/// Optional session identifier; stored as SQL `NULL` when `None`.
pub session_id: Option<String>,
/// Id of the `mediation_events` row this dispatch refers to
/// (`list_pending_handoffs` joins on `handoff_event_id = mediation_events.id`).
pub handoff_event_id: i64,
/// Target of the dispatch, stored as free text.
/// NOTE(review): the tests store a comma-separated list of keys here when
/// `fallback_broadcast` is true — confirm the intended format.
pub target_solver: String,
/// Dispatch timestamp as an integer. NOTE(review): unit not visible here — confirm.
pub dispatched_at: i64,
/// Row creation timestamp as an integer. NOTE(review): unit not visible here — confirm.
pub created_at: i64,
/// Dispatch status; persisted as its `Display` token.
pub status: DispatchStatus,
/// Fallback-broadcast flag; persisted as an integer (`0`/`1`).
pub fallback_broadcast: bool,
}
/// Writes one escalation dispatch row using the caller's transaction.
///
/// The status is stored as its `Display` token and `fallback_broadcast` as an
/// integer (`0`/`1`). This function does not commit; that is left to the
/// owner of `tx`.
///
/// # Errors
///
/// Propagates any failure reported while executing the `INSERT`.
pub fn insert_dispatch(tx: &Transaction<'_>, row: &EscalationDispatch) -> Result<()> {
    // Convert the two non-primitive columns up front so the parameter list
    // below reads as a plain column-by-column mapping.
    let status_token = row.status.to_string();
    let fallback_flag = i64::from(row.fallback_broadcast);

    tx.execute(
        "INSERT INTO escalation_dispatches (
dispatch_id, dispute_id, session_id, handoff_event_id,
target_solver, dispatched_at, created_at, status, fallback_broadcast
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
        params![
            row.dispatch_id,
            row.dispute_id,
            row.session_id,
            row.handoff_event_id,
            row.target_solver,
            row.dispatched_at,
            row.created_at,
            status_token,
            fallback_flag,
        ],
    )?;

    Ok(())
}
pub fn find_dispatch_by_handoff_event_id(
conn: &Connection,
handoff_event_id: i64,
) -> Result<Option<EscalationDispatch>> {
let tuple = conn
.query_row(
"SELECT dispatch_id, dispute_id, session_id, handoff_event_id,
target_solver, dispatched_at, created_at, status, fallback_broadcast
FROM escalation_dispatches
WHERE handoff_event_id = ?1
LIMIT 1",
params![handoff_event_id],
|r| {
let status_s: String = r.get(7)?;
Ok((
r.get::<_, String>(0)?,
r.get::<_, String>(1)?,
r.get::<_, Option<String>>(2)?,
r.get::<_, i64>(3)?,
r.get::<_, String>(4)?,
r.get::<_, i64>(5)?,
r.get::<_, i64>(6)?,
status_s,
r.get::<_, i64>(8)? != 0,
))
},
)
.optional()?;
let Some(tuple) = tuple else {
return Ok(None);
};
let status = DispatchStatus::from_str(&tuple.7)?;
Ok(Some(EscalationDispatch {
dispatch_id: tuple.0,
dispute_id: tuple.1,
session_id: tuple.2,
handoff_event_id: tuple.3,
target_solver: tuple.4,
dispatched_at: tuple.5,
created_at: tuple.6,
status,
fallback_broadcast: tuple.8,
}))
}
/// A `handoff_prepared` mediation event returned by `list_pending_handoffs`.
///
/// Every field is copied from the matching `mediation_events` column.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingHandoff {
/// `mediation_events.id` of the handoff event.
pub handoff_event_id: i64,
/// `mediation_events.session_id`; `None` when the column is `NULL`.
pub session_id: Option<String>,
/// Raw `payload_json` text, returned as stored (not parsed or validated here).
pub payload_json: String,
/// `mediation_events.prompt_bundle_id`, if set.
pub prompt_bundle_id: Option<String>,
/// `mediation_events.policy_hash`, if set.
pub policy_hash: Option<String>,
/// `mediation_events.occurred_at` as an integer.
/// NOTE(review): unit not visible here — confirm.
pub occurred_at: i64,
}
/// Lists `handoff_prepared` events that still have no dispatch outcome.
///
/// An event is returned when both of the following hold:
/// * no `escalation_dispatches` row references it, and
/// * no `escalation_dispatch_parse_failed` event carries its id in the
///   `$.handoff_event_id` field of its JSON payload.
///
/// Results are ordered by ascending event id. `limit` values below 1 are
/// raised to 1, so the query always asks for at least one row.
///
/// # Errors
///
/// Propagates failures from preparing the statement, running it, or decoding
/// a row.
pub fn list_pending_handoffs(conn: &Connection, limit: i64) -> Result<Vec<PendingHandoff>> {
    let effective_limit = limit.max(1);

    let mut stmt = conn.prepare(
        "SELECT me.id, me.session_id, me.payload_json,
me.prompt_bundle_id, me.policy_hash, me.occurred_at
FROM mediation_events me
LEFT JOIN escalation_dispatches d
ON d.handoff_event_id = me.id
WHERE me.kind = 'handoff_prepared'
AND d.dispatch_id IS NULL
AND NOT EXISTS (
SELECT 1 FROM mediation_events e2
WHERE e2.kind = 'escalation_dispatch_parse_failed'
AND e2.id <> me.id
AND json_extract(e2.payload_json, '$.handoff_event_id') = me.id
)
ORDER BY me.id ASC
LIMIT ?1",
    )?;

    let mapped = stmt.query_map(params![effective_limit], |row| {
        Ok(PendingHandoff {
            handoff_event_id: row.get::<_, i64>(0)?,
            session_id: row.get::<_, Option<String>>(1)?,
            payload_json: row.get::<_, String>(2)?,
            prompt_bundle_id: row.get::<_, Option<String>>(3)?,
            policy_hash: row.get::<_, Option<String>>(4)?,
            occurred_at: row.get::<_, i64>(5)?,
        })
    })?;

    // Stop at the first row that fails to decode instead of returning a
    // partial batch.
    let mut pending = Vec::new();
    for item in mapped {
        pending.push(item?);
    }
    Ok(pending)
}
// Unit tests for the dispatch status token round-trip, dispatch row
// insert/lookup, and the pending-handoff listing query. Each database test
// opens a fresh in-memory connection.
#[cfg(test)]
mod tests {
use super::*;
use crate::db::migrations::run_migrations;
use crate::db::open_in_memory;
/// Seeds dispute `d-1`, mediation session `sess-1` for that dispute, and one
/// `handoff_prepared` event (with a NULL `session_id`); returns the event id.
fn seed_parent_rows(conn: &Connection) -> i64 {
conn.execute(
"INSERT INTO disputes (
dispute_id, event_id, mostro_pubkey, initiator_role,
dispute_status, event_timestamp, detected_at, lifecycle_state
) VALUES ('d-1', 'evt-1', 'mostro', 'buyer',
'initiated', 10, 11, 'notified')",
[],
)
.unwrap();
conn.execute(
"INSERT INTO mediation_sessions (
session_id, dispute_id, state, round_count,
prompt_bundle_id, policy_hash,
started_at, last_transition_at
) VALUES ('sess-1', 'd-1', 'escalation_recommended', 0,
'phase3-default', 'test-hash', 100, 100)",
[],
)
.unwrap();
// `RETURNING id` hands back the id of the newly inserted event row.
conn.query_row(
"INSERT INTO mediation_events (session_id, kind, payload_json, occurred_at)
VALUES (NULL, 'handoff_prepared', '{\"dispute_id\":\"d-1\"}', 100)
RETURNING id",
[],
|r| r.get::<_, i64>(0),
)
.unwrap()
}
/// Each token must parse to its variant and display back as the same token.
#[test]
fn status_roundtrips_display_and_from_str() {
for (token, variant) in [
("dispatched", DispatchStatus::Dispatched),
("send_failed", DispatchStatus::SendFailed),
] {
let parsed: DispatchStatus = token.parse().unwrap();
assert_eq!(parsed, variant);
assert_eq!(parsed.to_string(), token);
}
}
/// An unknown token must yield `Error::InvalidEvent` whose message includes
/// the token.
#[test]
fn status_from_str_rejects_unknown_token() {
let err = DispatchStatus::from_str("bogus").unwrap_err();
match err {
Error::InvalidEvent(msg) => {
assert!(
msg.contains("bogus"),
"error message should include the bad token: {msg}"
);
}
other => panic!("expected InvalidEvent, got {other:?}"),
}
}
/// A `Dispatched` row with no session id must read back equal to what was
/// inserted.
#[test]
fn insert_and_lookup_dispatched_row() {
let mut conn = open_in_memory().unwrap();
run_migrations(&mut conn).unwrap();
let handoff_event_id = seed_parent_rows(&conn);
let row = EscalationDispatch {
dispatch_id: "dispatch-a".to_string(),
dispute_id: "d-1".to_string(),
session_id: None,
handoff_event_id,
target_solver: "solver-pk".to_string(),
dispatched_at: 200,
created_at: 200,
status: DispatchStatus::Dispatched,
fallback_broadcast: false,
};
let tx = conn.transaction().unwrap();
insert_dispatch(&tx, &row).unwrap();
tx.commit().unwrap();
let got = find_dispatch_by_handoff_event_id(&conn, handoff_event_id)
.unwrap()
.expect("row must exist after insert");
assert_eq!(got, row);
}
/// A `SendFailed` row with a session id and `fallback_broadcast = true` must
/// read back equal to what was inserted.
#[test]
fn insert_and_lookup_send_failed_row_with_broadcast() {
let mut conn = open_in_memory().unwrap();
run_migrations(&mut conn).unwrap();
let handoff_event_id = seed_parent_rows(&conn);
let row = EscalationDispatch {
dispatch_id: "dispatch-b".to_string(),
dispute_id: "d-1".to_string(),
session_id: Some("sess-1".to_string()),
handoff_event_id,
target_solver: "pk-1,pk-2,pk-3".to_string(),
dispatched_at: 300,
created_at: 300,
status: DispatchStatus::SendFailed,
fallback_broadcast: true,
};
let tx = conn.transaction().unwrap();
insert_dispatch(&tx, &row).unwrap();
tx.commit().unwrap();
let got = find_dispatch_by_handoff_event_id(&conn, handoff_event_id)
.unwrap()
.expect("row must exist after insert");
assert_eq!(got, row);
// Assert the flag and status separately as well as via the struct equality.
assert!(got.fallback_broadcast);
assert_eq!(got.status, DispatchStatus::SendFailed);
}
/// With the schema migrated but no dispatch row inserted, the lookup must
/// return `Ok(None)`.
#[test]
fn lookup_returns_none_when_no_row_exists() {
let mut conn = open_in_memory().unwrap();
run_migrations(&mut conn).unwrap();
let handoff_event_id = seed_parent_rows(&conn);
let got = find_dispatch_by_handoff_event_id(&conn, handoff_event_id).unwrap();
assert!(got.is_none(), "no row should exist before insert");
}
/// Migrations are deliberately not run, so the table is missing and the
/// lookup must return `Err` rather than `Ok(None)`.
#[test]
fn lookup_propagates_db_errors_instead_of_swallowing_them() {
let conn = open_in_memory().unwrap();
let err = find_dispatch_by_handoff_event_id(&conn, 42)
.expect_err("missing table must surface as Err, not Ok(None)");
// Two error shapes are accepted.
// NOTE(review): presumably which one rusqlite produces depends on its
// version or enabled features — confirm.
match err {
Error::Db(rusqlite::Error::SqliteFailure(_, Some(msg))) => {
assert!(
msg.contains("no such table"),
"expected 'no such table' SQLite failure; got {msg}"
);
}
Error::Db(rusqlite::Error::SqlInputError { msg, .. }) => {
assert!(
msg.contains("no such table"),
"expected 'no such table' in SQL input error; got {msg}"
);
}
other => panic!(
"expected Error::Db(SqliteFailure | SqlInputError) for missing table, got {other:?}"
),
}
}
/// Inserts a `handoff_prepared` event with the given payload, a NULL
/// `session_id`, fixed `prompt_bundle_id`/`policy_hash` and `occurred_at = 100`,
/// and returns its id. `_dispute_id` is not used by the insert.
fn seed_handoff_event(conn: &Connection, _dispute_id: &str, payload: &str) -> i64 {
conn.query_row(
"INSERT INTO mediation_events (
session_id, kind, payload_json, prompt_bundle_id, policy_hash, occurred_at
) VALUES (NULL, 'handoff_prepared', ?1, 'phase3-default', 'hash-1', ?2)
RETURNING id",
params![payload, 100],
|r| r.get::<_, i64>(0),
)
.unwrap()
}
/// Inserts a dispute row with the given id and an `event_id` of `evt-<dispute_id>`.
/// NOTE(review): presumably needed so later dispatch inserts satisfy a
/// foreign key on `dispute_id` — confirm against the migration schema.
fn seed_dispute_row(conn: &Connection, dispute_id: &str) {
conn.execute(
"INSERT INTO disputes (
dispute_id, event_id, mostro_pubkey, initiator_role,
dispute_status, event_timestamp, detected_at, lifecycle_state
) VALUES (?1, ?2, 'mostro', 'buyer', 'initiated', 10, 11, 'notified')",
params![dispute_id, format!("evt-{dispute_id}")],
)
.unwrap();
}
/// Events of any kind other than `handoff_prepared` must not be listed.
#[test]
fn list_pending_handoffs_returns_empty_when_no_handoff_events_exist() {
let mut conn = open_in_memory().unwrap();
run_migrations(&mut conn).unwrap();
seed_dispute_row(&conn, "d-nohand");
// The only event present has kind `reasoning_verdict`.
conn.execute(
"INSERT INTO mediation_events (session_id, kind, payload_json, occurred_at)
VALUES (NULL, 'reasoning_verdict', '{}', 100)",
[],
)
.unwrap();
let pending = list_pending_handoffs(&conn, 100).unwrap();
assert!(
pending.is_empty(),
"only handoff_prepared rows should come back; got {pending:?}"
);
}
/// Pending handoffs must come back in ascending id order, with the payload
/// and prompt bundle id carried through.
#[test]
fn list_pending_handoffs_returns_ascending_by_id() {
let mut conn = open_in_memory().unwrap();
run_migrations(&mut conn).unwrap();
seed_dispute_row(&conn, "d-1");
seed_dispute_row(&conn, "d-2");
seed_dispute_row(&conn, "d-3");
let id1 = seed_handoff_event(&conn, "d-1", "{\"dispute_id\":\"d-1\"}");
let id2 = seed_handoff_event(&conn, "d-2", "{\"dispute_id\":\"d-2\"}");
let id3 = seed_handoff_event(&conn, "d-3", "{\"dispute_id\":\"d-3\"}");
let pending = list_pending_handoffs(&conn, 100).unwrap();
let ids: Vec<i64> = pending.iter().map(|p| p.handoff_event_id).collect();
assert_eq!(
ids,
vec![id1, id2, id3],
"ascending id order required so the dispatcher processes oldest first"
);
assert!(pending[0].payload_json.contains("d-1"));
assert_eq!(
pending[0].prompt_bundle_id.as_deref(),
Some("phase3-default"),
"prompt bundle pin must flow through so Phase 4 audit rows can copy it"
);
}
/// A handoff that already has a dispatch row must be excluded from the list.
#[test]
fn list_pending_handoffs_filters_already_dispatched() {
let mut conn = open_in_memory().unwrap();
run_migrations(&mut conn).unwrap();
seed_dispute_row(&conn, "d-consumed");
seed_dispute_row(&conn, "d-fresh");
let consumed_id =
seed_handoff_event(&conn, "d-consumed", "{\"dispute_id\":\"d-consumed\"}");
let fresh_id = seed_handoff_event(&conn, "d-fresh", "{\"dispute_id\":\"d-fresh\"}");
// Record a dispatch for the first handoff only.
let tx = conn.transaction().unwrap();
insert_dispatch(
&tx,
&EscalationDispatch {
dispatch_id: "dispatch-consumed".to_string(),
dispute_id: "d-consumed".to_string(),
session_id: None,
handoff_event_id: consumed_id,
target_solver: "solver-pk".to_string(),
dispatched_at: 200,
created_at: 200,
status: DispatchStatus::Dispatched,
fallback_broadcast: false,
},
)
.unwrap();
tx.commit().unwrap();
let pending = list_pending_handoffs(&conn, 100).unwrap();
let ids: Vec<i64> = pending.iter().map(|p| p.handoff_event_id).collect();
assert_eq!(
ids,
vec![fresh_id],
"already-dispatched handoff must be filtered out; only the fresh one remains"
);
}
/// A handoff referenced by an `escalation_dispatch_parse_failed` event must
/// be excluded from the list.
#[test]
fn list_pending_handoffs_filters_parse_failed() {
let mut conn = open_in_memory().unwrap();
run_migrations(&mut conn).unwrap();
seed_dispute_row(&conn, "d-malformed");
seed_dispute_row(&conn, "d-fresh");
// The first handoff's own payload is not valid JSON.
let malformed_id = seed_handoff_event(&conn, "d-malformed", "not valid json");
let fresh_id = seed_handoff_event(&conn, "d-fresh", "{\"dispute_id\":\"d-fresh\"}");
// The failure event's payload carries the malformed handoff's id under
// `handoff_event_id`, which is what the listing query matches on.
conn.execute(
"INSERT INTO mediation_events (
session_id, kind, payload_json, occurred_at
) VALUES (NULL, 'escalation_dispatch_parse_failed',
?1, 200)",
params![format!(
r#"{{"dispute_id":"d-malformed","handoff_event_id":{malformed_id},"reason":"deserialize_failed","detail":"bad"}}"#
)],
)
.unwrap();
let pending = list_pending_handoffs(&conn, 100).unwrap();
let ids: Vec<i64> = pending.iter().map(|p| p.handoff_event_id).collect();
assert_eq!(
ids,
vec![fresh_id],
"parse-failed handoff must not re-surface; only the fresh one remains"
);
}
/// With three pending handoffs, `limit = 2` must return exactly two rows.
#[test]
fn list_pending_handoffs_respects_limit() {
let mut conn = open_in_memory().unwrap();
run_migrations(&mut conn).unwrap();
seed_dispute_row(&conn, "d-a");
seed_dispute_row(&conn, "d-b");
seed_dispute_row(&conn, "d-c");
seed_handoff_event(&conn, "d-a", "{\"dispute_id\":\"d-a\"}");
seed_handoff_event(&conn, "d-b", "{\"dispute_id\":\"d-b\"}");
seed_handoff_event(&conn, "d-c", "{\"dispute_id\":\"d-c\"}");
let pending = list_pending_handoffs(&conn, 2).unwrap();
assert_eq!(
pending.len(),
2,
"limit=2 must cap the batch so a restart backlog cannot starve other tasks"
);
}
/// Zero and negative limits must be treated as a limit of 1.
#[test]
fn list_pending_handoffs_clamps_nonpositive_limit() {
let mut conn = open_in_memory().unwrap();
run_migrations(&mut conn).unwrap();
seed_dispute_row(&conn, "d-a");
seed_dispute_row(&conn, "d-b");
seed_dispute_row(&conn, "d-c");
seed_handoff_event(&conn, "d-a", "{\"dispute_id\":\"d-a\"}");
seed_handoff_event(&conn, "d-b", "{\"dispute_id\":\"d-b\"}");
seed_handoff_event(&conn, "d-c", "{\"dispute_id\":\"d-c\"}");
// Three rows are pending, so a result of length 1 shows the clamp applied.
for bogus_limit in [0_i64, -1, i64::MIN] {
let pending = list_pending_handoffs(&conn, bogus_limit).unwrap();
assert_eq!(
pending.len(),
1,
"limit={bogus_limit} must clamp to 1 (got {} rows)",
pending.len()
);
}
}
}