pub mod auth_retry;
pub mod eligibility;
pub mod escalation;
pub mod follow_up;
pub mod policy;
pub mod report;
pub mod router;
pub mod session;
pub mod start;
pub mod summarizer;
pub mod transcript;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use nostr_sdk::prelude::*;
use rusqlite::params;
use serde_json::json;
use tokio::sync::Mutex as AsyncMutex;
use tracing::{debug, error, info, instrument, warn};
use uuid::Uuid;
use crate::chat::dispute_chat_flow::{self, DisputeChatMaterial};
use crate::chat::inbound::{self, PartyChatMaterial};
use crate::chat::outbound;
use crate::db;
use crate::db::mediation_events::MediationEventKind;
use crate::error::{Error, Result};
use crate::models::dispute::InitiatorRole;
use crate::models::mediation::{
ClassificationLabel, EscalationTrigger, MediationSessionState, TranscriptParty,
};
use crate::models::reasoning::TranscriptEntry;
use crate::models::{MediationConfig, NotificationStatus, NotificationType, SolverConfig};
use crate::nostr::notifier::send_gift_wrap_notification;
use crate::prompts::PromptBundle;
use crate::reasoning::ReasoningProvider;
pub type SessionKeyCache = Arc<AsyncMutex<HashMap<String, DisputeChatMaterial>>>;
const INGEST_FETCH_TIMEOUT: Duration = Duration::from_secs(5);
const ENGINE_TICK_INTERVAL: Duration = Duration::from_secs(30);
pub(crate) const DEFAULT_TAKE_FLOW_TIMEOUT: Duration = Duration::from_secs(15);
pub(crate) const DEFAULT_TAKE_FLOW_POLL_INTERVAL: Duration = Duration::from_millis(250);
pub struct Phase3HandlerCtx {
pub serbero_keys: Keys,
pub mostro_pubkey: PublicKey,
pub reasoning: Arc<dyn ReasoningProvider>,
pub prompt_bundle: Arc<PromptBundle>,
pub provider_name: String,
pub model_name: String,
pub auth_handle: auth_retry::AuthRetryHandle,
pub session_key_cache: SessionKeyCache,
pub solvers: Vec<SolverConfig>,
}
#[allow(clippy::too_many_arguments)]
pub async fn open_dispute_session(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
client: &Client,
serbero_keys: &Keys,
mostro_pubkey: &PublicKey,
reasoning: &dyn ReasoningProvider,
prompt_bundle: &Arc<PromptBundle>,
dispute_id: &str,
initiator_role: InitiatorRole,
dispute_uuid: Uuid,
provider_name: &str,
model_name: &str,
auth_handle: &auth_retry::AuthRetryHandle,
) -> Result<session::OpenOutcome> {
session::open_session(session::OpenSessionParams {
conn,
client,
serbero_keys,
mostro_pubkey,
reasoning,
prompt_bundle,
dispute_id,
initiator_role,
dispute_uuid,
take_flow_timeout: Duration::from_secs(15),
take_flow_poll_interval: Duration::from_millis(250),
provider_name,
model_name,
auth_handle,
session_key_cache: None,
solvers: &[],
})
.await
}
#[instrument(skip_all, fields(session_id = %session_id))]
#[allow(clippy::too_many_arguments)]
pub async fn draft_and_send_initial_message(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
client: &Client,
serbero_keys: &Keys,
session_id: &str,
buyer_shared_keys: &Keys,
seller_shared_keys: &Keys,
prompt_bundle: &Arc<PromptBundle>,
buyer_text: &str,
seller_text: &str,
) -> Result<()> {
let buyer_content = format!("Buyer: {}", buyer_text);
let seller_content = format!("Seller: {}", seller_text);
let buyer_wrap = outbound::build_wrap(
serbero_keys,
&buyer_shared_keys.public_key(),
&buyer_content,
)
.await?;
let seller_wrap = outbound::build_wrap(
serbero_keys,
&seller_shared_keys.public_key(),
&seller_content,
)
.await?;
if buyer_wrap.inner_event_id == seller_wrap.inner_event_id {
return Err(Error::ChatTransport(
"inner event ids collided across parties; refusing to persist \
rows that would violate the dedup invariant"
.into(),
));
}
let buyer_shared_pubkey_hex = buyer_shared_keys.public_key().to_hex();
let seller_shared_pubkey_hex = seller_shared_keys.public_key().to_hex();
let buyer_inner_id_hex = buyer_wrap.inner_event_id.to_hex();
let seller_inner_id_hex = seller_wrap.inner_event_id.to_hex();
let now = current_ts_secs()?;
{
let mut guard = conn.lock().await;
let tx = guard.transaction()?;
db::mediation::insert_outbound_message(
&tx,
&db::mediation::NewOutboundMessage {
session_id,
party: TranscriptParty::Buyer,
shared_pubkey: &buyer_shared_pubkey_hex,
inner_event_id: &buyer_inner_id_hex,
inner_event_created_at: buyer_wrap.inner_created_at,
outer_event_id: Some(&buyer_wrap.outer.id.to_hex()),
content: &buyer_content,
prompt_bundle_id: &prompt_bundle.id,
policy_hash: &prompt_bundle.policy_hash,
persisted_at: now,
},
)?;
db::mediation::insert_outbound_message(
&tx,
&db::mediation::NewOutboundMessage {
session_id,
party: TranscriptParty::Seller,
shared_pubkey: &seller_shared_pubkey_hex,
inner_event_id: &seller_inner_id_hex,
inner_event_created_at: seller_wrap.inner_created_at,
outer_event_id: Some(&seller_wrap.outer.id.to_hex()),
content: &seller_content,
prompt_bundle_id: &prompt_bundle.id,
policy_hash: &prompt_bundle.policy_hash,
persisted_at: now,
},
)?;
tx.execute(
"UPDATE mediation_sessions
SET state = 'awaiting_response', last_transition_at = ?1
WHERE session_id = ?2 AND state != 'awaiting_response'",
params![now, session_id],
)?;
tx.commit()?;
}
session::publish_with_bounded_retry(client, &buyer_wrap.outer, "buyer").await?;
record_outbound_sent_audit(
conn,
session_id,
&buyer_shared_pubkey_hex,
&buyer_inner_id_hex,
prompt_bundle,
)
.await?;
session::publish_with_bounded_retry(client, &seller_wrap.outer, "seller").await?;
record_outbound_sent_audit(
conn,
session_id,
&seller_shared_pubkey_hex,
&seller_inner_id_hex,
prompt_bundle,
)
.await?;
info!(
session_id = %session_id,
prompt_bundle_id = %prompt_bundle.id,
policy_hash = %prompt_bundle.policy_hash,
"initial clarifying message dispatched to both parties"
);
Ok(())
}
#[instrument(skip_all, fields(session_id = %session_id, round = round_number))]
#[allow(clippy::too_many_arguments)]
pub async fn draft_and_send_followup_message(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
client: &Client,
serbero_keys: &Keys,
session_id: &str,
round_number: u32,
round_count_to_mark: i64,
buyer_shared_keys: &Keys,
seller_shared_keys: &Keys,
prompt_bundle: &Arc<PromptBundle>,
buyer_text: &str,
seller_text: &str,
) -> Result<()> {
let buyer_content = format!("Round {round_number}. Buyer: {buyer_text}");
let seller_content = format!("Round {round_number}. Seller: {seller_text}");
let buyer_wrap = outbound::build_wrap(
serbero_keys,
&buyer_shared_keys.public_key(),
&buyer_content,
)
.await?;
let seller_wrap = outbound::build_wrap(
serbero_keys,
&seller_shared_keys.public_key(),
&seller_content,
)
.await?;
if buyer_wrap.inner_event_id == seller_wrap.inner_event_id {
return Err(Error::ChatTransport(
"inner event ids collided across parties on follow-up; refusing to \
persist rows that would violate the dedup invariant"
.into(),
));
}
let buyer_shared_pubkey_hex = buyer_shared_keys.public_key().to_hex();
let seller_shared_pubkey_hex = seller_shared_keys.public_key().to_hex();
let buyer_inner_id_hex = buyer_wrap.inner_event_id.to_hex();
let seller_inner_id_hex = seller_wrap.inner_event_id.to_hex();
let now = current_ts_secs()?;
{
let mut guard = conn.lock().await;
let tx = guard.transaction()?;
db::mediation::insert_outbound_message(
&tx,
&db::mediation::NewOutboundMessage {
session_id,
party: TranscriptParty::Buyer,
shared_pubkey: &buyer_shared_pubkey_hex,
inner_event_id: &buyer_inner_id_hex,
inner_event_created_at: buyer_wrap.inner_created_at,
outer_event_id: Some(&buyer_wrap.outer.id.to_hex()),
content: &buyer_content,
prompt_bundle_id: &prompt_bundle.id,
policy_hash: &prompt_bundle.policy_hash,
persisted_at: now,
},
)?;
db::mediation::insert_outbound_message(
&tx,
&db::mediation::NewOutboundMessage {
session_id,
party: TranscriptParty::Seller,
shared_pubkey: &seller_shared_pubkey_hex,
inner_event_id: &seller_inner_id_hex,
inner_event_created_at: seller_wrap.inner_created_at,
outer_event_id: Some(&seller_wrap.outer.id.to_hex()),
content: &seller_content,
prompt_bundle_id: &prompt_bundle.id,
policy_hash: &prompt_bundle.policy_hash,
persisted_at: now,
},
)?;
tx.execute(
"UPDATE mediation_sessions
SET last_transition_at = ?1
WHERE session_id = ?2 AND state = 'awaiting_response'",
params![now, session_id],
)?;
db::mediation::advance_evaluator_marker(&tx, session_id, round_count_to_mark)?;
tx.commit()?;
}
session::publish_with_bounded_retry(client, &buyer_wrap.outer, "buyer").await?;
record_outbound_sent_audit(
conn,
session_id,
&buyer_shared_pubkey_hex,
&buyer_inner_id_hex,
prompt_bundle,
)
.await?;
session::publish_with_bounded_retry(client, &seller_wrap.outer, "seller").await?;
record_outbound_sent_audit(
conn,
session_id,
&seller_shared_pubkey_hex,
&seller_inner_id_hex,
prompt_bundle,
)
.await?;
info!(
session_id = %session_id,
round = round_number,
round_count_marked = round_count_to_mark,
prompt_bundle_id = %prompt_bundle.id,
policy_hash = %prompt_bundle.policy_hash,
"follow-up clarifying message dispatched to both parties"
);
Ok(())
}
async fn record_outbound_sent_audit(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
session_id: &str,
shared_pubkey_hex: &str,
inner_event_id_hex: &str,
prompt_bundle: &Arc<PromptBundle>,
) -> Result<()> {
let now = current_ts_secs()?;
let guard = conn.lock().await;
db::mediation_events::record_outbound_sent(
&guard,
session_id,
shared_pubkey_hex,
inner_event_id_hex,
Some(&prompt_bundle.id),
Some(&prompt_bundle.policy_hash),
now,
)?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub async fn run_engine(
conn: Arc<AsyncMutex<rusqlite::Connection>>,
client: Client,
serbero_keys: Keys,
mostro_pubkey: PublicKey,
reasoning: Arc<dyn ReasoningProvider>,
prompt_bundle: Arc<PromptBundle>,
provider_name: String,
model_name: String,
auth_handle: auth_retry::AuthRetryHandle,
solvers: Vec<SolverConfig>,
mediation_cfg: MediationConfig,
session_key_cache: SessionKeyCache,
) {
info!(
tick_seconds = ENGINE_TICK_INTERVAL.as_secs(),
provider = %provider_name,
model = %model_name,
"mediation engine loop starting"
);
if let Err(e) = startup_resume_pass(&conn, &prompt_bundle, &session_key_cache).await {
error!(error = %e, "mediation engine startup-resume pass failed");
}
let mut ticker = tokio::time::interval(ENGINE_TICK_INTERVAL);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
ticker.tick().await;
loop {
ticker.tick().await;
if let Err(e) = run_engine_tick(
&conn,
&client,
&serbero_keys,
&mostro_pubkey,
reasoning.as_ref(),
&prompt_bundle,
&provider_name,
&model_name,
&auth_handle,
&session_key_cache,
&solvers,
)
.await
{
error!(error = %e, "mediation engine tick failed");
}
if let Err(e) = run_ingest_tick(
&conn,
&client,
&serbero_keys,
reasoning.as_ref(),
&session_key_cache,
&prompt_bundle,
&provider_name,
&model_name,
&mediation_cfg,
&solvers,
)
.await
{
error!(error = %e, "mediation ingest tick failed");
}
}
}
async fn startup_resume_pass(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
prompt_bundle: &Arc<PromptBundle>,
session_key_cache: &SessionKeyCache,
) -> Result<()> {
let sessions = {
let guard = conn.lock().await;
db::mediation::list_live_sessions(&guard)?
};
if sessions.is_empty() {
debug!("startup resume: no live sessions");
return Ok(());
}
info!(
count = sessions.len(),
"startup resume: attempting to repopulate session-key cache"
);
for s in sessions {
let (bsp, ssp) = match (
s.buyer_shared_pubkey.as_deref(),
s.seller_shared_pubkey.as_deref(),
) {
(Some(b), Some(se)) => (b, se),
_ => {
warn!(
session_id = %s.session_id,
"startup resume: session missing shared pubkey columns; skipping"
);
continue;
}
};
match dispute_chat_flow::load_chat_keys_for_session(bsp, ssp) {
Ok(material) => {
let mut guard = session_key_cache.lock().await;
guard.insert(s.session_id.clone(), material);
info!(
session_id = %s.session_id,
"startup resume: session material restored into cache"
);
}
Err(e) => {
if s.policy_hash == prompt_bundle.policy_hash {
info!(
session_id = %s.session_id,
policy_hash = %s.policy_hash,
error = %e,
"startup resume: key material unavailable but pinned bundle matches; \
session stays alive (ingest tick will skip until re-derivation)"
);
} else {
handle_policy_bundle_missing(
conn,
&s.session_id,
&s.policy_hash,
&prompt_bundle.policy_hash,
)
.await;
}
}
}
}
Ok(())
}
async fn handle_policy_bundle_missing(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
session_id: &str,
pinned_hash: &str,
loaded_hash: &str,
) {
let now = match current_ts_secs() {
Ok(t) => t,
Err(e) => {
error!(error = %e, "startup resume: refusing to escalate with invalid clock");
return;
}
};
let payload = json!({
"trigger": "policy_bundle_missing",
"session_id": session_id,
"pinned_hash": pinned_hash,
"loaded_hash": loaded_hash,
})
.to_string();
let mut guard = conn.lock().await;
let tx = match guard.transaction() {
Ok(tx) => tx,
Err(e) => {
error!(
session_id = %session_id,
error = %e,
"startup resume: failed to open escalation transaction"
);
return;
}
};
if let Err(e) = db::mediation::set_session_state(
&tx,
session_id,
MediationSessionState::EscalationRecommended,
now,
) {
error!(
session_id = %session_id,
error = %e,
"startup resume: set_session_state failed (transaction will roll back)"
);
return;
}
if let Err(e) = db::mediation_events::record_event(
&tx,
MediationEventKind::EscalationRecommended,
Some(session_id),
&payload,
None,
None,
Some(pinned_hash),
now,
) {
error!(
session_id = %session_id,
error = %e,
"startup resume: record_event failed (transaction will roll back)"
);
return;
}
if let Err(e) = tx.commit() {
error!(
session_id = %session_id,
error = %e,
"startup resume: escalation transaction commit failed"
);
return;
}
error!(
session_id = %session_id,
pinned_hash = %pinned_hash,
loaded_hash = %loaded_hash,
"startup resume: pinned prompt bundle missing; session escalated (policy_bundle_missing)"
);
}
#[allow(clippy::too_many_arguments)]
async fn run_engine_tick(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
client: &Client,
serbero_keys: &Keys,
mostro_pubkey: &PublicKey,
reasoning: &dyn ReasoningProvider,
prompt_bundle: &Arc<PromptBundle>,
provider_name: &str,
model_name: &str,
auth_handle: &auth_retry::AuthRetryHandle,
session_key_cache: &SessionKeyCache,
solvers: &[SolverConfig],
) -> Result<()> {
let eligible = list_eligible_disputes(conn).await?;
if eligible.is_empty() {
debug!("engine tick: no eligible disputes");
return Ok(());
}
debug!(count = eligible.len(), "engine tick: eligible disputes");
for eligible in eligible {
let eligibility::EligibleDispute {
dispute_id,
initiator_role,
} = &eligible;
let dispute_uuid = match Uuid::parse_str(dispute_id) {
Ok(u) => u,
Err(e) => {
warn!(
dispute_id = %dispute_id,
error = %e,
"engine tick: skipping dispute with non-UUID id"
);
continue;
}
};
let start_outcome = start::try_start_for(start::StartParams {
open: session::OpenSessionParams {
conn,
client,
serbero_keys,
mostro_pubkey,
reasoning,
prompt_bundle,
dispute_id,
initiator_role: *initiator_role,
dispute_uuid,
take_flow_timeout: DEFAULT_TAKE_FLOW_TIMEOUT,
take_flow_poll_interval: DEFAULT_TAKE_FLOW_POLL_INTERVAL,
provider_name,
model_name,
auth_handle,
session_key_cache: Some(session_key_cache),
solvers,
},
trigger: start::StartTrigger::TickRetry,
})
.await;
match start_outcome {
start::StartOutcome::NotEligible => {
debug!(
dispute_id = %dispute_id,
"engine tick: dispute no longer eligible; skipping"
);
}
start::StartOutcome::Started(session::OpenOutcome::Opened { session_id }) => {
info!(
dispute_id = %dispute_id,
session_id = %session_id,
"engine opened new mediation session"
);
}
start::StartOutcome::Started(session::OpenOutcome::ReadyForSummary {
session_id,
classification,
confidence,
}) => {
info!(
dispute_id = %dispute_id,
session_id = %session_id,
classification = %classification,
confidence,
"engine: session opened in cooperative-summary mode; delivering summary"
);
if let Err(e) = deliver_summary(
conn,
client,
serbero_keys,
&session_id,
dispute_id,
classification,
confidence,
Vec::new(),
prompt_bundle,
reasoning,
solvers,
provider_name,
model_name,
)
.await
{
error!(
session_id = %session_id,
error = %e,
"engine: deliver_summary failed; session left mid-pipeline"
);
}
}
start::StartOutcome::Started(session::OpenOutcome::AlreadyOpen { session_id }) => {
debug!(
dispute_id = %dispute_id,
session_id = %session_id,
"engine tick: dispute already has an open mediation session"
);
}
start::StartOutcome::Started(session::OpenOutcome::EscalatedBeforeTake {
dispute_id: did,
trigger,
}) => {
warn!(
dispute_id = %did,
trigger = %trigger,
"engine: opening-call escalate — dispute-scoped handoff already recorded"
);
notify_solvers_dispute_escalation(conn, client, solvers, &did, trigger).await;
}
start::StartOutcome::Started(session::OpenOutcome::RefusedReasoningUnavailable {
reason,
})
| start::StartOutcome::Started(session::OpenOutcome::RefusedAuthPending { reason })
| start::StartOutcome::Started(session::OpenOutcome::RefusedAuthTerminated {
reason,
}) => {
warn!(
dispute_id = %dispute_id,
reason = %reason,
"engine tick: Started arm carried a Refused variant (unexpected)"
);
}
start::StartOutcome::StoppedBeforeTake { reason } => {
warn!(
dispute_id = %dispute_id,
reason = %reason,
"engine tick: start attempt stopped before take; will retry next cycle (SC-105)"
);
}
start::StartOutcome::TakeFailed { reason } => {
warn!(
dispute_id = %dispute_id,
reason = %reason,
"engine tick: take-dispute failed"
);
}
start::StartOutcome::Error(e) => {
error!(
dispute_id = %dispute_id,
error = %e,
"engine tick: start attempt returned error; continuing with next dispute"
);
}
}
}
Ok(())
}
#[instrument(
skip_all,
fields(session_id = %session_id, dispute_id = %dispute_id)
)]
#[allow(clippy::too_many_arguments)]
pub async fn deliver_summary(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
client: &Client,
_serbero_keys: &Keys,
session_id: &str,
dispute_id: &str,
classification: ClassificationLabel,
confidence: f64,
transcript: Vec<TranscriptEntry>,
prompt_bundle: &Arc<PromptBundle>,
reasoning: &dyn ReasoningProvider,
solvers: &[SolverConfig],
provider_name: &str,
model_name: &str,
) -> Result<()> {
transition_session(
conn,
session_id,
MediationSessionState::SummaryPending,
current_ts_secs()?,
)
.await?;
let summary = match summarizer::summarize(summarizer::SummarizeParams {
conn,
session_id,
dispute_id,
classification,
confidence,
transcript,
prompt_bundle,
reasoning,
provider_name,
model_name,
})
.await
{
Ok(s) => s,
Err(Error::PolicyViolation(msg)) => {
warn!(
session_id = %session_id,
reason = %msg,
"deliver_summary: authority-boundary attempt in summary; escalating"
);
escalate_from_summary_path(
conn,
session_id,
prompt_bundle,
EscalationTrigger::AuthorityBoundaryAttempt,
&msg,
)
.await?;
return Ok(());
}
Err(e) => {
warn!(
session_id = %session_id,
error = %e,
"deliver_summary: summarizer failed; escalating as reasoning_unavailable"
);
escalate_from_summary_path(
conn,
session_id,
prompt_bundle,
EscalationTrigger::ReasoningUnavailable,
&e.to_string(),
)
.await?;
return Ok(());
}
};
let assigned_solver: Option<String> = {
let guard = conn.lock().await;
match guard.query_row(
"SELECT assigned_solver FROM disputes WHERE dispute_id = ?1",
params![dispute_id],
|r| r.get::<_, Option<String>>(0),
) {
Ok(opt) => opt,
Err(rusqlite::Error::QueryReturnedNoRows) => {
return Err(Error::InvalidEvent(format!(
"deliver_summary: dispute row missing for dispute_id={dispute_id}; \
refusing to broadcast without a valid parent row"
)));
}
Err(e) => {
return Err(Error::Db(e));
}
}
};
let recipients = router::resolve_recipients(solvers, assigned_solver.as_deref());
let recipient_list: Vec<String> = match recipients {
router::Recipients::Targeted(pk) => vec![pk],
router::Recipients::Broadcast(v) => v,
};
if recipient_list.is_empty() {
warn!(
session_id = %session_id,
"deliver_summary: no solver recipients configured; escalating (notification_failed)"
);
escalate_from_summary_path(
conn,
session_id,
prompt_bundle,
EscalationTrigger::NotificationFailed,
"no solver recipients configured",
)
.await?;
return Ok(());
}
let dm_body = format!(
"{}\n\nSuggested next step: {}",
summary.summary_text, summary.suggested_next_step
);
let mut any_sent = false;
for pk_hex in &recipient_list {
let sent_at = current_ts_secs()?;
let (status, error_message) = match PublicKey::parse(pk_hex) {
Ok(pk) => match send_gift_wrap_notification(client, &pk, &dm_body).await {
Ok(()) => {
info!(
session_id = %session_id,
solver_pubkey = %pk_hex,
rationale_id = %summary.rationale_id,
"solver_summary_delivered"
);
any_sent = true;
(NotificationStatus::Sent, None)
}
Err(e) => {
warn!(
session_id = %session_id,
solver_pubkey = %pk_hex,
error = %e,
"deliver_summary: notifier send failed; recording Failed notification row"
);
(NotificationStatus::Failed, Some(e.to_string()))
}
},
Err(e) => {
warn!(
session_id = %session_id,
solver_pubkey = %pk_hex,
error = %e,
"deliver_summary: recipient pubkey parse failed; recording Failed notification row"
);
(
NotificationStatus::Failed,
Some(format!("invalid pubkey: {e}")),
)
}
};
let guard = conn.lock().await;
db::notifications::record_notification_logged(
&guard,
dispute_id,
pk_hex,
sent_at,
status,
error_message.as_deref(),
NotificationType::MediationSummary,
);
}
if !any_sent {
warn!(
session_id = %session_id,
recipients = recipient_list.len(),
"deliver_summary: all recipient sends failed; escalating (notification_failed)"
);
escalate_from_summary_path(
conn,
session_id,
prompt_bundle,
EscalationTrigger::NotificationFailed,
"all recipient sends failed",
)
.await?;
return Ok(());
}
let now = current_ts_secs()?;
transition_session(
conn,
session_id,
MediationSessionState::SummaryDelivered,
now,
)
.await?;
transition_session(conn, session_id, MediationSessionState::Closed, now).await?;
Ok(())
}
pub(crate) async fn notify_solvers_escalation(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
client: &Client,
solvers: &[SolverConfig],
dispute_id: &str,
session_id: &str,
trigger: EscalationTrigger,
) {
let dm_body = format!(
"Mediation session {session_id} (dispute {dispute_id}) escalated — \
trigger: {trigger}. Needs human judgment."
);
notify_solvers_dm(
conn,
client,
solvers,
SolverDmParams {
dispute_id,
session_id,
body: &dm_body,
notif_type: NotificationType::MediationEscalationRecommended,
tracing_label: "solver_escalation_notified",
lookup_log_prefix: "notify_solvers_escalation",
force_broadcast: false,
},
)
.await;
}
pub(crate) async fn notify_solvers_dispute_escalation(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
client: &Client,
solvers: &[SolverConfig],
dispute_id: &str,
trigger: EscalationTrigger,
) {
let dm_body = format!(
"Dispute {dispute_id} escalated before mediation take — \
trigger: {trigger}. Serbero ran the reasoning verdict and \
the policy layer said this dispute is not a mediation \
candidate. No session was opened. Needs human judgment."
);
notify_solvers_dm(
conn,
client,
solvers,
SolverDmParams {
dispute_id,
session_id: "",
body: &dm_body,
notif_type: NotificationType::MediationEscalationRecommended,
tracing_label: "solver_dispute_escalation_notified",
lookup_log_prefix: "notify_solvers_dispute_escalation",
force_broadcast: true,
},
)
.await;
}
pub(crate) async fn notify_solvers_final_resolution_report(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
client: &Client,
solvers: &[SolverConfig],
dispute_id: &str,
session_id: Option<&str>,
body: &str,
) {
notify_solvers_dm(
conn,
client,
solvers,
SolverDmParams {
dispute_id,
session_id: session_id.unwrap_or(""),
body,
notif_type: NotificationType::MediationResolutionReport,
tracing_label: "solver_final_resolution_report_sent",
lookup_log_prefix: "notify_solvers_final_resolution_report",
force_broadcast: true,
},
)
.await;
}
struct SolverDmParams<'a> {
dispute_id: &'a str,
session_id: &'a str,
body: &'a str,
notif_type: NotificationType,
tracing_label: &'static str,
lookup_log_prefix: &'static str,
force_broadcast: bool,
}
async fn notify_solvers_dm(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
client: &Client,
solvers: &[SolverConfig],
params: SolverDmParams<'_>,
) {
let assigned_solver: Option<String> = if params.force_broadcast {
None
} else {
let guard = conn.lock().await;
match guard.query_row(
"SELECT assigned_solver FROM disputes WHERE dispute_id = ?1",
rusqlite::params![params.dispute_id],
|r| r.get::<_, Option<String>>(0),
) {
Ok(opt) => opt,
Err(rusqlite::Error::QueryReturnedNoRows) => {
warn!(
dispute_id = %params.dispute_id,
session_id = %params.session_id,
log_prefix = params.lookup_log_prefix,
"solver_dm: dispute row missing; refusing to broadcast without a valid parent row"
);
return;
}
Err(e) => {
warn!(
dispute_id = %params.dispute_id,
session_id = %params.session_id,
log_prefix = params.lookup_log_prefix,
error = %e,
"solver_dm: assigned_solver lookup failed; skipping notification to avoid unsafe broadcast"
);
return;
}
}
};
let recipients = router::resolve_recipients(solvers, assigned_solver.as_deref());
let recipient_list: Vec<String> = match recipients {
router::Recipients::Targeted(pk) => vec![pk],
router::Recipients::Broadcast(v) => v,
};
if recipient_list.is_empty() {
warn!(
dispute_id = %params.dispute_id,
session_id = %params.session_id,
log_prefix = params.lookup_log_prefix,
"solver_dm: no solver recipients configured"
);
return;
}
for pk_hex in &recipient_list {
let sent_at = match current_ts_secs() {
Ok(t) => t,
Err(e) => {
warn!(
session_id = %params.session_id,
solver_pubkey = %pk_hex,
log_prefix = params.lookup_log_prefix,
error = %e,
"solver_dm: clock guard returned Err; recording sent_at = 0 as a best-effort marker"
);
0
}
};
let (status, error_message) = match PublicKey::parse(pk_hex) {
Ok(pk) => match send_gift_wrap_notification(client, &pk, params.body).await {
Ok(()) => {
info!(
session_id = %params.session_id,
solver_pubkey = %pk_hex,
event = params.tracing_label,
"solver_dm_sent"
);
(NotificationStatus::Sent, None)
}
Err(e) => {
warn!(
session_id = %params.session_id,
solver_pubkey = %pk_hex,
log_prefix = params.lookup_log_prefix,
error = %e,
"solver_dm: notifier send failed; recording Failed row"
);
(NotificationStatus::Failed, Some(e.to_string()))
}
},
Err(e) => {
warn!(
session_id = %params.session_id,
solver_pubkey = %pk_hex,
log_prefix = params.lookup_log_prefix,
error = %e,
"solver_dm: recipient pubkey parse failed"
);
(
NotificationStatus::Failed,
Some(format!("invalid pubkey: {e}")),
)
}
};
let guard = conn.lock().await;
db::notifications::record_notification_logged(
&guard,
params.dispute_id,
pk_hex,
sent_at,
status,
error_message.as_deref(),
params.notif_type,
);
}
}
async fn transition_session(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
session_id: &str,
new_state: MediationSessionState,
at: i64,
) -> Result<()> {
let guard = conn.lock().await;
db::mediation::set_session_state(&guard, session_id, new_state, at)
}
async fn escalate_from_summary_path(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
session_id: &str,
prompt_bundle: &Arc<PromptBundle>,
trigger: EscalationTrigger,
reason: &str,
) -> Result<()> {
let now = current_ts_secs()?;
let payload = json!({
"trigger": trigger.to_string(),
"session_id": session_id,
"reason": reason,
})
.to_string();
let mut guard = conn.lock().await;
let tx = guard.transaction()?;
db::mediation::set_session_state(
&tx,
session_id,
MediationSessionState::EscalationRecommended,
now,
)?;
db::mediation_events::record_event(
&tx,
MediationEventKind::EscalationRecommended,
Some(session_id),
&payload,
None,
Some(&prompt_bundle.id),
Some(&prompt_bundle.policy_hash),
now,
)?;
tx.commit()?;
Ok(())
}
async fn list_eligible_disputes(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
) -> Result<Vec<eligibility::EligibleDispute>> {
let guard = conn.lock().await;
eligibility::list_mediation_eligible(&guard)
}
pub(crate) fn current_ts_secs() -> Result<i64> {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.map_err(|e| Error::ChatTransport(format!("system clock is before UNIX_EPOCH: {e}")))
}
#[instrument(
skip_all,
fields(
sessions_checked = tracing::field::Empty,
envelopes_fetched = tracing::field::Empty,
rows_ingested = tracing::field::Empty,
rows_duplicate = tracing::field::Empty,
rows_stale = tracing::field::Empty,
)
)]
#[allow(clippy::too_many_arguments)]
async fn run_ingest_tick(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
client: &Client,
serbero_keys: &Keys,
reasoning: &dyn ReasoningProvider,
session_key_cache: &SessionKeyCache,
prompt_bundle: &Arc<PromptBundle>,
provider_name: &str,
model_name: &str,
mediation_cfg: &MediationConfig,
solvers: &[SolverConfig],
) -> Result<()> {
debug!("ingest tick starting");
let sessions = {
let guard = conn.lock().await;
db::mediation::list_live_sessions(&guard)?
};
let mut sessions_checked: u64 = 0;
let mut envelopes_fetched: u64 = 0;
let mut rows_ingested: u64 = 0;
let mut rows_duplicate: u64 = 0;
let mut rows_stale: u64 = 0;
let mut fetchers: tokio::task::JoinSet<IngestFetchResult> = tokio::task::JoinSet::new();
for s in sessions {
sessions_checked += 1;
let material = {
let guard = session_key_cache.lock().await;
guard.get(&s.session_id).cloned()
};
let Some(material) = material else {
debug!(
session_id = %s.session_id,
"ingest tick: no in-memory chat material; skipping session (restart-resume pending)"
);
continue;
};
if let (Some(bsp), Some(ssp)) = (
s.buyer_shared_pubkey.as_deref(),
s.seller_shared_pubkey.as_deref(),
) {
if bsp != material.buyer_shared_pubkey() || ssp != material.seller_shared_pubkey() {
warn!(
session_id = %s.session_id,
"ingest tick: cached chat material does not match session row's \
shared pubkeys; skipping"
);
continue;
}
}
let buyer_pk = match PublicKey::parse(&material.buyer_pubkey) {
Ok(pk) => pk,
Err(e) => {
warn!(
session_id = %s.session_id,
error = %e,
"ingest tick: invalid buyer trade pubkey in cache; skipping session"
);
continue;
}
};
let seller_pk = match PublicKey::parse(&material.seller_pubkey) {
Ok(pk) => pk,
Err(e) => {
warn!(
session_id = %s.session_id,
error = %e,
"ingest tick: invalid seller trade pubkey in cache; skipping session"
);
continue;
}
};
let client = client.clone();
let session_id = s.session_id.clone();
fetchers.spawn(async move {
let parties = [
PartyChatMaterial {
party: TranscriptParty::Buyer,
shared_keys: &material.buyer_shared_keys,
expected_author: buyer_pk,
},
PartyChatMaterial {
party: TranscriptParty::Seller,
shared_keys: &material.seller_shared_keys,
expected_author: seller_pk,
},
];
let result = inbound::fetch_inbound(&client, &parties, INGEST_FETCH_TIMEOUT).await;
(session_id, result)
});
}
while let Some(res) = fetchers.join_next().await {
let (session_id, fetch_result) = match res {
Ok(pair) => pair,
Err(e) => {
warn!(
error = %e,
"ingest tick: a fetch task panicked or was cancelled; continuing"
);
continue;
}
};
let envelopes = match fetch_result {
Ok(v) => v,
Err(e) => {
warn!(
session_id = %session_id,
error = %e,
"ingest tick: fetch_inbound failed; continuing with next session"
);
continue;
}
};
envelopes_fetched += envelopes.len() as u64;
let mut session_had_fresh = false;
'envelope_loop: for env in &envelopes {
match session::ingest_inbound(conn, &session_id, env).await {
Ok(session::IngestOutcome::Fresh { round_count_after }) => {
rows_ingested += 1;
session_had_fresh = true;
let rc_after: u32 = round_count_after.max(0) as u32;
if session::check_round_limit(rc_after, mediation_cfg.max_rounds) {
session_had_fresh = false;
warn!(
session_id = %session_id,
round_count = rc_after,
max_rounds = mediation_cfg.max_rounds,
"round_limit_escalation"
);
let dispute_id: Option<String> = {
let g = conn.lock().await;
g.query_row(
"SELECT dispute_id FROM mediation_sessions \
WHERE session_id = ?1",
rusqlite::params![session_id],
|r| r.get::<_, String>(0),
)
.ok()
};
let Some(did) = dispute_id else {
warn!(
session_id = %session_id,
"round_limit_escalation: dispute_id lookup failed; \
skipping escalation attempt"
);
break;
};
match escalation::recommend(escalation::RecommendParams {
conn,
session_id: Some(&session_id),
dispute_id: &did,
trigger: EscalationTrigger::RoundLimit,
evidence_refs: vec![env.inner_event_id.clone()],
rationale_refs: Vec::new(),
prompt_bundle_id: &prompt_bundle.id,
policy_hash: &prompt_bundle.policy_hash,
})
.await
{
Ok(()) => {
notify_solvers_escalation(
conn,
client,
solvers,
&did,
&session_id,
EscalationTrigger::RoundLimit,
)
.await;
}
Err(e) => {
warn!(
session_id = %session_id,
error = %e,
"ingest tick: round_limit escalation failed; \
breaking out of envelope loop for this session"
);
}
}
break 'envelope_loop;
}
}
Ok(session::IngestOutcome::Duplicate) => rows_duplicate += 1,
Ok(session::IngestOutcome::Stale) => rows_stale += 1,
Err(e) => {
warn!(
session_id = %session_id,
error = %e,
inner_event_id = %env.inner_event_id,
"ingest tick: ingest_inbound failed for envelope"
);
}
}
}
if session_had_fresh {
follow_up::advance_session_round(
conn,
client,
serbero_keys,
reasoning,
prompt_bundle,
&session_id,
session_key_cache,
solvers,
provider_name,
model_name,
)
.await
.unwrap_or_else(|e| {
warn!(
session_id = %session_id,
error = %e,
"ingest tick: advance_session_round surfaced an error (continuing tick)"
);
});
}
}
let span = tracing::Span::current();
span.record("sessions_checked", sessions_checked);
span.record("envelopes_fetched", envelopes_fetched);
span.record("rows_ingested", rows_ingested);
span.record("rows_duplicate", rows_duplicate);
span.record("rows_stale", rows_stale);
debug!(
sessions_checked,
envelopes_fetched, rows_ingested, rows_duplicate, rows_stale, "ingest tick finished"
);
if let Err(e) =
check_party_unresponsive_timeout(conn, client, solvers, prompt_bundle, mediation_cfg).await
{
warn!(error = %e, "ingest tick: party-unresponsive timeout sweep failed");
}
Ok(())
}
pub async fn check_party_unresponsive_timeout(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
client: &Client,
solvers: &[SolverConfig],
prompt_bundle: &Arc<PromptBundle>,
mediation_cfg: &MediationConfig,
) -> Result<()> {
if mediation_cfg.party_response_timeout_seconds == 0 {
debug!("party-response timeout sweep disabled (timeout = 0)");
return Ok(());
}
let now = current_ts_secs()?;
let timeout = mediation_cfg.party_response_timeout_seconds as i64;
#[derive(Debug)]
struct Candidate {
session_id: String,
dispute_id: String,
state: MediationSessionState,
started_at: i64,
buyer_last: Option<i64>,
seller_last: Option<i64>,
}
let candidates: Vec<Candidate> = {
use std::str::FromStr;
let guard = conn.lock().await;
let mut stmt = guard.prepare(
"SELECT session_id, dispute_id, state, started_at,
buyer_last_seen_inner_ts, seller_last_seen_inner_ts
FROM mediation_sessions
WHERE state NOT IN (
'closed',
'summary_delivered',
'escalation_recommended',
'superseded_by_human'
)",
)?;
let rows = stmt.query_map([], |r| {
Ok((
r.get::<_, String>(0)?,
r.get::<_, String>(1)?,
r.get::<_, String>(2)?,
r.get::<_, i64>(3)?,
r.get::<_, Option<i64>>(4)?,
r.get::<_, Option<i64>>(5)?,
))
})?;
let mut out = Vec::new();
for row in rows {
let (session_id, dispute_id, state_s, started_at, buyer_last, seller_last) = row?;
let state = match MediationSessionState::from_str(&state_s) {
Ok(s) => s,
Err(e) => {
warn!(
session_id = %session_id,
state = %state_s,
error = %e,
"timeout sweep: skipping session with unparseable state"
);
continue;
}
};
out.push(Candidate {
session_id,
dispute_id,
state,
started_at,
buyer_last,
seller_last,
});
}
out
};
for c in candidates {
if c.state.is_terminal() || c.state == MediationSessionState::EscalationRecommended {
continue;
}
let reference = [Some(c.started_at), c.buyer_last, c.seller_last]
.into_iter()
.flatten()
.max()
.unwrap_or(c.started_at);
let deadline = reference.saturating_add(timeout);
if now <= deadline {
continue;
}
warn!(
session_id = %c.session_id,
reference_ts = reference,
deadline,
now,
"party_unresponsive_escalation"
);
match escalation::recommend(escalation::RecommendParams {
conn,
session_id: Some(&c.session_id),
dispute_id: &c.dispute_id,
trigger: EscalationTrigger::PartyUnresponsive,
evidence_refs: Vec::new(),
rationale_refs: Vec::new(),
prompt_bundle_id: &prompt_bundle.id,
policy_hash: &prompt_bundle.policy_hash,
})
.await
{
Ok(()) => {
notify_solvers_escalation(
conn,
client,
solvers,
&c.dispute_id,
&c.session_id,
EscalationTrigger::PartyUnresponsive,
)
.await;
}
Err(e) => {
error!(
session_id = %c.session_id,
error = %e,
"timeout sweep: escalation::recommend failed"
);
}
}
}
Ok(())
}
type IngestFetchResult = (String, Result<Vec<inbound::InboundEnvelope>>);