use std::sync::Arc;
use std::time::Duration;
use nostr_sdk::prelude::*;
use tokio::sync::Mutex as AsyncMutex;
use tracing::{debug, info, instrument, warn};
use uuid::Uuid;
use super::policy::{self, PolicyDecision};
use super::SessionKeyCache;
use crate::chat::dispute_chat_flow::{self, DisputeChatMaterial};
use crate::chat::inbound::InboundEnvelope;
use crate::db;
use crate::error::{Error, Result};
use crate::models::dispute::InitiatorRole;
use crate::models::mediation::TranscriptParty;
use crate::prompts::PromptBundle;
use crate::reasoning::ReasoningProvider;
/// Non-error result of an [`open_session`] attempt.
///
/// The `Refused*` and `EscalatedBeforeTake` variants are returned before any
/// mediation session row is inserted.
#[derive(Debug, Clone, PartialEq)]
pub enum OpenOutcome {
/// A new session row was committed and the first clarifying message was
/// dispatched to both parties.
Opened { session_id: String },
/// An open session for the dispute was found, either by the up-front check
/// or by the re-check made just before the insert; `session_id` is the
/// existing session's id.
AlreadyOpen { session_id: String },
/// The opening policy decision was `Summarize`: the session row was
/// committed and its state was set to `Classified`.
ReadyForSummary {
session_id: String,
classification: crate::models::mediation::ClassificationLabel,
confidence: f64,
},
/// The opening policy decision was `Escalate`: a dispute-scoped escalation
/// was recorded and the take flow was not run.
EscalatedBeforeTake {
dispute_id: String,
trigger: crate::models::mediation::EscalationTrigger,
},
/// The reasoning provider's health check failed; `reason` is the error
/// rendered as a string.
RefusedReasoningUnavailable { reason: String },
/// The solver auth state was `Unauthorized` when the attempt started.
RefusedAuthPending { reason: String },
/// The solver auth state was `Terminated` when the attempt started.
RefusedAuthTerminated { reason: String },
}
/// Borrowed inputs for a single [`open_session`] call.
pub struct OpenSessionParams<'a> {
/// Shared SQLite connection; locked separately for each database step.
pub conn: &'a Arc<AsyncMutex<rusqlite::Connection>>,
/// Nostr client handed to the take flow and to the first-message sender.
pub client: &'a Client,
/// Keys handed to the take flow and to the first-message sender.
pub serbero_keys: &'a Keys,
/// Public key forwarded to the take flow.
pub mostro_pubkey: &'a PublicKey,
/// Reasoning provider; health-checked first, then used for the opening
/// classification.
pub reasoning: &'a dyn ReasoningProvider,
/// Prompt bundle whose `id` and `policy_hash` are stored with every event
/// and with the session row.
pub prompt_bundle: &'a Arc<PromptBundle>,
/// Dispute identifier used for database lookups and event payloads.
pub dispute_id: &'a str,
/// Forwarded to the opening classification.
pub initiator_role: InitiatorRole,
/// Dispute identifier forwarded to the take flow.
// NOTE(review): presumably the UUID form of `dispute_id` — confirm against the caller.
pub dispute_uuid: Uuid,
/// Timeout forwarded to the take flow.
pub take_flow_timeout: Duration,
/// Poll interval forwarded to the take flow.
pub take_flow_poll_interval: Duration,
/// Provider name forwarded to the opening classification.
pub provider_name: &'a str,
/// Model name forwarded to the opening classification.
pub model_name: &'a str,
/// Solver auth state handle; read by the auth gate and signalled when
/// sending the first message reports lost authorization.
pub auth_handle: &'a super::auth_retry::AuthRetryHandle,
/// When `Some`, the chat material returned by the take flow is registered
/// here under the new session id.
pub session_key_cache: Option<&'a SessionKeyCache>,
/// Solver list forwarded to `handle_authorization_lost`.
pub solvers: &'a [crate::models::SolverConfig],
}
#[instrument(skip_all, fields(dispute_id = %params.dispute_id))]
pub async fn open_session(params: OpenSessionParams<'_>) -> Result<OpenOutcome> {
match params.auth_handle.current_state() {
super::auth_retry::AuthState::Authorized => {}
super::auth_retry::AuthState::Unauthorized => {
let reason = "solver authorization pending (retry loop running)".to_string();
warn!(reason = %reason, "refusing to open mediation session: auth pending");
return Ok(OpenOutcome::RefusedAuthPending { reason });
}
super::auth_retry::AuthState::Terminated => {
let reason = "solver authorization terminated without recovery".to_string();
warn!(reason = %reason, "refusing to open mediation session: auth terminated");
return Ok(OpenOutcome::RefusedAuthTerminated { reason });
}
}
if let Err(e) = params.reasoning.health_check().await {
warn!(
error = %e,
"refusing to open mediation session: reasoning provider health check failed"
);
return Ok(OpenOutcome::RefusedReasoningUnavailable {
reason: e.to_string(),
});
}
{
let conn = params.conn.lock().await;
if let Some((sid, _state)) =
db::mediation::latest_open_session_for(&conn, params.dispute_id)?
{
info!(session_id = %sid, "mediation session already open; skipping");
return Ok(OpenOutcome::AlreadyOpen { session_id: sid });
}
}
let classify_outcome = policy::classify_for_start(
params.conn,
params.dispute_id,
params.initiator_role,
params.prompt_bundle,
params.reasoning,
params.provider_name,
params.model_name,
)
.await?;
{
let now = current_ts_secs()?;
let payload = match &classify_outcome.decision {
PolicyDecision::AskClarification { .. } => {
serde_json::json!({
"dispute_id": params.dispute_id,
"decision": "ask_clarification",
})
}
PolicyDecision::Summarize {
classification,
confidence,
} => serde_json::json!({
"dispute_id": params.dispute_id,
"decision": "summarize",
"classification": classification.to_string(),
"confidence": confidence,
}),
PolicyDecision::Escalate(trigger) => serde_json::json!({
"dispute_id": params.dispute_id,
"decision": "escalate",
"trigger": trigger.to_string(),
}),
};
let guard = params.conn.lock().await;
if let Err(e) = db::mediation_events::record_event(
&guard,
db::mediation_events::MediationEventKind::ReasoningVerdict,
None,
&payload.to_string(),
None,
Some(¶ms.prompt_bundle.id),
Some(¶ms.prompt_bundle.policy_hash),
now,
) {
warn!(
dispute_id = %params.dispute_id,
error = %e,
"failed to record dispute-scoped reasoning_verdict event"
);
}
}
if let PolicyDecision::Escalate(trigger) = &classify_outcome.decision {
let trigger = *trigger;
{
let now = current_ts_secs()?;
let payload = serde_json::json!({
"dispute_id": params.dispute_id,
"reason": "policy_escalate",
"trigger": trigger.to_string(),
})
.to_string();
let guard = params.conn.lock().await;
if let Err(e) = db::mediation_events::record_event(
&guard,
db::mediation_events::MediationEventKind::StartAttemptStopped,
None,
&payload,
None,
Some(¶ms.prompt_bundle.id),
Some(¶ms.prompt_bundle.policy_hash),
now,
) {
warn!(
dispute_id = %params.dispute_id,
error = %e,
"failed to record dispute-scoped start_attempt_stopped event"
);
}
}
let rationale_refs = classify_outcome
.rationale_audit
.as_ref()
.map(|a| vec![a.rationale_id.clone()])
.unwrap_or_default();
if let Err(e) = super::escalation::recommend(super::escalation::RecommendParams {
conn: params.conn,
session_id: None,
dispute_id: params.dispute_id,
trigger,
evidence_refs: Vec::new(),
rationale_refs,
prompt_bundle_id: ¶ms.prompt_bundle.id,
policy_hash: ¶ms.prompt_bundle.policy_hash,
})
.await
{
warn!(
dispute_id = %params.dispute_id,
error = %e,
"escalation::recommend (dispute-scoped) failed on opening-call escalate"
);
return Err(e);
}
info!(
dispute_id = %params.dispute_id,
trigger = %trigger,
"opening-call escalate: dispute-scoped handoff recorded, no take"
);
return Ok(OpenOutcome::EscalatedBeforeTake {
dispute_id: params.dispute_id.to_string(),
trigger,
});
}
let material = match dispute_chat_flow::run_take_flow(dispute_chat_flow::TakeFlowParams {
client: params.client,
serbero_keys: params.serbero_keys,
mostro_pubkey: params.mostro_pubkey,
dispute_id: params.dispute_uuid,
timeout: params.take_flow_timeout,
poll_interval: params.take_flow_poll_interval,
})
.await
{
Ok(m) => {
let now = current_ts_secs()?;
let payload = serde_json::json!({
"dispute_id": params.dispute_id,
"outcome": "success",
})
.to_string();
let guard = params.conn.lock().await;
if let Err(e) = db::mediation_events::record_event(
&guard,
db::mediation_events::MediationEventKind::TakeDisputeIssued,
None,
&payload,
None,
Some(¶ms.prompt_bundle.id),
Some(¶ms.prompt_bundle.policy_hash),
now,
) {
warn!(
dispute_id = %params.dispute_id,
error = %e,
"failed to record take_dispute_issued{{success}} event"
);
}
m
}
Err(e) => {
let now = current_ts_secs()?;
let payload = serde_json::json!({
"dispute_id": params.dispute_id,
"outcome": "failure",
"reason": e.to_string(),
})
.to_string();
let guard = params.conn.lock().await;
if let Err(db_err) = db::mediation_events::record_event(
&guard,
db::mediation_events::MediationEventKind::TakeDisputeIssued,
None,
&payload,
None,
Some(¶ms.prompt_bundle.id),
Some(¶ms.prompt_bundle.policy_hash),
now,
) {
warn!(
dispute_id = %params.dispute_id,
error = %db_err,
"failed to record take_dispute_issued{{failure}} event"
);
}
return Err(e);
}
};
let session_id = Uuid::new_v4().to_string();
let now = current_ts_secs()?;
{
let mut conn = params.conn.lock().await;
if let Some((sid, _state)) =
db::mediation::latest_open_session_for(&conn, params.dispute_id)?
{
info!(
session_id = %sid,
"mediation session opened concurrently; aborting this attempt"
);
return Ok(OpenOutcome::AlreadyOpen { session_id: sid });
}
let tx = conn.transaction()?;
db::mediation::insert_session(
&tx,
&db::mediation::NewMediationSession {
session_id: &session_id,
dispute_id: params.dispute_id,
prompt_bundle_id: ¶ms.prompt_bundle.id,
policy_hash: ¶ms.prompt_bundle.policy_hash,
buyer_shared_pubkey: Some(&material.buyer_shared_pubkey()),
seller_shared_pubkey: Some(&material.seller_shared_pubkey()),
started_at: now,
},
)?;
db::mediation_events::record_session_opened(
&tx,
&session_id,
¶ms.prompt_bundle.id,
¶ms.prompt_bundle.policy_hash,
now,
)?;
tx.commit()?;
}
let rationale_audit = classify_outcome
.rationale_audit
.expect("classify_for_start must return a rationale_audit on non-Escalate paths");
policy::record_classification_for_session(
params.conn,
&session_id,
&rationale_audit,
params.prompt_bundle,
)
.await?;
let decision = classify_outcome.decision;
match decision {
PolicyDecision::AskClarification {
buyer_text,
seller_text,
} => {
if let Err(e) = super::draft_and_send_initial_message(
params.conn,
params.client,
params.serbero_keys,
&session_id,
&material.buyer_shared_keys,
&material.seller_shared_keys,
params.prompt_bundle,
&buyer_text,
&seller_text,
)
.await
{
if let Error::AuthorizationLost(ref msg) = e {
handle_authorization_lost(
params.conn,
params.client,
params.solvers,
params.dispute_id,
&session_id,
params.auth_handle,
params.prompt_bundle,
msg,
)
.await;
}
return Err(e);
}
if let Some(cache) = params.session_key_cache {
register_session_material(cache, &session_id, material.clone()).await;
}
info!(
session_id = %session_id,
prompt_bundle_id = %params.prompt_bundle.id,
policy_hash = %params.prompt_bundle.policy_hash,
"mediation session opened; first clarifying message dispatched to both parties"
);
Ok(OpenOutcome::Opened { session_id })
}
PolicyDecision::Summarize {
classification,
confidence,
} => {
let now = current_ts_secs()?;
{
let guard = params.conn.lock().await;
db::mediation::set_session_state(
&guard,
&session_id,
crate::models::mediation::MediationSessionState::Classified,
now,
)?;
}
if let Some(cache) = params.session_key_cache {
register_session_material(cache, &session_id, material.clone()).await;
}
info!(
session_id = %session_id,
classification = %classification,
confidence = confidence,
"mediation session opened; ready for cooperative summary"
);
Ok(OpenOutcome::ReadyForSummary {
session_id,
classification,
confidence,
})
}
PolicyDecision::Escalate(_) => {
unreachable!(
"Escalate must be handled in the FR-122 pre-take branch; \
reaching the post-insert dispatch is a refactor bug"
);
}
}
}
/// Result of a successful [`ingest_inbound`] call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IngestOutcome {
/// The message was newly inserted and was not stale; the party's last-seen
/// timestamp was updated and `round_count_after` is the recomputed round
/// count.
Fresh { round_count_after: i64 },
/// The insert reported that no row was added.
Duplicate,
/// The message was inserted with its stale flag set because its inner
/// timestamp is older than the party's last-seen timestamp; the last-seen
/// timestamp and round count were left untouched.
Stale,
}
/// Persists one inbound party message for `session_id` and reports how it was
/// classified.
///
/// The message is marked stale when its inner timestamp is strictly older than
/// the last-seen timestamp stored for the same party. The insert and any
/// follow-up bookkeeping run inside one transaction:
///
/// * nothing inserted → [`IngestOutcome::Duplicate`];
/// * inserted but stale → [`IngestOutcome::Stale`] (no bookkeeping);
/// * inserted and not stale → the last-seen timestamp is updated, the round
///   count is recomputed, and [`IngestOutcome::Fresh`] is returned.
///
/// # Errors
///
/// Returns `Error::InvalidEvent` when `envelope.party` is `Serbero`, and
/// propagates errors from `current_ts_secs` and the database helpers.
pub async fn ingest_inbound(
    conn: &Arc<AsyncMutex<rusqlite::Connection>>,
    session_id: &str,
    envelope: &InboundEnvelope,
) -> Result<IngestOutcome> {
    // Only buyer and seller messages may be ingested.
    if let TranscriptParty::Serbero = envelope.party {
        return Err(Error::InvalidEvent(
            "ingest_inbound refused: envelope.party = Serbero".into(),
        ));
    }

    let persisted_at = current_ts_secs()?;
    let mut guard = conn.lock().await;

    let (buyer_last, seller_last) = db::mediation::get_last_seen(&guard, session_id)?;
    let previous_ts = match envelope.party {
        TranscriptParty::Buyer => buyer_last,
        TranscriptParty::Seller => seller_last,
        TranscriptParty::Serbero => unreachable!("guarded above"),
    };
    // Without a previous timestamp for this party the message is never stale.
    let is_stale = matches!(previous_ts, Some(prev) if envelope.inner_created_at < prev);

    let tx = guard.transaction()?;
    let inserted = db::mediation::insert_inbound_message(
        &tx,
        &db::mediation::NewInboundMessage {
            session_id,
            party: envelope.party,
            shared_pubkey: &envelope.shared_pubkey,
            inner_event_id: &envelope.inner_event_id,
            inner_event_created_at: envelope.inner_created_at,
            outer_event_id: Some(&envelope.outer_event_id),
            content: &envelope.content,
            persisted_at,
            stale: is_stale,
        },
    )?;

    match (inserted, is_stale) {
        (false, _) => {
            tx.commit()?;
            debug!(
                session_id = %session_id,
                inner_event_id = %envelope.inner_event_id,
                "inbound_duplicate"
            );
            Ok(IngestOutcome::Duplicate)
        }
        (true, true) => {
            tx.commit()?;
            debug!(
                session_id = %session_id,
                inner_event_id = %envelope.inner_event_id,
                stale = true,
                "inbound_ingested_stale"
            );
            Ok(IngestOutcome::Stale)
        }
        (true, false) => {
            db::mediation::update_last_seen_inner_ts(
                &tx,
                session_id,
                envelope.party,
                envelope.inner_created_at,
            )?;
            let round_count_after = db::mediation::recompute_round_count(&tx, session_id)?;
            tx.commit()?;
            info!(
                session_id = %session_id,
                party = %envelope.party,
                inner_event_id = %envelope.inner_event_id,
                inner_created_at = envelope.inner_created_at,
                round_count_after = round_count_after,
                "inbound_ingested"
            );
            Ok(IngestOutcome::Fresh { round_count_after })
        }
    }
}
/// Locks `cache` and inserts `material` under an owned copy of `session_id`.
async fn register_session_material(
    cache: &SessionKeyCache,
    session_id: &str,
    material: DisputeChatMaterial,
) {
    cache.lock().await.insert(session_id.to_string(), material);
}
/// Reacts to an authorization-lost error seen while working on `session_id`.
///
/// Logs the error, signals `auth_handle` that authorization was lost, and
/// records a session-scoped escalation with the `AuthorizationLost` trigger.
/// Solvers are notified only when that escalation was recorded; if recording
/// fails, the failure is logged and the function returns without notifying.
#[allow(clippy::too_many_arguments)]
pub async fn handle_authorization_lost(
    conn: &Arc<AsyncMutex<rusqlite::Connection>>,
    client: &Client,
    solvers: &[crate::models::SolverConfig],
    dispute_id: &str,
    session_id: &str,
    auth_handle: &super::auth_retry::AuthRetryHandle,
    prompt_bundle: &Arc<PromptBundle>,
    error_msg: &str,
) {
    tracing::error!(
        session_id = %session_id,
        error = %error_msg,
        "authorization_lost"
    );
    auth_handle.signal_auth_lost();

    // The same trigger is used for the escalation record and the notification.
    let trigger = crate::models::mediation::EscalationTrigger::AuthorizationLost;

    let recommendation = super::escalation::recommend(super::escalation::RecommendParams {
        conn,
        session_id: Some(session_id),
        dispute_id,
        trigger,
        evidence_refs: Vec::new(),
        rationale_refs: Vec::new(),
        prompt_bundle_id: &prompt_bundle.id,
        policy_hash: &prompt_bundle.policy_hash,
    })
    .await;

    if let Err(esc_err) = recommendation {
        warn!(
            session_id = %session_id,
            error = %esc_err,
            "handle_authorization_lost: escalation::recommend failed"
        );
        return;
    }

    super::notify_solvers_escalation(conn, client, solvers, dispute_id, session_id, trigger)
        .await;
}
pub(crate) async fn publish_with_bounded_retry(
client: &Client,
outer: &Event,
label: &str,
) -> Result<()> {
const MAX_ATTEMPTS: u32 = 3;
let mut last_err: Option<String> = None;
for attempt in 1..=MAX_ATTEMPTS {
match client.send_event(outer).await {
Ok(_) => return Ok(()),
Err(e) => {
last_err = Some(e.to_string());
if attempt < MAX_ATTEMPTS {
let backoff_ms = 100u64 * (1u64 << (attempt - 1));
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
}
}
}
}
Err(Error::ChatTransport(format!(
"publish {label} gift-wrap failed after {MAX_ATTEMPTS} attempts: {}",
last_err.unwrap_or_default()
)))
}
use super::current_ts_secs;
/// Reports whether the round limit has been hit, i.e. whether `round_count`
/// has reached or passed `max_rounds`.
///
/// A `max_rounds` of zero is therefore always reported as hit.
pub fn check_round_limit(round_count: u32, max_rounds: u32) -> bool {
    max_rounds <= round_count
}
// Boundary tests for `check_round_limit`, which compares with `>=`.
#[cfg(test)]
mod round_limit_tests {
use super::check_round_limit;
// Count equal to the maximum counts as hitting the limit.
#[test]
fn at_limit_returns_true() {
assert!(check_round_limit(3, 3));
}
// One round short of the maximum is still under the limit.
#[test]
fn below_limit_returns_false() {
assert!(!check_round_limit(2, 3));
}
// Exceeding the maximum also counts as hitting the limit.
#[test]
fn above_limit_returns_true() {
assert!(check_round_limit(4, 3));
}
// With a maximum of zero, even a zero round count is at the limit.
#[test]
fn zero_max_always_true() {
assert!(check_round_limit(0, 0));
}
}
// Tests for the auth gate at the top of `open_session`: with a non-authorized
// auth state the call must return a refusal outcome, never reach the reasoning
// provider, and leave the mediation tables empty.
#[cfg(test)]
mod tests {
use super::*;
use crate::db::migrations::run_migrations;
use crate::db::open_in_memory;
use crate::mediation::auth_retry::{AuthRetryHandle, AuthState};
use crate::models::reasoning::{
ClassificationRequest, ClassificationResponse, ReasoningError, SummaryRequest,
SummaryResponse,
};
use async_trait::async_trait;
// Reasoning provider whose every method panics, so any call made past the
// auth gate fails the test immediately.
struct PanicReasoning;
#[async_trait]
impl ReasoningProvider for PanicReasoning {
async fn classify(
&self,
_request: ClassificationRequest,
) -> std::result::Result<ClassificationResponse, ReasoningError> {
panic!("auth gate must refuse before classify is called");
}
async fn summarize(
&self,
_request: SummaryRequest,
) -> std::result::Result<SummaryResponse, ReasoningError> {
panic!("auth gate must refuse before summarize is called");
}
async fn health_check(&self) -> std::result::Result<(), ReasoningError> {
panic!("auth gate must refuse before health_check is called");
}
}
// Builds a migrated in-memory database holding a single `disputes` row with
// id 'dispute-auth-gate', wrapped the way `open_session` expects.
fn fresh_conn() -> Arc<AsyncMutex<rusqlite::Connection>> {
let mut conn = open_in_memory().unwrap();
run_migrations(&mut conn).unwrap();
conn.execute(
"INSERT INTO disputes (
dispute_id, event_id, mostro_pubkey, initiator_role,
dispute_status, event_timestamp, detected_at, lifecycle_state
) VALUES ('dispute-auth-gate', 'e1', 'm1', 'buyer',
'initiated', 0, 0, 'notified')",
[],
)
.unwrap();
Arc::new(AsyncMutex::new(conn))
}
// Prompt bundle filled with short placeholder strings.
fn fresh_bundle() -> Arc<PromptBundle> {
Arc::new(PromptBundle {
id: "phase3-default".into(),
policy_hash: "test-policy-hash".into(),
system: "sys".into(),
classification: "cls".into(),
escalation: "esc".into(),
mediation_style: "style".into(),
message_templates: "tpl".into(),
})
}
// Calls `open_session` for 'dispute-auth-gate' with freshly generated keys,
// the panicking reasoning provider, and the supplied auth handle. Panics if
// the call returns `Err`.
async fn run_gate_with(
auth_handle: &AuthRetryHandle,
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
) -> OpenOutcome {
let serbero_keys = Keys::generate();
let mostro_pk = Keys::generate().public_key();
let client = Client::new(Keys::generate());
let reasoning = PanicReasoning;
let bundle = fresh_bundle();
open_session(OpenSessionParams {
conn,
client: &client,
serbero_keys: &serbero_keys,
mostro_pubkey: &mostro_pk,
reasoning: &reasoning,
prompt_bundle: &bundle,
dispute_id: "dispute-auth-gate",
initiator_role: InitiatorRole::Buyer,
dispute_uuid: Uuid::new_v4(),
take_flow_timeout: Duration::from_secs(1),
take_flow_poll_interval: Duration::from_millis(50),
provider_name: "mock-provider",
model_name: "mock-model",
auth_handle,
session_key_cache: None,
solvers: &[],
})
.await
.expect("auth-gate path must not return Err")
}
// Returns the row counts of `mediation_sessions`, `mediation_messages` and
// `mediation_events`, in that order.
async fn mediation_row_counts(conn: &Arc<AsyncMutex<rusqlite::Connection>>) -> (i64, i64, i64) {
let guard = conn.lock().await;
let sessions: i64 = guard
.query_row("SELECT COUNT(*) FROM mediation_sessions", [], |r| r.get(0))
.unwrap();
let messages: i64 = guard
.query_row("SELECT COUNT(*) FROM mediation_messages", [], |r| r.get(0))
.unwrap();
let events: i64 = guard
.query_row("SELECT COUNT(*) FROM mediation_events", [], |r| r.get(0))
.unwrap();
(sessions, messages, events)
}
// `Unauthorized` must produce `RefusedAuthPending` and write no rows.
#[tokio::test]
async fn unauthorized_gate_refuses_with_pending_and_writes_nothing() {
let conn = fresh_conn();
let handle = AuthRetryHandle::with_state_for_testing(AuthState::Unauthorized);
let outcome = run_gate_with(&handle, &conn).await;
match outcome {
OpenOutcome::RefusedAuthPending { reason } => {
assert!(reason.contains("pending"), "unexpected reason: {reason}");
}
other => panic!("expected RefusedAuthPending, got {other:?}"),
}
let (sessions, messages, events) = mediation_row_counts(&conn).await;
assert_eq!(sessions, 0, "no mediation_sessions row may be written");
assert_eq!(messages, 0, "no mediation_messages row may be written");
assert_eq!(events, 0, "no mediation_events row may be written");
}
// `Terminated` must produce `RefusedAuthTerminated` and write no rows.
#[tokio::test]
async fn terminated_gate_refuses_with_terminated_and_writes_nothing() {
let conn = fresh_conn();
let handle = AuthRetryHandle::with_state_for_testing(AuthState::Terminated);
let outcome = run_gate_with(&handle, &conn).await;
match outcome {
OpenOutcome::RefusedAuthTerminated { reason } => {
assert!(reason.contains("terminated"), "unexpected reason: {reason}");
}
other => panic!("expected RefusedAuthTerminated, got {other:?}"),
}
let (sessions, messages, events) = mediation_row_counts(&conn).await;
assert_eq!(sessions, 0);
assert_eq!(messages, 0);
assert_eq!(events, 0);
}
}