use std::sync::Arc;
use tokio::sync::Mutex as AsyncMutex;
use tracing::{info, instrument, warn};
use crate::db::mediation_events;
use crate::error::{Error, Result};
use crate::mediation::{eligibility, session};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StartTrigger {
Detected,
TickRetry,
}
impl StartTrigger {
pub fn as_str(&self) -> &'static str {
match self {
StartTrigger::Detected => "detected",
StartTrigger::TickRetry => "tick_retry",
}
}
}
#[derive(Debug)]
pub enum StartOutcome {
NotEligible,
Started(session::OpenOutcome),
StoppedBeforeTake { reason: String },
TakeFailed { reason: String },
Error(Error),
}
pub struct StartParams<'a> {
pub open: session::OpenSessionParams<'a>,
pub trigger: StartTrigger,
}
#[instrument(skip_all, fields(
dispute_id = %params.open.dispute_id,
trigger = params.trigger.as_str(),
))]
pub async fn try_start_for(params: StartParams<'_>) -> StartOutcome {
let StartParams { open, trigger } = params;
let dispute_id = open.dispute_id.to_string();
let conn: Arc<AsyncMutex<rusqlite::Connection>> = Arc::clone(open.conn);
match run_eligibility(&conn, &dispute_id).await {
Ok(true) => {}
Ok(false) => {
write_stop(&conn, &dispute_id, "ineligible").await;
return StartOutcome::NotEligible;
}
Err(e) => {
warn!(error = %e, "try_start_for: eligibility check failed");
return StartOutcome::Error(e);
}
}
if let Err(e) = write_started(&conn, &dispute_id, trigger).await {
warn!(
error = %e,
"try_start_for: failed to write start_attempt_started; aborting attempt"
);
return StartOutcome::Error(e);
}
info!("try_start_for: start attempt recorded; delegating to open_session");
let open_result = session::open_session(open).await;
match open_result {
Ok(outcome) => match &outcome {
session::OpenOutcome::RefusedAuthPending { reason } => {
let reason_str = reason.clone();
write_stop(&conn, &dispute_id, "auth_pending").await;
StartOutcome::StoppedBeforeTake { reason: reason_str }
}
session::OpenOutcome::RefusedAuthTerminated { reason } => {
let reason_str = reason.clone();
write_stop(&conn, &dispute_id, "auth_terminated").await;
StartOutcome::StoppedBeforeTake { reason: reason_str }
}
session::OpenOutcome::RefusedReasoningUnavailable { reason } => {
let reason_str = reason.clone();
write_stop(&conn, &dispute_id, "reasoning_unhealthy").await;
StartOutcome::StoppedBeforeTake { reason: reason_str }
}
session::OpenOutcome::Opened { .. }
| session::OpenOutcome::AlreadyOpen { .. }
| session::OpenOutcome::ReadyForSummary { .. }
| session::OpenOutcome::EscalatedBeforeTake { .. } => StartOutcome::Started(outcome),
},
Err(e) => {
warn!(error = %e, "try_start_for: open_session returned error");
StartOutcome::Error(e)
}
}
}
async fn run_eligibility(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
dispute_id: &str,
) -> Result<bool> {
let guard = conn.lock().await;
eligibility::is_mediation_eligible(&guard, dispute_id)
}
async fn write_started(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
dispute_id: &str,
trigger: StartTrigger,
) -> Result<()> {
let now = super::current_ts_secs()?;
let guard = conn.lock().await;
mediation_events::record_start_attempt_started(&guard, None, dispute_id, trigger.as_str(), now)
.map(|_| ())
}
async fn write_stop(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
dispute_id: &str,
stop_reason: &str,
) {
let now = match super::current_ts_secs() {
Ok(ts) => ts,
Err(e) => {
warn!(error = %e, "try_start_for: clock error; skipping stop audit write");
return;
}
};
let guard = conn.lock().await;
if let Err(e) =
mediation_events::record_start_attempt_stopped(&guard, None, dispute_id, stop_reason, now)
{
warn!(
error = %e,
stop_reason,
"try_start_for: failed to write start_attempt_stopped"
);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::migrations::run_migrations;
use crate::db::open_in_memory;
use crate::models::LifecycleState;
use rusqlite::params;
fn ineligible_fixture() -> rusqlite::Connection {
let mut conn = open_in_memory().unwrap();
run_migrations(&mut conn).unwrap();
conn.execute(
"INSERT INTO disputes (
dispute_id, event_id, mostro_pubkey, initiator_role,
dispute_status, event_timestamp, detected_at, lifecycle_state
) VALUES ('d-resolved', 'e1', 'm1', 'buyer',
'initiated', 1, 2, ?1)",
params![LifecycleState::Resolved.to_string()],
)
.unwrap();
conn
}
#[test]
fn start_trigger_strings_match_data_model() {
assert_eq!(StartTrigger::Detected.as_str(), "detected");
assert_eq!(StartTrigger::TickRetry.as_str(), "tick_retry");
}
#[test]
fn ineligible_dispute_records_dispute_scoped_stop_row() {
let conn = ineligible_fixture();
assert!(!eligibility::is_mediation_eligible(&conn, "d-resolved").unwrap());
mediation_events::record_start_attempt_stopped(&conn, None, "d-resolved", "ineligible", 42)
.unwrap();
let (sid, payload): (Option<String>, String) = conn
.query_row(
"SELECT session_id, payload_json
FROM mediation_events
WHERE kind = 'start_attempt_stopped'
ORDER BY id DESC LIMIT 1",
[],
|r| Ok((r.get(0)?, r.get(1)?)),
)
.unwrap();
assert!(sid.is_none(), "ineligible stop row must be dispute-scoped");
let parsed: serde_json::Value = serde_json::from_str(&payload).unwrap();
assert_eq!(parsed["dispute_id"], "d-resolved");
assert_eq!(parsed["stop_reason"], "ineligible");
}
}