use std::str::FromStr;
use rusqlite::{params, Connection};
use crate::error::Result;
use crate::models::dispute::InitiatorRole;
const ELIGIBLE_LIFECYCLE_STATES_SQL: &str = "('new', 'notified', 'taken', 'waiting')";
pub fn is_eligible_lifecycle(state: crate::models::LifecycleState) -> bool {
use crate::models::LifecycleState::*;
match state {
New | Notified | Taken | Waiting => true,
Escalated | Resolved => false,
}
}
const INELIGIBLE_REASONS_SQL: &str = "\
EXISTS (\
SELECT 1 FROM mediation_sessions s \
WHERE s.dispute_id = ?1 \
AND s.state NOT IN ('closed', 'superseded_by_human')\
) \
OR EXISTS (\
SELECT 1 FROM mediation_events e \
WHERE e.session_id IS NULL \
AND e.kind = 'escalation_recommended' \
AND json_extract(e.payload_json, '$.dispute_id') = ?1\
)";
pub fn is_mediation_eligible(conn: &Connection, dispute_id: &str) -> Result<bool> {
let lifecycle_s: String = match conn.query_row(
"SELECT lifecycle_state FROM disputes WHERE dispute_id = ?1",
params![dispute_id],
|r| r.get::<_, String>(0),
) {
Ok(s) => s,
Err(rusqlite::Error::QueryReturnedNoRows) => {
return Ok(false);
}
Err(e) => return Err(e.into()),
};
let lifecycle = crate::models::LifecycleState::from_str(&lifecycle_s)?;
if !is_eligible_lifecycle(lifecycle) {
return Ok(false);
}
let ineligible: i64 = conn.query_row(
&format!("SELECT CASE WHEN {INELIGIBLE_REASONS_SQL} THEN 1 ELSE 0 END"),
params![dispute_id],
|r| r.get(0),
)?;
Ok(ineligible == 0)
}
pub struct EligibleDispute {
pub dispute_id: String,
pub initiator_role: InitiatorRole,
}
pub fn list_mediation_eligible(conn: &Connection) -> Result<Vec<EligibleDispute>> {
let batch_rejection = INELIGIBLE_REASONS_SQL.replace("?1", "d.dispute_id");
let sql = format!(
"SELECT dispute_id, initiator_role
FROM disputes d
WHERE d.lifecycle_state IN {lifecycle}
AND NOT ({rejection})
ORDER BY d.event_timestamp ASC",
lifecycle = ELIGIBLE_LIFECYCLE_STATES_SQL,
rejection = batch_rejection,
);
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map([], |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)))?;
let mut out = Vec::new();
for row in rows {
let (dispute_id, initiator_role_s) = row?;
match InitiatorRole::from_str(&initiator_role_s) {
Ok(initiator_role) => out.push(EligibleDispute {
dispute_id,
initiator_role,
}),
Err(e) => {
tracing::warn!(
dispute_id = %dispute_id,
role = %initiator_role_s,
error = %e,
"eligibility: skipping dispute with unrecognised initiator_role"
);
}
}
}
Ok(out)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::migrations::run_migrations;
use crate::db::open_in_memory;
use crate::models::LifecycleState;
fn fresh() -> Connection {
let mut conn = open_in_memory().unwrap();
run_migrations(&mut conn).unwrap();
conn
}
fn insert_dispute(conn: &Connection, dispute_id: &str, lifecycle: LifecycleState) {
conn.execute(
"INSERT INTO disputes (
dispute_id, event_id, mostro_pubkey, initiator_role,
dispute_status, event_timestamp, detected_at, lifecycle_state
) VALUES (?1, 'e', 'm', 'buyer', 'initiated', 1, 2, ?2)",
params![dispute_id, lifecycle.to_string()],
)
.unwrap();
}
fn insert_session(conn: &Connection, session_id: &str, dispute_id: &str, state: &str) {
conn.execute(
"INSERT INTO mediation_sessions (
session_id, dispute_id, state, round_count,
prompt_bundle_id, policy_hash,
started_at, last_transition_at
) VALUES (?1, ?2, ?3, 0, 'bundle', 'hash', 100, 100)",
params![session_id, dispute_id, state],
)
.unwrap();
}
#[test]
fn eligible_lifecycle_accepts_every_non_terminal_non_escalated() {
assert!(is_eligible_lifecycle(LifecycleState::New));
assert!(is_eligible_lifecycle(LifecycleState::Notified));
assert!(is_eligible_lifecycle(LifecycleState::Taken));
assert!(is_eligible_lifecycle(LifecycleState::Waiting));
}
#[test]
fn eligible_lifecycle_rejects_resolved_and_escalated() {
assert!(!is_eligible_lifecycle(LifecycleState::Resolved));
assert!(!is_eligible_lifecycle(LifecycleState::Escalated));
}
#[test]
fn unknown_dispute_is_not_eligible() {
let conn = fresh();
assert!(!is_mediation_eligible(&conn, "no-such-dispute").unwrap());
}
#[test]
fn notified_dispute_without_sessions_is_eligible() {
let conn = fresh();
insert_dispute(&conn, "d1", LifecycleState::Notified);
assert!(is_mediation_eligible(&conn, "d1").unwrap());
}
#[test]
fn taken_dispute_without_sessions_is_eligible() {
let conn = fresh();
insert_dispute(&conn, "d-taken", LifecycleState::Taken);
assert!(is_mediation_eligible(&conn, "d-taken").unwrap());
}
#[test]
fn waiting_dispute_is_eligible() {
let conn = fresh();
insert_dispute(&conn, "d-waiting", LifecycleState::Waiting);
assert!(is_mediation_eligible(&conn, "d-waiting").unwrap());
}
#[test]
fn resolved_dispute_is_not_eligible() {
let conn = fresh();
insert_dispute(&conn, "d-resolved", LifecycleState::Resolved);
assert!(!is_mediation_eligible(&conn, "d-resolved").unwrap());
}
#[test]
fn escalated_dispute_is_not_eligible() {
let conn = fresh();
insert_dispute(&conn, "d-esc", LifecycleState::Escalated);
assert!(!is_mediation_eligible(&conn, "d-esc").unwrap());
}
#[test]
fn active_session_blocks_eligibility() {
let conn = fresh();
insert_dispute(&conn, "d-active", LifecycleState::Notified);
insert_session(&conn, "s-active", "d-active", "awaiting_response");
assert!(!is_mediation_eligible(&conn, "d-active").unwrap());
}
#[test]
fn closed_session_does_not_block_eligibility() {
let conn = fresh();
insert_dispute(&conn, "d-closed", LifecycleState::Notified);
insert_session(&conn, "s-closed", "d-closed", "closed");
assert!(is_mediation_eligible(&conn, "d-closed").unwrap());
}
#[test]
fn escalation_recommended_session_blocks_eligibility() {
let conn = fresh();
insert_dispute(&conn, "d-escrec", LifecycleState::Notified);
insert_session(&conn, "s-escrec", "d-escrec", "escalation_recommended");
assert!(!is_mediation_eligible(&conn, "d-escrec").unwrap());
}
#[test]
fn dispute_scoped_escalation_event_blocks_eligibility() {
let conn = fresh();
insert_dispute(&conn, "d-disp-esc", LifecycleState::Notified);
conn.execute(
"INSERT INTO mediation_events (
session_id, kind, payload_json,
rationale_id, prompt_bundle_id, policy_hash, occurred_at
) VALUES (NULL, 'escalation_recommended',
'{\"dispute_id\":\"d-disp-esc\",\"trigger\":\"conflicting_claims\"}',
NULL, NULL, NULL, 500)",
[],
)
.unwrap();
assert!(!is_mediation_eligible(&conn, "d-disp-esc").unwrap());
}
#[test]
fn list_mediation_eligible_returns_only_eligible_in_event_timestamp_order() {
let conn = fresh();
conn.execute(
"INSERT INTO disputes (dispute_id, event_id, mostro_pubkey, initiator_role,
dispute_status, event_timestamp, detected_at, lifecycle_state)
VALUES ('d-old', 'e', 'm', 'buyer', 'initiated', 10, 11, 'notified')",
[],
)
.unwrap();
conn.execute(
"INSERT INTO disputes (dispute_id, event_id, mostro_pubkey, initiator_role,
dispute_status, event_timestamp, detected_at, lifecycle_state)
VALUES ('d-mid', 'e2', 'm', 'buyer', 'initiated', 20, 21, 'waiting')",
[],
)
.unwrap();
conn.execute(
"INSERT INTO disputes (dispute_id, event_id, mostro_pubkey, initiator_role,
dispute_status, event_timestamp, detected_at, lifecycle_state)
VALUES ('d-new', 'e3', 'm', 'buyer', 'initiated', 30, 31, 'resolved')",
[],
)
.unwrap();
let got = list_mediation_eligible(&conn).unwrap();
let ids: Vec<_> = got.into_iter().map(|e| e.dispute_id).collect();
assert_eq!(ids, vec!["d-old".to_string(), "d-mid".to_string()]);
}
}