use std::sync::Arc;
use nostr_sdk::prelude::*;
use rusqlite::params;
use tokio::sync::Mutex as AsyncMutex;
use tracing::{debug, info, instrument, warn};
use crate::db;
use crate::error::Result;
use crate::models::mediation::ClassificationLabel;
use crate::models::SolverConfig;
#[derive(Debug, Clone)]
pub struct FinalReportPayload {
pub dispute_id: String,
pub session_id: Option<String>,
pub classification: Option<(ClassificationLabel, f64)>,
pub outbound_party_messages_count: u8,
pub final_dispute_status: String,
pub narrative: String,
}
#[instrument(skip(conn), fields(dispute_id = %dispute_id))]
pub async fn has_any_mediation_context(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
dispute_id: &str,
) -> Result<bool> {
let guard = conn.lock().await;
let exists: bool = guard.query_row(
"SELECT EXISTS(
SELECT 1 FROM mediation_sessions WHERE dispute_id = ?1
UNION ALL
SELECT 1 FROM mediation_events
WHERE session_id IS NULL
AND json_extract(payload_json, '$.dispute_id') = ?1
)",
params![dispute_id],
|r| r.get::<_, bool>(0),
)?;
Ok(exists)
}
#[instrument(skip_all, fields(dispute_id = %dispute_id))]
async fn build_payload(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
dispute_id: &str,
final_dispute_status: &str,
) -> Result<FinalReportPayload> {
use std::str::FromStr;
let guard = conn.lock().await;
let session_id: Option<String> = guard
.query_row(
"SELECT session_id FROM mediation_sessions
WHERE dispute_id = ?1
ORDER BY started_at DESC
LIMIT 1",
params![dispute_id],
|r| r.get::<_, String>(0),
)
.ok();
let mut classification: Option<(ClassificationLabel, f64)> = None;
if let Some(sid) = session_id.as_deref() {
if let Ok(payload_json) = guard.query_row(
"SELECT payload_json FROM mediation_events
WHERE session_id = ?1 AND kind = 'classification_produced'
ORDER BY occurred_at DESC, id DESC
LIMIT 1",
params![sid],
|r| r.get::<_, String>(0),
) {
if let Ok(v) = serde_json::from_str::<serde_json::Value>(&payload_json) {
let label = v["classification"]
.as_str()
.and_then(|s| ClassificationLabel::from_str(s).ok());
let conf = v["confidence"].as_f64();
if let (Some(l), Some(c)) = (label, conf) {
classification = Some((l, c));
}
}
}
}
if classification.is_none() {
if let Ok(payload_json) = guard.query_row(
"SELECT payload_json FROM mediation_events
WHERE session_id IS NULL
AND kind = 'reasoning_verdict'
AND json_extract(payload_json, '$.dispute_id') = ?1
ORDER BY occurred_at DESC, id DESC
LIMIT 1",
params![dispute_id],
|r| r.get::<_, String>(0),
) {
if let Ok(v) = serde_json::from_str::<serde_json::Value>(&payload_json) {
let label = v["classification"]
.as_str()
.and_then(|s| ClassificationLabel::from_str(s).ok());
let conf = v["confidence"].as_f64();
if let (Some(l), Some(c)) = (label, conf) {
classification = Some((l, c));
}
}
}
}
let outbound_party_messages_count: u8 = if let Some(sid) = session_id.as_deref() {
let n: i64 = guard
.query_row(
"SELECT COUNT(DISTINCT party) FROM mediation_messages
WHERE session_id = ?1 AND direction = 'outbound'",
params![sid],
|r| r.get(0),
)
.unwrap_or(0);
n.clamp(0, 2) as u8
} else {
0
};
let session_state: Option<String> = session_id.as_deref().and_then(|sid| {
guard
.query_row(
"SELECT state FROM mediation_sessions WHERE session_id = ?1",
params![sid],
|r| r.get::<_, String>(0),
)
.ok()
});
let no_session_reason: Option<NoSessionReason> = if session_id.is_none() {
Some(classify_no_session_reason(&guard, dispute_id))
} else {
None
};
drop(guard);
let narrative = build_narrative(
&session_id,
session_state.as_deref(),
&classification,
outbound_party_messages_count,
final_dispute_status,
no_session_reason,
);
Ok(FinalReportPayload {
dispute_id: dispute_id.to_string(),
session_id,
classification,
outbound_party_messages_count,
final_dispute_status: final_dispute_status.to_string(),
narrative,
})
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NoSessionReason {
ReasoningDeclined,
TakeFailed,
Unknown,
}
fn classify_no_session_reason(conn: &rusqlite::Connection, dispute_id: &str) -> NoSessionReason {
let take_failed: bool = conn
.query_row(
"SELECT EXISTS(
SELECT 1 FROM mediation_events
WHERE session_id IS NULL
AND kind = 'take_dispute_issued'
AND json_extract(payload_json, '$.dispute_id') = ?1
AND json_extract(payload_json, '$.outcome') = 'failure'
)",
params![dispute_id],
|r| r.get::<_, bool>(0),
)
.unwrap_or(false);
if take_failed {
return NoSessionReason::TakeFailed;
}
let reasoning_declined: bool = conn
.query_row(
"SELECT EXISTS(
SELECT 1 FROM mediation_events
WHERE session_id IS NULL
AND json_extract(payload_json, '$.dispute_id') = ?1
AND (
(kind = 'reasoning_verdict'
AND json_extract(payload_json, '$.decision') = 'escalate')
OR kind = 'escalation_recommended'
)
)",
params![dispute_id],
|r| r.get::<_, bool>(0),
)
.unwrap_or(false);
if reasoning_declined {
NoSessionReason::ReasoningDeclined
} else {
NoSessionReason::Unknown
}
}
fn build_narrative(
session_id: &Option<String>,
session_state: Option<&str>,
classification: &Option<(ClassificationLabel, f64)>,
outbound_party_messages_count: u8,
final_dispute_status: &str,
no_session_reason: Option<NoSessionReason>,
) -> String {
let session_clause = match (session_id, session_state) {
(Some(sid), Some(state)) => format!("Session {sid} was in state `{state}`."),
(Some(sid), None) => format!("Session {sid} was active."),
(None, _) => match no_session_reason.unwrap_or(NoSessionReason::Unknown) {
NoSessionReason::ReasoningDeclined => "No mediation session was opened — the \
reasoning verdict recommended escalation before any take was issued."
.to_string(),
NoSessionReason::TakeFailed => "No mediation session was opened — reasoning \
cleared the dispute for mediation but the take-dispute exchange with \
Mostro did not complete."
.to_string(),
NoSessionReason::Unknown => {
"No mediation session was opened for this dispute.".to_string()
}
},
};
let classification_clause = match classification {
Some((label, conf)) => {
format!("Serbero's last recorded classification was `{label}` (confidence {conf:.2}).")
}
None => "No classification was recorded.".to_string(),
};
let outbound_clause = match outbound_party_messages_count {
0 => "No outbound party-facing messages were dispatched.".to_string(),
1 => "Serbero messaged one party before the dispute was resolved externally.".to_string(),
_ => {
"Serbero messaged both parties before the dispute was resolved externally.".to_string()
}
};
format!(
"Dispute closed with final status `{final_dispute_status}`. \
{session_clause} {classification_clause} {outbound_clause}"
)
}
pub fn build_report_body(payload: &FinalReportPayload) -> String {
let session_line = match &payload.session_id {
Some(sid) => format!("Session: {sid}\n"),
None => "Session: <none — dispute-scoped handoff>\n".to_string(),
};
let classification_line = match &payload.classification {
Some((label, conf)) => format!("Classification: {label} (confidence {conf:.2})\n"),
None => "Classification: <none recorded>\n".to_string(),
};
format!(
"mediation_resolution_report/v1\n\
Dispute: {}\n\
{session_line}\
{classification_line}\
Outbound party messages: {}\n\
Final dispute status: {}\n\
\n\
{}",
payload.dispute_id,
payload.outbound_party_messages_count,
payload.final_dispute_status,
payload.narrative,
)
}
#[instrument(skip_all, fields(dispute_id = %dispute_id, final_dispute_status = %final_dispute_status))]
pub async fn emit_final_report(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
client: &Client,
solvers: &[SolverConfig],
dispute_id: &str,
final_dispute_status: &str,
) -> Result<()> {
let payload = build_payload(conn, dispute_id, final_dispute_status).await?;
let body = build_report_body(&payload);
info!(
dispute_id = %dispute_id,
session_id = payload.session_id.as_deref().unwrap_or("<none>"),
final_dispute_status,
"emit_final_report: delivering FR-124 DM to solvers"
);
super::notify_solvers_final_resolution_report(
conn,
client,
solvers,
dispute_id,
payload.session_id.as_deref(),
&body,
)
.await;
let now = super::current_ts_secs()?;
let audit_payload = serde_json::json!({
"dispute_id": payload.dispute_id,
"session_id": payload.session_id,
"classification": payload
.classification
.as_ref()
.map(|(l, _)| l.to_string()),
"confidence": payload.classification.as_ref().map(|(_, c)| *c),
"outbound_party_messages_count": payload.outbound_party_messages_count,
"final_dispute_status": payload.final_dispute_status,
"narrative": payload.narrative,
})
.to_string();
{
let guard = conn.lock().await;
if let Err(e) = db::mediation_events::record_event(
&guard,
db::mediation_events::MediationEventKind::ResolvedExternallyReported,
payload.session_id.as_deref(),
&audit_payload,
None,
None,
None,
now,
) {
warn!(
dispute_id = %dispute_id,
error = %e,
"failed to record resolved_externally_reported event"
);
}
}
debug!(
dispute_id = %dispute_id,
"emit_final_report: done"
);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::migrations::run_migrations;
use crate::db::open_in_memory;
async fn fresh_conn() -> Arc<AsyncMutex<rusqlite::Connection>> {
let mut conn = open_in_memory().unwrap();
run_migrations(&mut conn).unwrap();
conn.execute(
"INSERT INTO disputes (
dispute_id, event_id, mostro_pubkey, initiator_role,
dispute_status, event_timestamp, detected_at, lifecycle_state
) VALUES ('d-r', 'e1', 'm1', 'buyer',
'initiated', 1, 2, 'notified')",
[],
)
.unwrap();
Arc::new(AsyncMutex::new(conn))
}
#[tokio::test]
async fn no_context_predicate_false_for_untouched_dispute() {
let conn = fresh_conn().await;
assert!(!has_any_mediation_context(&conn, "d-r").await.unwrap());
}
#[tokio::test]
async fn session_row_satisfies_predicate() {
let conn = fresh_conn().await;
{
let c = conn.lock().await;
c.execute(
"INSERT INTO mediation_sessions (
session_id, dispute_id, state, round_count,
prompt_bundle_id, policy_hash,
started_at, last_transition_at
) VALUES ('s-r', 'd-r', 'awaiting_response', 0,
'phase3-default', 'hash', 100, 100)",
[],
)
.unwrap();
}
assert!(has_any_mediation_context(&conn, "d-r").await.unwrap());
}
#[tokio::test]
async fn dispute_scoped_event_alone_satisfies_predicate() {
let conn = fresh_conn().await;
let now = super::super::current_ts_secs().unwrap();
{
let c = conn.lock().await;
db::mediation_events::record_event(
&c,
db::mediation_events::MediationEventKind::ReasoningVerdict,
None,
r#"{"dispute_id":"d-r","decision":"escalate","trigger":"fraud_indicator"}"#,
None,
Some("phase3-default"),
Some("hash"),
now,
)
.unwrap();
}
assert!(has_any_mediation_context(&conn, "d-r").await.unwrap());
}
#[tokio::test]
async fn payload_picks_dispute_scoped_verdict_when_no_session_event() {
let conn = fresh_conn().await;
let now = super::super::current_ts_secs().unwrap();
{
let c = conn.lock().await;
db::mediation_events::record_event(
&c,
db::mediation_events::MediationEventKind::ReasoningVerdict,
None,
r#"{"dispute_id":"d-r","classification":"suspected_fraud","confidence":0.95,"decision":"escalate","trigger":"fraud_indicator"}"#,
None,
Some("phase3-default"),
Some("hash"),
now,
)
.unwrap();
}
let payload = build_payload(&conn, "d-r", "seller-refunded")
.await
.unwrap();
assert_eq!(payload.session_id, None);
assert_eq!(
payload.classification,
Some((ClassificationLabel::SuspectedFraud, 0.95))
);
assert_eq!(payload.outbound_party_messages_count, 0);
assert!(
payload
.narrative
.contains("No mediation session was opened"),
"narrative should say no session; got: {}",
payload.narrative
);
}
#[test]
fn body_contains_versioning_and_key_fields() {
let payload = FinalReportPayload {
dispute_id: "d-1".into(),
session_id: Some("s-1".into()),
classification: Some((ClassificationLabel::CoordinationFailureResolvable, 0.88)),
outbound_party_messages_count: 2,
final_dispute_status: "seller-refunded".into(),
narrative: "NARRATIVE".into(),
};
let body = build_report_body(&payload);
assert!(body.starts_with("mediation_resolution_report/v1"));
assert!(body.contains("d-1"));
assert!(body.contains("s-1"));
assert!(body.contains("coordination_failure_resolvable"));
assert!(body.contains("0.88"));
assert!(body.contains("seller-refunded"));
assert!(body.contains("NARRATIVE"));
}
}