use std::sync::Arc;
use rusqlite::params;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex as AsyncMutex;
use tracing::{debug, info, instrument, warn};
use crate::db;
use crate::db::mediation_events::MediationEventKind;
use crate::error::{Error, Result};
use crate::models::mediation::EscalationTrigger;
#[cfg(test)]
use crate::models::mediation::MediationSessionState;
/// Session state tags from which a session may be moved to
/// `escalation_recommended`.
///
/// `recommend` binds these values into the `state IN (...)` clause of its
/// guarded `UPDATE`, so a session in any other state is left untouched. The
/// unit tests compare this list against
/// `MediationSessionState::can_transition_to` to catch drift between the SQL
/// whitelist and the Rust transition rules.
const ESCALATABLE_STATES: &[&str] = &[
    "opening", "awaiting_response", "classified", "follow_up_pending", "summary_pending",
];
/// Self-contained description of an escalation, assembled by `recommend`.
///
/// `recommend` serializes this struct to JSON and stores it as the payload of
/// the `HandoffPrepared` mediation event. Field names are therefore part of
/// the persisted format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HandoffPackage {
/// Identifier of the dispute being escalated.
pub dispute_id: String,
/// Mediation session the escalation belongs to. `None` for a dispute-scoped
/// escalation; in that case the key is omitted from the serialized JSON.
#[serde(skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
/// `to_string()` rendering of the `EscalationTrigger` that caused the
/// escalation.
pub trigger: String,
/// Caller-supplied evidence references, copied verbatim from
/// `RecommendParams::evidence_refs`.
pub evidence_refs: Vec<String>,
/// Prompt bundle identifier supplied by the caller of `recommend`.
pub prompt_bundle_id: String,
/// Policy hash supplied by the caller of `recommend`.
pub policy_hash: String,
/// Caller-supplied rationale references, copied verbatim from
/// `RecommendParams::rationale_refs`.
pub rationale_refs: Vec<String>,
/// Timestamp taken once at the start of `recommend` and shared with both
/// recorded events. NOTE(review): presumably Unix seconds, judging by the
/// `current_ts_secs` helper name — confirm against that helper.
pub assembled_at: i64,
}
/// Arguments for `recommend`, bundled into one struct so call sites name
/// every field explicitly.
pub struct RecommendParams<'a> {
/// Shared SQLite connection. `recommend` locks the mutex once and holds it
/// until its transaction has been committed.
pub conn: &'a Arc<AsyncMutex<rusqlite::Connection>>,
/// `Some(id)` for a session-scoped escalation (the session row is validated
/// and transitioned); `None` for a dispute-scoped escalation that touches no
/// session row.
pub session_id: Option<&'a str>,
/// Dispute being escalated. When `session_id` is set, this must equal the
/// `dispute_id` stored on that session row.
pub dispute_id: &'a str,
/// Reason for the escalation; written to the event payloads via its
/// `to_string()` form.
pub trigger: EscalationTrigger,
/// Evidence references copied verbatim into both event payloads.
pub evidence_refs: Vec<String>,
/// Rationale references copied verbatim into both event payloads.
pub rationale_refs: Vec<String>,
/// Prompt bundle identifier passed to `record_event` for both events and
/// stored in the handoff package.
pub prompt_bundle_id: &'a str,
/// Policy hash passed to `record_event` for both events and stored in the
/// handoff package.
pub policy_hash: &'a str,
}
#[instrument(skip_all, fields(
session_id = params.session_id.unwrap_or("<none>"),
dispute_id = %params.dispute_id,
trigger = %params.trigger,
))]
pub async fn recommend(params: RecommendParams<'_>) -> Result<()> {
let RecommendParams {
conn,
session_id,
dispute_id,
trigger,
evidence_refs,
rationale_refs,
prompt_bundle_id,
policy_hash,
} = params;
let now = super::current_ts_secs()?;
let mut guard = conn.lock().await;
if let Some(sid) = session_id {
let stored: String = match guard.query_row(
"SELECT dispute_id FROM mediation_sessions WHERE session_id = ?1",
params![sid],
|r| r.get::<_, String>(0),
) {
Ok(s) => s,
Err(rusqlite::Error::QueryReturnedNoRows) => {
return Err(Error::InvalidEvent(format!(
"escalation::recommend: no mediation_sessions row for session_id={sid}"
)));
}
Err(e) => return Err(Error::Db(e)),
};
if stored != dispute_id {
return Err(Error::InvalidEvent(format!(
"escalation::recommend: session_id={sid} stored dispute_id={stored} \
does not match caller-supplied dispute_id={dispute_id}"
)));
}
}
let tx = guard.transaction()?;
if let Some(sid) = session_id {
let placeholders = (0..ESCALATABLE_STATES.len())
.map(|i| format!("?{}", i + 3))
.collect::<Vec<_>>()
.join(", ");
let update_sql = format!(
"UPDATE mediation_sessions
SET state = 'escalation_recommended', last_transition_at = ?1
WHERE session_id = ?2 AND state IN ({placeholders})"
);
let mut sql_params: Vec<&dyn rusqlite::ToSql> =
Vec::with_capacity(2 + ESCALATABLE_STATES.len());
sql_params.push(&now);
sql_params.push(&sid);
for s in ESCALATABLE_STATES {
sql_params.push(s);
}
let rows = tx.execute(&update_sql, sql_params.as_slice())?;
if rows == 0 {
let actual: String = tx
.query_row(
"SELECT state FROM mediation_sessions WHERE session_id = ?1",
params![sid],
|r| r.get::<_, String>(0),
)
.unwrap_or_else(|_| "<session row missing>".to_string());
return Err(Error::InvalidStateTransition {
from: actual,
to: "escalation_recommended".to_string(),
});
}
} else {
let already_present: bool = tx.query_row(
"SELECT EXISTS(
SELECT 1 FROM mediation_events
WHERE session_id IS NULL
AND kind = 'escalation_recommended'
AND json_extract(payload_json, '$.dispute_id') = ?1
)",
params![dispute_id],
|r| r.get::<_, bool>(0),
)?;
if already_present {
debug!(
dispute_id = %dispute_id,
trigger = %trigger,
"escalation::recommend: dispute-scoped handoff already present; \
skipping duplicate event writes"
);
tx.commit()?;
return Ok(());
}
}
let escalation_payload = serde_json::json!({
"dispute_id": dispute_id,
"trigger": trigger.to_string(),
"evidence_refs": evidence_refs,
"rationale_refs": rationale_refs,
})
.to_string();
db::mediation_events::record_event(
&tx,
MediationEventKind::EscalationRecommended,
session_id,
&escalation_payload,
None,
Some(prompt_bundle_id),
Some(policy_hash),
now,
)?;
let package = HandoffPackage {
dispute_id: dispute_id.to_string(),
session_id: session_id.map(|s| s.to_string()),
trigger: trigger.to_string(),
evidence_refs,
prompt_bundle_id: prompt_bundle_id.to_string(),
policy_hash: policy_hash.to_string(),
rationale_refs,
assembled_at: now,
};
let handoff_payload = serde_json::to_string(&package).map_err(|e| {
Error::InvalidEvent(format!(
"escalation::recommend: failed to serialize HandoffPackage: {e}"
))
})?;
db::mediation_events::record_event(
&tx,
MediationEventKind::HandoffPrepared,
session_id,
&handoff_payload,
None,
Some(prompt_bundle_id),
Some(policy_hash),
now,
)?;
tx.commit()?;
drop(guard);
warn!(
session_id = session_id.unwrap_or("<none>"),
dispute_id = %dispute_id,
trigger = %trigger,
"escalation_recommended"
);
info!(
session_id = session_id.unwrap_or("<none>"),
dispute_id = %dispute_id,
"handoff_prepared"
);
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::migrations::run_migrations;
    use crate::db::open_in_memory;

    /// For each state listed below, the SQL whitelist and
    /// `can_transition_to(EscalationRecommended)` must agree.
    #[test]
    fn escalatable_states_match_can_transition_to() {
        let listed_states = [
            MediationSessionState::Opening,
            MediationSessionState::AwaitingResponse,
            MediationSessionState::Classified,
            MediationSessionState::FollowUpPending,
            MediationSessionState::SummaryPending,
            MediationSessionState::SummaryDelivered,
            MediationSessionState::EscalationRecommended,
            MediationSessionState::SupersededByHuman,
            MediationSessionState::Closed,
        ];
        for state in listed_states {
            let tag = state.to_string();
            let sql_allows = ESCALATABLE_STATES.contains(&tag.as_str());
            let rust_allows =
                state.can_transition_to(MediationSessionState::EscalationRecommended);
            assert_eq!(
                sql_allows, rust_allows,
                "drift for {tag}: SQL whitelist says {sql_allows}, \
                 can_transition_to says {rust_allows}"
            );
        }
    }

    /// In-memory database with migrations applied, seeded with dispute
    /// `d-esc` and session `sess-esc` in state `awaiting_response`.
    fn fresh_conn() -> Arc<AsyncMutex<rusqlite::Connection>> {
        let mut db = open_in_memory().unwrap();
        run_migrations(&mut db).unwrap();

        let seed_dispute = "INSERT INTO disputes (\n\
             dispute_id, event_id, mostro_pubkey, initiator_role,\n\
             dispute_status, event_timestamp, detected_at, lifecycle_state\n\
             ) VALUES ('d-esc', 'e1', 'm1', 'buyer',\n\
             'initiated', 1, 2, 'notified')";
        let seed_session = "INSERT INTO mediation_sessions (\n\
             session_id, dispute_id, state, round_count,\n\
             prompt_bundle_id, policy_hash,\n\
             started_at, last_transition_at\n\
             ) VALUES ('sess-esc', 'd-esc', 'awaiting_response', 0,\n\
             'phase3-default', 'test-policy-hash',\n\
             100, 100)";
        // The dispute row is inserted before the session that refers to it.
        for statement in [seed_dispute, seed_session] {
            db.execute(statement, []).unwrap();
        }

        Arc::new(AsyncMutex::new(db))
    }

    /// Number of events of `kind` recorded for the fixture session.
    async fn count_events(conn: &Arc<AsyncMutex<rusqlite::Connection>>, kind: &str) -> i64 {
        let guard = conn.lock().await;
        let total = guard.query_row(
            "SELECT COUNT(*) FROM mediation_events WHERE session_id = 'sess-esc' AND kind = ?1",
            params![kind],
            |row| row.get::<_, i64>(0),
        );
        total.unwrap()
    }

    /// Session-scoped parameters aimed at the rows created by `fresh_conn`.
    fn fixture_params<'a>(
        conn: &'a Arc<AsyncMutex<rusqlite::Connection>>,
        trigger: EscalationTrigger,
        evidence_refs: Vec<String>,
        rationale_refs: Vec<String>,
    ) -> RecommendParams<'a> {
        RecommendParams {
            conn,
            session_id: Some("sess-esc"),
            dispute_id: "d-esc",
            trigger,
            evidence_refs,
            rationale_refs,
            prompt_bundle_id: "phase3-default",
            policy_hash: "test-policy-hash",
        }
    }

    #[tokio::test]
    async fn second_call_on_same_session_is_rejected_without_duplicate_events() {
        let conn = fresh_conn();

        let first = fixture_params(
            &conn,
            EscalationTrigger::LowConfidence,
            Vec::new(),
            Vec::new(),
        );
        recommend(first).await.expect("first call must succeed");
        assert_eq!(count_events(&conn, "escalation_recommended").await, 1);
        assert_eq!(count_events(&conn, "handoff_prepared").await, 1);

        // The session is now in `escalation_recommended`, so a repeat must be
        // refused by the state guard.
        let second = fixture_params(&conn, EscalationTrigger::RoundLimit, Vec::new(), Vec::new());
        let rejection = recommend(second)
            .await
            .expect_err("second call on the same session must fail");
        match rejection {
            Error::InvalidStateTransition { from, to } => {
                assert_eq!(from, "escalation_recommended");
                assert_eq!(to, "escalation_recommended");
            }
            other => panic!("unexpected error: {other:?}"),
        }

        assert_eq!(
            count_events(&conn, "escalation_recommended").await,
            1,
            "no duplicate escalation_recommended event in release or debug"
        );
        assert_eq!(
            count_events(&conn, "handoff_prepared").await,
            1,
            "no duplicate handoff_prepared event in release or debug"
        );
    }

    #[tokio::test]
    async fn explicit_rationale_refs_are_preserved_verbatim() {
        let conn = fresh_conn();
        let event_id = "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad";
        let rationale_id = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";

        let request = fixture_params(
            &conn,
            EscalationTrigger::ConflictingClaims,
            vec![event_id.into()],
            vec![rationale_id.into()],
        );
        recommend(request).await.unwrap();

        // Read the stored handoff payload back; the lock is scoped to this block.
        let payload: String = {
            let guard = conn.lock().await;
            guard
                .query_row(
                    "SELECT payload_json FROM mediation_events\n\
                     WHERE session_id='sess-esc' AND kind='handoff_prepared'",
                    [],
                    |row| row.get(0),
                )
                .unwrap()
        };
        let stored: serde_json::Value = serde_json::from_str(&payload).unwrap();
        assert_eq!(stored["evidence_refs"][0], event_id);
        assert_eq!(stored["rationale_refs"][0], rationale_id);
    }
}