use std::sync::Arc;
use nostr_sdk::prelude::{Client, Keys, PublicKey};
use tokio::sync::Mutex as AsyncMutex;
use tracing::{debug, warn};
use crate::db;
use crate::error::Result;
use crate::mediation::escalation::HandoffPackage;
use crate::models::{NotificationStatus, NotificationType};
use crate::nostr::send_gift_wrap_notification;
pub(crate) const DM_VERSION: &str = "escalation_handoff/v1";
pub fn build_dm_body(pkg: &HandoffPackage) -> String {
let session_header = match &pkg.session_id {
Some(sid) => format!("Session: {sid}"),
None => "Session: <none — dispute-scoped handoff>".to_string(),
};
let payload_line = serde_json::to_string(pkg).unwrap_or_else(|e| {
warn!(
dispute_id = %pkg.dispute_id,
error = %e,
"build_dm_body: HandoffPackage failed to serialize; emitting degraded payload line"
);
format!(
r#"{{"dispute_id":"{}","serialization_error":true}}"#,
pkg.dispute_id
)
});
format!(
"{DM_VERSION}\n\
Dispute: {dispute}\n\
{session_header}\n\
Trigger: {trigger}\n\
\n\
Escalation required for dispute {dispute}. Trigger: {trigger}. \
This dispute was evaluated by Serbero's mediation assistance \
system and requires human judgment. Please run TakeDispute for \
dispute {dispute} on your Mostro instance to review the full \
context.\n\
\n\
Handoff payload (JSON):\n\
{payload_line}",
dispute = pkg.dispute_id,
trigger = pkg.trigger,
)
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DispatchOutcome {
AllSucceeded { recipients: Vec<String> },
AllFailed { attempted: Vec<String> },
PartialSuccess {
attempted: Vec<String>,
succeeded: Vec<String>,
failed: Vec<String>,
},
}
impl DispatchOutcome {
pub fn attempted_recipients(&self) -> Vec<String> {
match self {
DispatchOutcome::AllSucceeded { recipients } => recipients.clone(),
DispatchOutcome::AllFailed { attempted } => attempted.clone(),
DispatchOutcome::PartialSuccess { attempted, .. } => attempted.clone(),
}
}
}
pub async fn send_to_recipients(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
client: &Client,
_serbero_keys: &Keys,
dispute_id: &str,
recipients: &[String],
body: &str,
now_ts: i64,
) -> Result<DispatchOutcome> {
let mut succeeded: Vec<String> = Vec::with_capacity(recipients.len());
let mut failed: Vec<String> = Vec::new();
for pk_hex in recipients {
let parsed_pk = match PublicKey::parse(pk_hex) {
Ok(p) => p,
Err(e) => {
warn!(
dispute_id = %dispute_id,
solver_pubkey = %pk_hex,
error = %e,
"phase4_dispatch: recipient pubkey malformed"
);
insert_notification(
conn,
dispute_id,
pk_hex,
NotificationStatus::Failed,
Some(format!("invalid pubkey: {e}")),
now_ts,
)
.await;
failed.push(pk_hex.clone());
continue;
}
};
match send_gift_wrap_notification(client, &parsed_pk, body).await {
Ok(()) => {
debug!(
dispute_id = %dispute_id,
solver_pubkey = %pk_hex,
"phase4_dispatch: recipient send ok"
);
insert_notification(
conn,
dispute_id,
pk_hex,
NotificationStatus::Sent,
None,
now_ts,
)
.await;
succeeded.push(pk_hex.clone());
}
Err(e) => {
warn!(
dispute_id = %dispute_id,
solver_pubkey = %pk_hex,
error = %e,
"phase4_dispatch: recipient send failed"
);
insert_notification(
conn,
dispute_id,
pk_hex,
NotificationStatus::Failed,
Some(e.to_string()),
now_ts,
)
.await;
failed.push(pk_hex.clone());
}
}
}
let attempted: Vec<String> = recipients.to_vec();
let outcome = if failed.is_empty() {
DispatchOutcome::AllSucceeded {
recipients: attempted,
}
} else if succeeded.is_empty() {
DispatchOutcome::AllFailed { attempted }
} else {
DispatchOutcome::PartialSuccess {
attempted,
succeeded,
failed,
}
};
Ok(outcome)
}
async fn insert_notification(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
dispute_id: &str,
solver_pubkey: &str,
status: NotificationStatus,
error_message: Option<String>,
sent_at: i64,
) {
let guard = conn.lock().await;
db::notifications::record_notification_logged(
&guard,
dispute_id,
solver_pubkey,
sent_at,
status,
error_message.as_deref(),
NotificationType::MediationEscalationRecommended,
);
}
#[cfg(test)]
mod tests {
use super::*;
fn sample_package(session_id: Option<&str>) -> HandoffPackage {
HandoffPackage {
dispute_id: "dispute-abc".to_string(),
session_id: session_id.map(|s| s.to_string()),
trigger: "conflicting_claims".to_string(),
evidence_refs: vec!["inner-event-1".to_string(), "inner-event-2".to_string()],
prompt_bundle_id: "phase3-default".to_string(),
policy_hash: "abcd1234".to_string(),
rationale_refs: vec!["9f86d081884c".to_string()],
assembled_at: 1_745_000_000,
}
}
#[test]
fn body_starts_with_version_prefix() {
let pkg = sample_package(Some("sess-1"));
let body = build_dm_body(&pkg);
assert!(
body.starts_with("escalation_handoff/v1\n"),
"first line must be exactly the version prefix; got: {body}"
);
}
#[test]
fn body_carries_dispute_id_and_trigger_in_headers_and_summary() {
let pkg = sample_package(Some("sess-1"));
let body = build_dm_body(&pkg);
assert!(body.contains("Dispute: dispute-abc"));
assert!(body.contains("Trigger: conflicting_claims"));
assert!(body.contains("Escalation required for dispute dispute-abc"));
assert!(body.contains("Please run TakeDispute for dispute dispute-abc"));
assert!(body.contains("Serbero's mediation assistance system"));
}
#[test]
fn body_session_header_uses_literal_marker_when_session_id_absent() {
let pkg = sample_package(None);
let body = build_dm_body(&pkg);
assert!(
body.contains("Session: <none — dispute-scoped handoff>"),
"dispute-scoped (FR-122) handoff must render the <none> marker; got: {body}"
);
}
#[test]
fn body_session_header_uses_session_id_when_present() {
let pkg = sample_package(Some("sess-xyz"));
let body = build_dm_body(&pkg);
assert!(body.contains("Session: sess-xyz"));
assert!(!body.contains("<none — dispute-scoped handoff>"));
}
#[test]
fn json_payload_round_trips_to_handoff_package() {
let pkg = sample_package(Some("sess-1"));
let body = build_dm_body(&pkg);
let json_line = body
.lines()
.skip_while(|l| !l.starts_with("Handoff payload (JSON)"))
.nth(1)
.expect("payload line must exist");
let parsed: HandoffPackage =
serde_json::from_str(json_line).expect("JSON round-trip must succeed");
assert_eq!(parsed.dispute_id, pkg.dispute_id);
assert_eq!(parsed.trigger, pkg.trigger);
assert_eq!(parsed.evidence_refs, pkg.evidence_refs);
assert_eq!(parsed.rationale_refs, pkg.rationale_refs);
assert_eq!(parsed.assembled_at, pkg.assembled_at);
assert_eq!(parsed.session_id, pkg.session_id);
}
#[test]
fn json_payload_omits_session_id_key_when_none() {
let pkg = sample_package(None);
let body = build_dm_body(&pkg);
let json_line = body
.lines()
.skip_while(|l| !l.starts_with("Handoff payload (JSON)"))
.nth(1)
.expect("payload line must exist");
assert!(
!json_line.contains("session_id"),
"absent session must NOT emit the session_id key at all (got: {json_line})"
);
}
#[test]
fn body_never_carries_raw_rationale_text() {
let mut pkg = sample_package(Some("sess-1"));
pkg.rationale_refs = vec!["ref-abc123".to_string()];
let body = build_dm_body(&pkg);
assert!(body.contains("ref-abc123"));
assert!(
!body.contains("rationale_text"),
"raw rationale text MUST NOT appear in the DM body"
);
}
#[test]
fn dispatch_outcome_attempted_recipients_preserves_order() {
let o = DispatchOutcome::AllSucceeded {
recipients: vec!["a".into(), "b".into(), "c".into()],
};
assert_eq!(o.attempted_recipients(), vec!["a", "b", "c"]);
let o = DispatchOutcome::AllFailed {
attempted: vec!["x".into(), "y".into()],
};
assert_eq!(o.attempted_recipients(), vec!["x", "y"]);
let o = DispatchOutcome::PartialSuccess {
attempted: vec!["ok-1".into(), "bad-1".into(), "ok-2".into()],
succeeded: vec!["ok-1".into(), "ok-2".into()],
failed: vec!["bad-1".into()],
};
assert_eq!(
o.attempted_recipients(),
vec!["ok-1", "bad-1", "ok-2"],
"PartialSuccess must preserve the original send-loop order"
);
}
#[test]
fn partial_success_with_failure_before_later_success_keeps_original_order() {
let o = DispatchOutcome::PartialSuccess {
attempted: vec!["A".into(), "B".into()],
succeeded: vec!["B".into()],
failed: vec!["A".into()],
};
assert_eq!(
o.attempted_recipients(),
vec!["A", "B"],
"attempted_recipients MUST reflect send-loop order, \
not a succeeded-then-failed concatenation"
);
}
}