use std::str::FromStr;
use std::time::Duration;
use chrono::Utc;
use fedimint_tonic_lnd::lnrpc::payment::PaymentStatus;
use mostro_core::error::{
CantDoReason,
MostroError::{self, MostroCantDo, MostroInternalErr},
ServiceError,
};
use mostro_core::message::{Action, BondPayoutRequest, Message, Payload};
use mostro_core::nip59::UnwrappedMessage;
use mostro_core::order::{Order, SmallOrder};
use nostr_sdk::prelude::*;
use sqlx::{Pool, Sqlite};
use sqlx_crud::Crud;
use tokio::sync::mpsc::channel;
use tokio::time::timeout;
use tracing::{error, info, warn};
use uuid::Uuid;
use crate::app::context::AppContext;
use crate::config::settings::Settings;
use crate::lightning::invoice::{decode_invoice, is_valid_invoice};
use crate::lightning::{routing_fee_cap_sats, LndConnector};
use crate::util::{bytes_to_string, enqueue_order_msg};
use super::db::{find_bond_by_id, find_bonds_by_state};
use super::model::Bond;
use super::types::{BondSlashReason, BondState};
/// Maximum time `pay_counterparty` waits for each message on the payment
/// status channel; when it elapses the send is treated as an indeterminate
/// failure rather than a terminal one.
const PAYMENT_STATUS_RECV_TIMEOUT: Duration = Duration::from_secs(120);
/// Runs one payout pass over every bond currently in `PendingPayout`.
///
/// Enumeration failures are logged and end the pass. A failure while
/// processing one bond is logged as a warning and does not stop the
/// remaining bonds from being processed.
pub async fn run_bond_payout_cycle(pool: &Pool<Sqlite>, ln_client: &mut LndConnector) {
    let pending = match find_bonds_by_state(pool, BondState::PendingPayout).await {
        Err(e) => {
            error!("bond payout: failed to enumerate PendingPayout bonds: {e}");
            return;
        }
        Ok(rows) if rows.is_empty() => return,
        Ok(rows) => rows,
    };

    info!(
        "bond payout: processing {} PendingPayout bond(s)",
        pending.len()
    );

    for bond in pending {
        // One failing bond must not abort the whole cycle.
        let Err(e) = process_one_bond(pool, ln_client, &bond).await else {
            continue;
        };
        warn!(
            bond_id = %bond.id,
            order_id = %bond.order_id,
            "bond payout: bond cycle errored: {e}"
        );
    }
}
/// Reports whether a bond's payout must wait because of its parent bond.
///
/// Returns `Ok(false)` for bonds without a parent. For child bonds, returns
/// `Ok(true)` while the parent row is in the `Locked` state, and also when
/// the parent row cannot be found (logged, skipped for this tick).
async fn child_payout_blocked_by_locked_parent(
    pool: &Pool<Sqlite>,
    bond: &Bond,
) -> Result<bool, MostroError> {
    let parent_id = match bond.parent_bond_id {
        Some(id) => id,
        None => return Ok(false),
    };

    let Some(parent) = find_bond_by_id(pool, parent_id).await? else {
        warn!(
            bond_id = %bond.id,
            parent_bond_id = %parent_id,
            "bond payout: child row's parent bond is missing; skipping this tick"
        );
        return Ok(true);
    };

    Ok(parent.state == BondState::Locked.to_string())
}
/// Advances a single `PendingPayout` bond by one step.
///
/// In order: skips bonds blocked by their parent; forfeits bonds that have
/// no payout invoice once the claim window has elapsed; finalizes bonds whose
/// counterparty share is not positive; otherwise either requests a payout
/// invoice (none stored yet) or attempts to pay the stored invoice.
///
/// # Errors
/// Returns an internal error when `slashed_at` is missing, and propagates
/// errors from the helpers it delegates to.
async fn process_one_bond(
    pool: &Pool<Sqlite>,
    ln_client: &mut LndConnector,
    bond: &Bond,
) -> Result<(), MostroError> {
    let blocked = child_payout_blocked_by_locked_parent(pool, bond).await?;
    if blocked {
        return Ok(());
    }

    // Fall back to built-in defaults when no bond settings are configured.
    let (claim_window_seconds, invoice_window_seconds, max_retries) = match Settings::get_bond() {
        Some(c) => (
            c.payout_claim_window_days as i64 * 86_400,
            c.payout_invoice_window_seconds as i64,
            c.payout_max_retries as i64,
        ),
        None => (15 * 86_400, 300, 5),
    };

    let now = Utc::now().timestamp();
    let slashed_at = match bond.slashed_at {
        Some(ts) => ts,
        None => {
            return Err(MostroInternalErr(ServiceError::UnexpectedError(format!(
                "bond {} in PendingPayout missing slashed_at (invariant violation)",
                bond.id
            ))));
        }
    };

    // Only bonds that never received an invoice can be forfeited.
    if bond.payout_invoice.is_none() && now - slashed_at >= claim_window_seconds {
        return forfeit_bond(pool, bond).await;
    }

    if counterparty_share_sats(bond)? <= 0 {
        return finalize_node_only(pool, bond).await;
    }

    if let Some(invoice) = bond.payout_invoice.as_deref() {
        pay_counterparty(
            pool,
            ln_client,
            bond,
            invoice,
            max_retries,
            claim_window_seconds,
        )
        .await
    } else {
        request_payout_invoice(pool, bond, invoice_window_seconds).await
    }
}
/// Computes the part of the bond owed to the counterparty, in sats:
/// `amount_sats - node_share_sats`.
///
/// # Errors
/// Returns an internal error when `node_share_sats` is unset or lies outside
/// the inclusive range `[0, amount_sats]`.
fn counterparty_share_sats(bond: &Bond) -> Result<i64, MostroError> {
    let Some(node_share) = bond.node_share_sats else {
        return Err(MostroInternalErr(ServiceError::UnexpectedError(format!(
            "bond {} in PendingPayout missing node_share_sats (invariant violation)",
            bond.id
        ))));
    };

    let within_bounds = (0..=bond.amount_sats).contains(&node_share);
    if within_bounds {
        Ok(bond.amount_sats - node_share)
    } else {
        Err(MostroInternalErr(ServiceError::UnexpectedError(format!(
            "bond {} in PendingPayout has node_share_sats {node_share} outside [0, {}] (invariant violation)",
            bond.id, bond.amount_sats
        ))))
    }
}
/// Moves a bond from `PendingPayout` to `Forfeited`, but only if the row
/// still has no payout invoice.
///
/// The update is conditional on both the state and `payout_invoice IS NULL`;
/// when it matches no row the bond is left untouched and the miss is logged.
///
/// # Errors
/// Returns a database access error if the update cannot be executed.
async fn forfeit_bond(pool: &Pool<Sqlite>, bond: &Bond) -> Result<(), MostroError> {
    let forfeited = sqlx::query(
        "UPDATE bonds SET state = ? WHERE id = ? AND state = ? AND payout_invoice IS NULL",
    )
    .bind(BondState::Forfeited.to_string())
    .bind(bond.id)
    .bind(BondState::PendingPayout.to_string())
    .execute(pool)
    .await
    .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?
    .rows_affected()
        == 1;

    if !forfeited {
        info!(
            bond_id = %bond.id,
            order_id = %bond.order_id,
            "forfeit: counterparty invoice landed concurrently; payout will continue on next tick"
        );
        return Ok(());
    }

    info!(
        bond_id = %bond.id,
        order_id = %bond.order_id,
        amount_sats = bond.amount_sats,
        "bond forfeited: claim window elapsed, node retains full amount"
    );
    Ok(())
}
/// Moves a bond from `PendingPayout` straight to `Slashed` without paying a
/// counterparty.
///
/// The update only applies while the row is still `PendingPayout`; a miss is
/// silent. Success is logged.
///
/// # Errors
/// Returns a database access error if the update cannot be executed.
async fn finalize_node_only(pool: &Pool<Sqlite>, bond: &Bond) -> Result<(), MostroError> {
    let update = sqlx::query("UPDATE bonds SET state = ? WHERE id = ? AND state = ?")
        .bind(BondState::Slashed.to_string())
        .bind(bond.id)
        .bind(BondState::PendingPayout.to_string());

    let outcome = update
        .execute(pool)
        .await
        .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;

    if outcome.rows_affected() != 1 {
        return Ok(());
    }

    info!(
        bond_id = %bond.id,
        order_id = %bond.order_id,
        amount_sats = bond.amount_sats,
        "bond slashed (node-only): full amount retained by Mostro"
    );
    Ok(())
}
/// Asks the payout recipient for a Lightning invoice covering the
/// counterparty share of a slashed bond.
///
/// Does nothing when the previous request is newer than
/// `invoice_window_seconds`, when the order row is missing, or when no
/// recipient can be resolved. The request counter and timestamp are persisted
/// first (conditional on the row still being `PendingPayout`); the
/// `AddBondInvoice` message is only enqueued if that update touched a row.
///
/// # Errors
/// Returns an error on database failures, on an unparseable
/// `slashed_reason`, on a missing `slashed_at`, or when the counterparty
/// share or the order summary cannot be built.
async fn request_payout_invoice(
    pool: &Pool<Sqlite>,
    bond: &Bond,
    invoice_window_seconds: i64,
) -> Result<(), MostroError> {
    let now = Utc::now().timestamp();

    // Rate limit: at most one request per invoice window.
    let requested_recently = bond
        .last_invoice_request_at
        .is_some_and(|last| now - last < invoice_window_seconds);
    if requested_recently {
        return Ok(());
    }

    let loaded = Order::by_id(pool, bond.order_id)
        .await
        .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
    let Some(order) = loaded else {
        warn!(
            bond_id = %bond.id,
            order_id = %bond.order_id,
            "request_payout_invoice: order row missing; skipping"
        );
        return Ok(());
    };

    let parsed_reason = bond
        .slashed_reason
        .as_deref()
        .and_then(|s| BondSlashReason::from_str(s).ok());
    let reason = match parsed_reason {
        Some(r) => r,
        None => {
            return Err(MostroInternalErr(ServiceError::UnexpectedError(format!(
                "bond {} in PendingPayout has unparseable slashed_reason {:?}",
                bond.id, bond.slashed_reason
            ))));
        }
    };

    let Some(recipient_pubkey) = resolve_payout_recipient(&order, bond, reason)? else {
        warn!(
            bond_id = %bond.id,
            order_id = %bond.order_id,
            "request_payout_invoice: cannot resolve recipient; skipping (will retry on next tick)"
        );
        return Ok(());
    };

    let counterparty_share = counterparty_share_sats(bond)?;
    let slashed_at = match bond.slashed_at {
        Some(ts) => ts,
        None => {
            return Err(MostroInternalErr(ServiceError::UnexpectedError(format!(
                "bond {} in PendingPayout missing slashed_at (invariant violation)",
                bond.id
            ))));
        }
    };
    let small = build_payout_small_order(&order, counterparty_share)?;

    // Persist the attempt before sending anything, guarded on the state.
    let bumped = sqlx::query(
        "UPDATE bonds \
         SET invoice_request_attempts = invoice_request_attempts + 1, \
         last_invoice_request_at = ? \
         WHERE id = ? AND state = ?",
    )
    .bind(now)
    .bind(bond.id)
    .bind(BondState::PendingPayout.to_string())
    .execute(pool)
    .await
    .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
    if bumped.rows_affected() == 0 {
        // The row left PendingPayout in the meantime: do not prompt anyone.
        return Ok(());
    }

    info!(
        bond_id = %bond.id,
        order_id = %bond.order_id,
        amount_sats = counterparty_share,
        recipient = %recipient_pubkey,
        slashed_at,
        attempt = bond.invoice_request_attempts + 1,
        "bond payout: requesting invoice from counterparty"
    );

    let payload = Payload::BondPayoutRequest(BondPayoutRequest {
        order: small,
        slashed_at,
    });
    enqueue_order_msg(
        None,
        Some(order.id),
        Action::AddBondInvoice,
        Some(payload),
        recipient_pubkey,
        None,
    )
    .await;
    Ok(())
}
/// Number of times `slash_after_success` tries the `PendingPayout -> Slashed`
/// update before giving up and returning a database error.
const SLASH_CAS_MAX_ATTEMPTS: u32 = 5;
async fn pay_counterparty(
pool: &Pool<Sqlite>,
ln_client: &mut LndConnector,
bond: &Bond,
invoice: &str,
max_retries: i64,
claim_window_seconds: i64,
) -> Result<(), MostroError> {
let counterparty_share = counterparty_share_sats(bond)?;
let decoded = match decode_invoice(invoice) {
Ok(d) => d,
Err(e) => {
return on_send_payment_failure(
pool,
bond,
max_retries,
claim_window_seconds,
PaymentFailureKind::Terminal,
&format!("payout invoice decode failed: {e}"),
)
.await;
}
};
let payment_hash_ref: &[u8] = decoded.payment_hash().as_ref();
let payment_hash_hex = bytes_to_string(payment_hash_ref);
if let Some(persisted_hash) = bond.payout_payment_hash.as_deref() {
if persisted_hash == payment_hash_hex {
match ln_client.lookup_payment_status(payment_hash_ref).await {
Ok(Some(PaymentStatus::Succeeded)) => {
info!(
bond_id = %bond.id,
order_id = %bond.order_id,
"bond payout: reconciled — LND already paid this invoice; finalizing Slashed without re-sending"
);
return slash_after_success(pool, bond, counterparty_share).await;
}
Ok(Some(PaymentStatus::Failed)) => {
return on_send_payment_failure(
pool,
bond,
max_retries,
claim_window_seconds,
PaymentFailureKind::Terminal,
"tracked payment reported Failed on reconciliation",
)
.await;
}
Ok(Some(PaymentStatus::InFlight)) => {
info!(
bond_id = %bond.id,
order_id = %bond.order_id,
"bond payout: prior send_payment still in flight; deferring to next tick"
);
return Ok(());
}
Ok(Some(PaymentStatus::Unknown)) | Ok(None) => {
}
Err(e) => {
warn!(
bond_id = %bond.id,
order_id = %bond.order_id,
"bond payout: reconciliation lookup failed ({e}); falling through to fresh send"
);
}
}
}
}
let routing_fee_cap = routing_fee_cap_sats(counterparty_share);
let persisted = sqlx::query(
"UPDATE bonds \
SET payout_routing_fee_sats = ?, payout_payment_hash = ? \
WHERE id = ? AND state = ?",
)
.bind(routing_fee_cap)
.bind(&payment_hash_hex)
.bind(bond.id)
.bind(BondState::PendingPayout.to_string())
.execute(pool)
.await
.map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
if persisted.rows_affected() == 0 {
info!(
bond_id = %bond.id,
order_id = %bond.order_id,
"bond payout: row no longer PendingPayout at hash-persist CAS; skipping send_payment"
);
return Ok(());
}
let (tx, mut rx) = channel(100);
let send_outcome = ln_client
.send_payment(invoice, counterparty_share, tx)
.await;
if let Err(e) = send_outcome {
return on_send_payment_failure(
pool,
bond,
max_retries,
claim_window_seconds,
PaymentFailureKind::Indeterminate,
&format!("{e}"),
)
.await;
}
let mut succeeded = false;
let mut failure: Option<(PaymentFailureKind, String)> = None;
loop {
match timeout(PAYMENT_STATUS_RECV_TIMEOUT, rx.recv()).await {
Err(_) => {
failure = Some((
PaymentFailureKind::Indeterminate,
format!(
"payment status stream timed out after {}s without a terminal update",
PAYMENT_STATUS_RECV_TIMEOUT.as_secs()
),
));
break;
}
Ok(None) => break,
Ok(Some(msg)) => {
if let Ok(status) = PaymentStatus::try_from(msg.payment.status) {
match status {
PaymentStatus::Succeeded => {
succeeded = true;
break;
}
PaymentStatus::Failed => {
failure = Some((
PaymentFailureKind::Terminal,
format!("payment failed: reason {}", msg.payment.failure_reason),
));
break;
}
_ => {}
}
}
}
}
}
if succeeded {
return slash_after_success(pool, bond, counterparty_share).await;
}
let (kind, msg) = failure.unwrap_or((
PaymentFailureKind::Indeterminate,
"payment stream ended without terminal status".to_string(),
));
on_send_payment_failure(pool, bond, max_retries, claim_window_seconds, kind, &msg).await
}
/// Marks a bond as `Slashed` after its payout has succeeded.
///
/// The conditional update is tried up to `SLASH_CAS_MAX_ATTEMPTS` times, with
/// a doubling delay (starting at 50 ms) before each retry. Only database
/// errors trigger a retry; an update that runs but matches no row is treated
/// as "already finalized". On the first matching update a payout-completed
/// notification is sent.
///
/// # Errors
/// Returns a database access error carrying the last failure message when
/// every attempt errored.
async fn slash_after_success(
    pool: &Pool<Sqlite>,
    bond: &Bond,
    counterparty_share: i64,
) -> Result<(), MostroError> {
    let mut last_err: Option<String> = None;

    for attempt in 0..SLASH_CAS_MAX_ATTEMPTS {
        if attempt > 0 {
            let backoff_ms = 50u64 << (attempt - 1);
            tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
        }

        let executed = sqlx::query("UPDATE bonds SET state = ? WHERE id = ? AND state = ?")
            .bind(BondState::Slashed.to_string())
            .bind(bond.id)
            .bind(BondState::PendingPayout.to_string())
            .execute(pool)
            .await;

        let result = match executed {
            Ok(r) => r,
            Err(e) => {
                last_err = Some(e.to_string());
                continue;
            }
        };

        if result.rows_affected() != 1 {
            info!(
                bond_id = %bond.id,
                order_id = %bond.order_id,
                "bond payout: Slashed CAS missed (concurrent transition); treating as already-finalized"
            );
            return Ok(());
        }

        info!(
            bond_id = %bond.id,
            order_id = %bond.order_id,
            amount_sats = bond.amount_sats,
            counterparty_share_sats = counterparty_share,
            "bond payout: send_payment succeeded; bond transitioned to Slashed"
        );
        notify_payout_completed(pool, bond, counterparty_share).await;
        return Ok(());
    }

    let cause = match last_err {
        Some(text) => text,
        None => "<no underlying error captured>".to_string(),
    };
    error!(
        bond_id = %bond.id,
        order_id = %bond.order_id,
        counterparty_share_sats = counterparty_share,
        attempts = SLASH_CAS_MAX_ATTEMPTS,
        "bond payout: send_payment SUCCEEDED but state=Slashed CAS failed after retries — \
         next tick will reconcile via persisted payout_payment_hash. last db error: {cause}"
    );
    Err(MostroInternalErr(ServiceError::DbAccessError(cause)))
}
/// Classifies a payout failure for `on_send_payment_failure`, which treats
/// the two kinds differently once the retry budget is exhausted.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum PaymentFailureKind {
    /// The attempt is known to have failed: used in this file when the
    /// invoice cannot be decoded or the payment status is reported as Failed.
    Terminal,
    /// The outcome is not known: used in this file when `send_payment`
    /// returns an error, the status stream times out, or the stream ends
    /// without a terminal status.
    Indeterminate,
}
/// Records a failed payout attempt and decides what happens to the bond next.
///
/// * The attempt counter is incremented (clamped to `max_retries`) and
///   persisted while the row is still `PendingPayout`.
/// * While the budget is not exhausted, the failure is logged and the next
///   tick retries.
/// * With the budget exhausted and an [`PaymentFailureKind::Indeterminate`]
///   failure, the current invoice is kept untouched so a later tick can
///   reconcile against the Lightning node.
/// * With the budget exhausted and a terminal failure inside the claim
///   window, the invoice-related columns are cleared so a fresh invoice is
///   requested.
/// * With the budget exhausted and a terminal failure past the claim window
///   (or with no `slashed_at`), the bond is moved to `Failed`.
///
/// Every state-changing update is conditional on the row still being
/// `PendingPayout`; a missed update is logged at info level instead of being
/// reported as a completed transition.
///
/// # Errors
/// Returns a database access error if any update cannot be executed.
async fn on_send_payment_failure(
    pool: &Pool<Sqlite>,
    bond: &Bond,
    max_retries: i64,
    claim_window_seconds: i64,
    kind: PaymentFailureKind,
    cause: &str,
) -> Result<(), MostroError> {
    // Clamp so the stored counter never exceeds the configured budget.
    let new_attempts = std::cmp::min(bond.payout_attempts + 1, max_retries);
    sqlx::query("UPDATE bonds SET payout_attempts = ? WHERE id = ? AND state = ?")
        .bind(new_attempts)
        .bind(bond.id)
        .bind(BondState::PendingPayout.to_string())
        .execute(pool)
        .await
        .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;

    if new_attempts < max_retries {
        warn!(
            bond_id = %bond.id,
            order_id = %bond.order_id,
            attempts = new_attempts,
            max_retries,
            "bond payout: send_payment failure ({cause}); will retry on next tick"
        );
        return Ok(());
    }

    // The payment may still settle: never discard the invoice in that case.
    if kind == PaymentFailureKind::Indeterminate {
        warn!(
            bond_id = %bond.id,
            order_id = %bond.order_id,
            attempts = new_attempts,
            "bond payout: retry budget exhausted on an indeterminate failure ({cause}); keeping the current invoice for LND reconciliation — not re-prompting (payment may still be in flight)"
        );
        return Ok(());
    }

    let now = Utc::now().timestamp();
    let within_window = bond
        .slashed_at
        .is_some_and(|slashed_at| now - slashed_at < claim_window_seconds);

    if within_window {
        // Re-arm: drop the dead invoice so a new one gets requested.
        let result = sqlx::query(
            "UPDATE bonds \
             SET payout_invoice = NULL, \
             payout_routing_fee_sats = NULL, \
             payout_payment_hash = NULL, \
             payout_attempts = 0, \
             last_invoice_request_at = NULL \
             WHERE id = ? AND state = ?",
        )
        .bind(bond.id)
        .bind(BondState::PendingPayout.to_string())
        .execute(pool)
        .await
        .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
        if result.rows_affected() == 1 {
            warn!(
                bond_id = %bond.id,
                order_id = %bond.order_id,
                attempts = new_attempts,
                "bond payout: send_payment exhausted retries for this invoice ({cause}); re-requesting a fresh invoice from the winner (still inside claim window, deadline unchanged)"
            );
        } else {
            info!(
                bond_id = %bond.id,
                order_id = %bond.order_id,
                "bond payout: re-arm CAS missed (concurrent transition); leaving row as-is"
            );
        }
        return Ok(());
    }

    let result = sqlx::query("UPDATE bonds SET state = ? WHERE id = ? AND state = ?")
        .bind(BondState::Failed.to_string())
        .bind(bond.id)
        .bind(BondState::PendingPayout.to_string())
        .execute(pool)
        .await
        .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
    // Only announce the Failed transition when this call actually made it;
    // otherwise the error log would send operators after a row that another
    // writer already moved elsewhere.
    if result.rows_affected() == 1 {
        error!(
            bond_id = %bond.id,
            order_id = %bond.order_id,
            attempts = new_attempts,
            "bond payout: send_payment exhausted retries past the claim window — transitioning to Failed; node share retained, counterparty share stranded (operator review required). last error: {cause}"
        );
    } else {
        info!(
            bond_id = %bond.id,
            order_id = %bond.order_id,
            "bond payout: Failed CAS missed (concurrent transition); leaving row as-is"
        );
    }
    Ok(())
}
/// Picks the other party of the order relative to the bond's owner.
///
/// When both buyer and seller keys are present, a bond owned by the buyer
/// resolves to the seller and vice versa. Returns `Ok(None)` when either key
/// is missing or the bond's key matches neither side. The slash reason is
/// currently not consulted.
///
/// # Errors
/// Returns an internal error if the selected key cannot be parsed.
fn resolve_recipient(
    order: &Order,
    bond: &Bond,
    _reason: BondSlashReason,
) -> Result<Option<PublicKey>, MostroError> {
    let parties = (
        order.buyer_pubkey.as_deref(),
        order.seller_pubkey.as_deref(),
    );
    let (Some(buyer), Some(seller)) = parties else {
        return Ok(None);
    };

    let counterparty = if bond.pubkey == buyer {
        seller
    } else if bond.pubkey == seller {
        buyer
    } else {
        return Ok(None);
    };

    match PublicKey::from_str(counterparty) {
        Ok(pk) => Ok(Some(pk)),
        Err(e) => Err(MostroInternalErr(ServiceError::UnexpectedError(
            e.to_string(),
        ))),
    }
}
/// Resolves who receives the payout for a bond row.
///
/// A row that has a parent bond but no child order id is paid back to the
/// bond's own key. Every other row goes to the opposite party of the order,
/// as determined by `resolve_recipient`.
///
/// # Errors
/// Returns an internal error if the relevant key cannot be parsed.
fn resolve_payout_recipient(
    order: &Order,
    bond: &Bond,
    reason: BondSlashReason,
) -> Result<Option<PublicKey>, MostroError> {
    let pays_bond_owner = bond.parent_bond_id.is_some() && bond.child_order_id.is_none();
    if !pays_bond_owner {
        return resolve_recipient(order, bond, reason);
    }

    match PublicKey::from_str(&bond.pubkey) {
        Ok(pk) => Ok(Some(pk)),
        Err(e) => Err(MostroInternalErr(ServiceError::UnexpectedError(
            e.to_string(),
        ))),
    }
}
fn build_payout_small_order(
order: &Order,
counterparty_share: i64,
) -> Result<SmallOrder, MostroError> {
let order_kind = order.get_order_kind().map_err(MostroInternalErr)?;
Ok(SmallOrder::new(
Some(order.id),
Some(order_kind),
None,
counterparty_share,
order.fiat_code.clone(),
order.min_amount,
order.max_amount,
order.fiat_amount,
order.payment_method.clone(),
order.premium,
None,
None,
None,
None,
None,
))
}
/// Enqueues a payout-related message for `recipient`, carrying an order
/// summary as payload.
///
/// Best effort: if the summary cannot be built, a warning is logged and no
/// message is enqueued.
async fn enqueue_payout_ack(
    order: &Order,
    action: Action,
    recipient: PublicKey,
    counterparty_share: i64,
) {
    let payload = match build_payout_small_order(order, counterparty_share) {
        Err(e) => {
            warn!(
                order_id = %order.id,
                "bond payout: cannot build SmallOrder for {action} ack ({e:?}); skipping notification"
            );
            return;
        }
        Ok(small) => Payload::Order(small),
    };

    enqueue_order_msg(
        None,
        Some(order.id),
        action,
        Some(payload),
        recipient,
        None,
    )
    .await;
}
/// Sends a `BondInvoiceAccepted` message to `recipient` for the bond's order.
///
/// Best effort: a missing order row or a load failure is logged as a warning
/// and the message is skipped.
async fn notify_invoice_received(
    pool: &Pool<Sqlite>,
    bond: &Bond,
    recipient: PublicKey,
    counterparty_share: i64,
) {
    let order = match Order::by_id(pool, bond.order_id).await {
        Ok(Some(found)) => found,
        Ok(None) => {
            warn!(
                bond_id = %bond.id,
                order_id = %bond.order_id,
                "bond payout: order row missing; skipping BondInvoiceAccepted ack"
            );
            return;
        }
        Err(e) => {
            warn!(
                bond_id = %bond.id,
                order_id = %bond.order_id,
                "bond payout: order load failed ({e}); skipping BondInvoiceAccepted ack"
            );
            return;
        }
    };

    enqueue_payout_ack(
        &order,
        Action::BondInvoiceAccepted,
        recipient,
        counterparty_share,
    )
    .await;
}
/// Sends a `BondPayoutCompleted` message to the payout recipient.
///
/// Best effort: each step that can fail (loading the order, parsing the slash
/// reason, resolving the recipient) logs a warning and skips the message
/// instead of returning an error.
async fn notify_payout_completed(pool: &Pool<Sqlite>, bond: &Bond, counterparty_share: i64) {
    let order = match Order::by_id(pool, bond.order_id).await {
        Err(e) => {
            warn!(
                bond_id = %bond.id,
                order_id = %bond.order_id,
                "bond payout: order load failed ({e}); skipping BondPayoutCompleted notification"
            );
            return;
        }
        Ok(None) => {
            warn!(
                bond_id = %bond.id,
                order_id = %bond.order_id,
                "bond payout: order row missing; skipping BondPayoutCompleted notification"
            );
            return;
        }
        Ok(Some(found)) => found,
    };

    let parsed_reason = bond
        .slashed_reason
        .as_deref()
        .and_then(|s| BondSlashReason::from_str(s).ok());
    let Some(reason) = parsed_reason else {
        warn!(
            bond_id = %bond.id,
            "bond payout: unparseable slashed_reason {:?}; skipping BondPayoutCompleted notification",
            bond.slashed_reason
        );
        return;
    };

    let recipient = match resolve_payout_recipient(&order, bond, reason) {
        Ok(Some(pk)) => pk,
        Ok(None) => {
            warn!(
                bond_id = %bond.id,
                order_id = %bond.order_id,
                "bond payout: cannot resolve recipient; skipping BondPayoutCompleted notification"
            );
            return;
        }
        Err(e) => {
            warn!(
                bond_id = %bond.id,
                order_id = %bond.order_id,
                "bond payout: recipient resolution failed ({e:?}); skipping BondPayoutCompleted notification"
            );
            return;
        }
    };

    enqueue_payout_ack(
        &order,
        Action::BondPayoutCompleted,
        recipient,
        counterparty_share,
    )
    .await;
}
/// Result of trying to store a submitted payout invoice on a bond row, as
/// returned by `apply_payout_invoice`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InvoiceApplyOutcome {
    /// The invoice was stored on a bond that was already `PendingPayout`.
    Persisted,
    /// A `Failed` bond was moved back to `PendingPayout` with the new invoice.
    Resurrected,
    /// Nothing was stored (unsupported state, elapsed claim window, missing
    /// `slashed_at`, or the conditional update matched no row).
    Rejected,
}
/// Handles a user's submission of a payout invoice for a slashed bond.
///
/// The sender must be the payout recipient of a `PendingPayout` or `Failed`
/// bond on the referenced order, and the invoice must pass validation against
/// the bond's counterparty share. On success the invoice is stored and a
/// `BondInvoiceAccepted` message is sent back to the sender; the payment
/// itself is left to the payout scheduler.
///
/// # Errors
/// * `InvalidPayload` when the message carries no order id.
/// * `InvalidInvoice` when the payment request is missing, empty, or fails
///   validation.
/// * `NotAllowedByStatus` when no matching bond exists, the counterparty
///   share is not positive, or the invoice could not be applied.
/// * Internal errors from database access or broken bond invariants.
pub async fn add_bond_invoice_action(
    ctx: &AppContext,
    msg: Message,
    event: &UnwrappedMessage,
    _my_keys: &Keys,
) -> Result<(), MostroError> {
    let pool = ctx.pool();
    let kind = msg.get_inner_message_kind();
    let Some(order_id) = kind.id else {
        return Err(MostroCantDo(CantDoReason::InvalidPayload));
    };
    let payment_request = kind
        .get_payment_request()
        .filter(|pr| !pr.is_empty())
        .ok_or(MostroCantDo(CantDoReason::InvalidInvoice))?;
    let sender = event.sender;

    // Amount hint used only to pick between several candidate bonds; an
    // undecodable or amountless invoice simply yields no hint here.
    let invoice_share_sats = match decode_invoice(&payment_request) {
        Ok(inv) => inv
            .amount_milli_satoshis()
            .map(|msat| (msat / 1000) as i64),
        Err(_) => None,
    };

    let candidate = find_recoverable_bond_for_recipient(
        pool,
        order_id,
        &sender.to_string(),
        invoice_share_sats,
    )
    .await?;
    let Some(bond) = candidate else {
        return Err(MostroCantDo(CantDoReason::NotAllowedByStatus));
    };

    let counterparty_share = counterparty_share_sats(&bond)?;
    if counterparty_share <= 0 {
        return Err(MostroCantDo(CantDoReason::NotAllowedByStatus));
    }

    let validation = is_valid_invoice(
        payment_request.clone(),
        Some(counterparty_share as u64),
        Some(0),
    )
    .await;
    if validation.is_err() {
        return Err(MostroCantDo(CantDoReason::InvalidInvoice));
    }

    let claim_window_seconds = match Settings::get_bond() {
        Some(c) => c.payout_claim_window_days as i64 * 86_400,
        None => 15 * 86_400,
    };
    let now = Utc::now().timestamp();

    let outcome =
        apply_payout_invoice(pool, &bond, &payment_request, now, claim_window_seconds).await?;
    match outcome {
        InvoiceApplyOutcome::Rejected => {
            return Err(MostroCantDo(CantDoReason::NotAllowedByStatus));
        }
        InvoiceApplyOutcome::Persisted => {
            info!(
                bond_id = %bond.id,
                order_id = %bond.order_id,
                sender = %sender,
                "bond payout: invoice accepted; awaiting scheduler tick for payout"
            );
        }
        InvoiceApplyOutcome::Resurrected => {
            info!(
                bond_id = %bond.id,
                order_id = %bond.order_id,
                sender = %sender,
                "bond payout: Failed -> PendingPayout (user submitted fresh invoice within claim window); payout_attempts reset, awaiting scheduler tick for payout"
            );
        }
    }

    notify_invoice_received(pool, &bond, sender, counterparty_share).await;
    Ok(())
}
/// Stores a payout invoice on a bond row, depending on the bond's state.
///
/// * `PendingPayout`: stores the invoice only if none is stored yet and
///   resets the invoice request counter.
/// * `Failed`: if `slashed_at` is set and the claim window has not elapsed,
///   moves the bond back to `PendingPayout` with the new invoice, resets both
///   counters and clears the stored payment hash.
/// * Any other (or unparseable) state is rejected.
///
/// Both updates are conditional on the expected state; an update that
/// matches no row yields `Rejected`.
///
/// # Errors
/// Returns a database access error if an update cannot be executed.
async fn apply_payout_invoice(
    pool: &Pool<Sqlite>,
    bond: &Bond,
    invoice: &str,
    now: i64,
    claim_window_seconds: i64,
) -> Result<InvoiceApplyOutcome, MostroError> {
    let Ok(state) = BondState::from_str(&bond.state) else {
        return Ok(InvoiceApplyOutcome::Rejected);
    };

    match state {
        BondState::PendingPayout => {
            let stored = sqlx::query(
                "UPDATE bonds \
                 SET payout_invoice = ?, invoice_request_attempts = 0 \
                 WHERE id = ? AND state = ? AND payout_invoice IS NULL",
            )
            .bind(invoice)
            .bind(bond.id)
            .bind(BondState::PendingPayout.to_string())
            .execute(pool)
            .await
            .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;

            Ok(match stored.rows_affected() {
                0 => InvoiceApplyOutcome::Rejected,
                _ => InvoiceApplyOutcome::Persisted,
            })
        }
        BondState::Failed => {
            let Some(slashed_at) = bond.slashed_at else {
                return Ok(InvoiceApplyOutcome::Rejected);
            };
            let elapsed = now.saturating_sub(slashed_at);
            if elapsed >= claim_window_seconds {
                return Ok(InvoiceApplyOutcome::Rejected);
            }

            let revived = sqlx::query(
                "UPDATE bonds \
                 SET state = ?, \
                 payout_invoice = ?, \
                 payout_attempts = 0, \
                 invoice_request_attempts = 0, \
                 payout_payment_hash = NULL \
                 WHERE id = ? AND state = ?",
            )
            .bind(BondState::PendingPayout.to_string())
            .bind(invoice)
            .bind(bond.id)
            .bind(BondState::Failed.to_string())
            .execute(pool)
            .await
            .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;

            Ok(match revived.rows_affected() {
                0 => InvoiceApplyOutcome::Rejected,
                _ => InvoiceApplyOutcome::Resurrected,
            })
        }
        _ => Ok(InvoiceApplyOutcome::Rejected),
    }
}
/// Finds the bond on `order_id` whose payout recipient is `sender_pubkey`.
///
/// Considers bonds in `PendingPayout` or `Failed`, ordered by `slashed_at`
/// descending. Bonds with an unparseable slash reason are ignored. When
/// several bonds match and `expected_share_sats` is given, the first bond
/// whose counterparty share equals it wins; otherwise the first match in
/// query order is returned.
///
/// # Errors
/// Returns a database access error on query failures and propagates
/// recipient-resolution errors.
async fn find_recoverable_bond_for_recipient(
    pool: &Pool<Sqlite>,
    order_id: Uuid,
    sender_pubkey: &str,
    expected_share_sats: Option<i64>,
) -> Result<Option<Bond>, MostroError> {
    let rows: Vec<Bond> = sqlx::query_as::<_, Bond>(
        "SELECT * FROM bonds \
         WHERE order_id = ? AND (state = ? OR state = ?) \
         ORDER BY slashed_at DESC",
    )
    .bind(order_id)
    .bind(BondState::PendingPayout.to_string())
    .bind(BondState::Failed.to_string())
    .fetch_all(pool)
    .await
    .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
    if rows.is_empty() {
        return Ok(None);
    }

    let loaded = Order::by_id(pool, order_id)
        .await
        .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
    let Some(order) = loaded else {
        return Ok(None);
    };

    let mut candidates: Vec<Bond> = Vec::new();
    for bond in rows {
        let parsed = bond
            .slashed_reason
            .as_deref()
            .and_then(|r| BondSlashReason::from_str(r).ok());
        let Some(reason) = parsed else {
            continue;
        };
        let recipient = resolve_payout_recipient(&order, &bond, reason)?;
        if recipient.is_some_and(|pk| pk.to_string() == sender_pubkey) {
            candidates.push(bond);
        }
    }

    // Several bonds for the same recipient: prefer the one whose share
    // matches the invoice amount, if an amount is known.
    if candidates.len() > 1 {
        if let Some(target) = expected_share_sats {
            let exact = candidates
                .iter()
                .position(|b| matches!(counterparty_share_sats(b), Ok(share) if share == target));
            if let Some(idx) = exact {
                return Ok(Some(candidates.swap_remove(idx)));
            }
        }
    }
    Ok(candidates.into_iter().next())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::app::bond::db::create_bond;
use crate::app::bond::types::BondRole;
use mostro_core::order::{Kind, Status};
use sqlx::sqlite::SqlitePoolOptions;
/// Opens a single-connection in-memory SQLite pool and applies the
/// migrations the bond payout tests rely on, in chronological order.
async fn setup_pool() -> Pool<Sqlite> {
    /// Executes a migration script as one statement.
    async fn run_whole(pool: &Pool<Sqlite>, script: &str, label: &str) {
        sqlx::query(script).execute(pool).await.expect(label);
    }

    /// Executes a migration script one `;`-separated statement at a time,
    /// skipping empty fragments and fragments made only of `--` comments.
    async fn run_split(pool: &Pool<Sqlite>, script: &str, label: &str) {
        let statements = script
            .split(';')
            .map(str::trim)
            .filter(|s| !s.is_empty() && !s.lines().all(|l| l.trim_start().starts_with("--")));
        for stmt in statements {
            sqlx::query(stmt).execute(pool).await.expect(label);
        }
    }

    let pool = SqlitePoolOptions::new()
        .max_connections(1)
        .connect(":memory:")
        .await
        .expect("open in-memory sqlite");

    run_whole(
        &pool,
        include_str!("../../../migrations/20221222153301_orders.sql"),
        "orders migration",
    )
    .await;
    run_split(
        &pool,
        include_str!("../../../migrations/20251126120000_dev_fee.sql"),
        "dev_fee migration",
    )
    .await;
    run_whole(
        &pool,
        include_str!("../../../migrations/20260423120000_anti_abuse_bond.sql"),
        "bonds migration",
    )
    .await;
    run_whole(
        &pool,
        include_str!("../../../migrations/20260518120000_bond_payout_payment_hash.sql"),
        "bond_payout_payment_hash migration",
    )
    .await;
    run_split(
        &pool,
        include_str!("../../../migrations/20260530120000_cashu_escrow_fields.sql"),
        "cashu escrow migration",
    )
    .await;

    pool
}
/// Fixed 64-character hex key used as the taker's pubkey in tests.
fn taker_pk() -> &'static str {
    const TAKER_HEX: &str = "1111111111111111111111111111111111111111111111111111111111111111";
    TAKER_HEX
}
/// Fixed 64-character hex key used as the maker's pubkey in tests.
fn maker_pk() -> &'static str {
    const MAKER_HEX: &str = "2222222222222222222222222222222222222222222222222222222222222222";
    MAKER_HEX
}
/// Inserts a minimal sell order in `Dispute` status with the given seller and
/// buyer keys; the event id is derived from the order id.
async fn insert_order(pool: &Pool<Sqlite>, id: Uuid, seller: &str, buyer: &str) {
    let event_id = id.simple().to_string();
    let status = Status::Dispute.to_string();
    let insert = sqlx::query(
        r#"INSERT INTO orders (
id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at,
seller_pubkey, buyer_pubkey
) VALUES (?, 'sell', ?, ?, 0, 'cash', 100000, 'USD', 10, 0, 0, ?, ?)"#,
    )
    .bind(id)
    .bind(event_id)
    .bind(status)
    .bind(seller)
    .bind(buyer);
    insert.execute(pool).await.expect("insert order");
}
fn pending_payout_bond(
order_id: Uuid,
pubkey: &str,
amount: i64,
node_share: i64,
slashed_at: i64,
invoice: Option<&str>,
last_request: Option<i64>,
) -> Bond {
let mut b = Bond::new_requested(order_id, pubkey.to_string(), BondRole::Taker, amount);
b.state = BondState::PendingPayout.to_string();
b.node_share_sats = Some(node_share);
b.slashed_reason = Some(BondSlashReason::LostDispute.to_string());
b.slashed_at = Some(slashed_at);
b.payout_invoice = invoice.map(|s| s.to_string());
b.last_invoice_request_at = last_request;
b
}
/// Sell order, slashed taker is the buyer: the payout goes to the seller
/// (the maker).
#[test]
fn resolve_recipient_sell_order_taker_buyer_slashed() {
    let slashed = pending_payout_bond(Uuid::new_v4(), taker_pk(), 10_000, 5_000, 0, None, None);
    let order = Order {
        kind: Kind::Sell.to_string(),
        seller_pubkey: Some(maker_pk().to_string()),
        buyer_pubkey: Some(taker_pk().to_string()),
        ..Order::default()
    };

    let recipient = resolve_recipient(&order, &slashed, BondSlashReason::LostDispute)
        .unwrap()
        .unwrap();

    assert_eq!(recipient.to_string(), maker_pk());
}
/// Buy order, slashed taker is the seller: the payout goes to the buyer
/// (the maker).
#[test]
fn resolve_recipient_buy_order_taker_seller_slashed() {
    let slashed = pending_payout_bond(Uuid::new_v4(), taker_pk(), 10_000, 5_000, 0, None, None);
    let order = Order {
        kind: Kind::Buy.to_string(),
        buyer_pubkey: Some(maker_pk().to_string()),
        seller_pubkey: Some(taker_pk().to_string()),
        ..Order::default()
    };

    let recipient = resolve_recipient(&order, &slashed, BondSlashReason::LostDispute)
        .unwrap()
        .unwrap();

    assert_eq!(recipient.to_string(), maker_pk());
}
/// Without a buyer key on the order no recipient can be resolved.
#[test]
fn resolve_recipient_missing_buyer_returns_none() {
    let slashed = pending_payout_bond(Uuid::new_v4(), taker_pk(), 10_000, 5_000, 0, None, None);
    let order = Order {
        kind: Kind::Sell.to_string(),
        seller_pubkey: Some(maker_pk().to_string()),
        buyer_pubkey: None,
        ..Order::default()
    };

    let resolved = resolve_recipient(&order, &slashed, BondSlashReason::LostDispute).unwrap();

    assert!(resolved.is_none());
}
/// A row with a parent bond and no child order id is paid to the bond's own
/// key (here the maker).
#[test]
fn resolve_payout_recipient_refund_row_pays_the_maker() {
    let order = Order {
        kind: Kind::Sell.to_string(),
        seller_pubkey: Some(maker_pk().to_string()),
        buyer_pubkey: Some(taker_pk().to_string()),
        ..Order::default()
    };
    let refund = Bond {
        parent_bond_id: Some(Uuid::new_v4()),
        child_order_id: None,
        ..pending_payout_bond(Uuid::new_v4(), maker_pk(), 600, 0, 0, None, None)
    };

    let resolved =
        resolve_payout_recipient(&order, &refund, BondSlashReason::LostDispute).unwrap();

    assert_eq!(
        resolved.unwrap().to_string(),
        maker_pk(),
        "the unslashed-remainder refund is paid back to the maker"
    );
}
/// A row with both a parent bond and a child order id is paid to the other
/// party of the order (here the taker).
#[test]
fn resolve_payout_recipient_slice_slash_pays_the_winner() {
    let order = Order {
        kind: Kind::Sell.to_string(),
        seller_pubkey: Some(maker_pk().to_string()),
        buyer_pubkey: Some(taker_pk().to_string()),
        ..Order::default()
    };
    let slice = Bond {
        parent_bond_id: Some(Uuid::new_v4()),
        child_order_id: Some(Uuid::new_v4()),
        ..pending_payout_bond(Uuid::new_v4(), maker_pk(), 400, 200, 0, None, None)
    };

    let resolved = resolve_payout_recipient(&order, &slice, BondSlashReason::LostDispute).unwrap();

    assert_eq!(
        resolved.unwrap().to_string(),
        taker_pk(),
        "a slice slash pays the non-maker winner"
    );
}
/// A child bond is blocked while its parent is `Locked`, unblocked once the
/// parent leaves that state, and bonds without a parent are never blocked.
#[tokio::test]
async fn child_payout_blocked_while_parent_locked() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let mut parent_row =
        Bond::new_requested(order_id, maker_pk().to_string(), BondRole::Maker, 1_000);
    parent_row.state = BondState::Locked.to_string();
    let parent = create_bond(&pool, parent_row).await.unwrap();

    let mut child_row = pending_payout_bond(order_id, maker_pk(), 400, 0, 0, None, None);
    child_row.parent_bond_id = Some(parent.id);
    child_row.child_order_id = Some(order_id);
    let child = create_bond(&pool, child_row).await.unwrap();

    // Parent still Locked: blocked.
    let blocked_while_locked = child_payout_blocked_by_locked_parent(&pool, &child)
        .await
        .unwrap();
    assert!(blocked_while_locked);

    sqlx::query("UPDATE bonds SET state = ? WHERE id = ?")
        .bind(BondState::Slashed.to_string())
        .bind(parent.id)
        .execute(&pool)
        .await
        .unwrap();

    // Parent moved on: no longer blocked.
    let blocked_after_slash = child_payout_blocked_by_locked_parent(&pool, &child)
        .await
        .unwrap();
    assert!(!blocked_after_slash);

    // A bond without a parent is never blocked.
    let standalone = pending_payout_bond(order_id, taker_pk(), 1_000, 0, 0, None, None);
    let standalone_blocked = child_payout_blocked_by_locked_parent(&pool, &standalone)
        .await
        .unwrap();
    assert!(!standalone_blocked);
}
/// A request made 10 s ago inside a 300 s window suppresses a new request:
/// neither the counter nor the timestamp changes.
#[tokio::test]
async fn request_payout_invoice_respects_cadence_window() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let recent_request = now - 10;
    let bond = create_bond(
        &pool,
        pending_payout_bond(
            order_id,
            taker_pk(),
            10_000,
            5_000,
            now,
            None,
            Some(recent_request),
        ),
    )
    .await
    .unwrap();

    request_payout_invoice(&pool, &bond, 300).await.unwrap();

    let (attempts, last_request): (i64, Option<i64>) = sqlx::query_as(
        "SELECT invoice_request_attempts, last_invoice_request_at FROM bonds WHERE id = ?",
    )
    .bind(bond.id)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(attempts, 0);
    assert_eq!(last_request, Some(recent_request));
}
/// Counts the queued order messages for `order_id` whose action is
/// `AddBondInvoice`.
async fn count_add_bond_invoice_msgs(order_id: Uuid) -> usize {
    use crate::config::MESSAGE_QUEUES;

    let queue = MESSAGE_QUEUES.queue_order_msg.read().await;
    let mut hits = 0usize;
    for (message, _) in queue.iter() {
        let kind = message.get_inner_message_kind();
        if kind.id == Some(order_id) && kind.action == Action::AddBondInvoice {
            hits += 1;
        }
    }
    hits
}
/// Happy path: the request bumps the counter, stamps the request time and
/// enqueues exactly one `AddBondInvoice` message.
#[tokio::test]
async fn request_payout_invoice_persists_before_enqueue_happy_path() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let bond = create_bond(
        &pool,
        pending_payout_bond(order_id, taker_pk(), 10_000, 5_000, now, None, None),
    )
    .await
    .unwrap();

    let queued_before = count_add_bond_invoice_msgs(order_id).await;
    request_payout_invoice(&pool, &bond, 300).await.unwrap();
    let queued_after = count_add_bond_invoice_msgs(order_id).await;

    let (attempts, last_request): (i64, Option<i64>) = sqlx::query_as(
        "SELECT invoice_request_attempts, last_invoice_request_at FROM bonds WHERE id = ?",
    )
    .bind(bond.id)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(attempts, 1);
    assert!(last_request.is_some_and(|t| t >= now));
    assert_eq!(queued_after - queued_before, 1);
}
/// If the row has left `PendingPayout` by the time of the request, nothing is
/// persisted and no message is enqueued.
#[tokio::test]
async fn request_payout_invoice_skips_enqueue_when_state_moved_off_pending_payout() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let bond = create_bond(
        &pool,
        pending_payout_bond(order_id, taker_pk(), 10_000, 5_000, now, None, None),
    )
    .await
    .unwrap();

    // Move the stored row away while the in-memory copy still says
    // PendingPayout.
    sqlx::query("UPDATE bonds SET state = ? WHERE id = ?")
        .bind(BondState::Forfeited.to_string())
        .bind(bond.id)
        .execute(&pool)
        .await
        .unwrap();

    let queued_before = count_add_bond_invoice_msgs(order_id).await;
    request_payout_invoice(&pool, &bond, 300).await.unwrap();
    let queued_after = count_add_bond_invoice_msgs(order_id).await;

    let (attempts, last_request): (i64, Option<i64>) = sqlx::query_as(
        "SELECT invoice_request_attempts, last_invoice_request_at FROM bonds WHERE id = ?",
    )
    .bind(bond.id)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(attempts, 0);
    assert_eq!(last_request, None);
    assert_eq!(queued_after, queued_before);
}
/// `forfeit_bond` on a `PendingPayout` bond without a payout invoice leaves
/// the stored row in the `Forfeited` state.
#[tokio::test]
async fn forfeit_bond_transitions_pending_to_forfeited() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let seeded = pending_payout_bond(order_id, taker_pk(), 10_000, 5_000, now, None, None);
    let bond = create_bond(&pool, seeded).await.unwrap();

    forfeit_bond(&pool, &bond).await.unwrap();

    let (stored_state,): (String,) = sqlx::query_as("SELECT state FROM bonds WHERE id = ?")
        .bind(bond.id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(stored_state, BondState::Forfeited.to_string());
}
/// When the stored row already carries a payout invoice, `forfeit_bond`
/// returns `Ok` but leaves the bond in `PendingPayout`.
#[tokio::test]
async fn forfeit_bond_skips_when_invoice_landed_concurrently() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    // The row is seeded with an invoice already present, standing in for an
    // invoice that arrived before the forfeit ran.
    let seeded = pending_payout_bond(
        order_id,
        taker_pk(),
        10_000,
        5_000,
        now,
        Some("lnbc1pCONCURRENT"),
        None,
    );
    let bond = create_bond(&pool, seeded).await.unwrap();

    forfeit_bond(&pool, &bond).await.unwrap();

    let (stored_state,): (String,) = sqlx::query_as("SELECT state FROM bonds WHERE id = ?")
        .bind(bond.id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(stored_state, BondState::PendingPayout.to_string());
}
/// With `slashed_at` one day beyond the claim window, three consecutive
/// terminal failures (called with `3` as the third argument, presumably the
/// retry budget; confirm against `on_send_payment_failure`) count up
/// `payout_attempts` and the third one leaves the bond in `Failed`.
#[tokio::test]
async fn send_payment_failure_past_window_flips_to_failed() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let slashed_at = Utc::now().timestamp() - (CLAIM_WINDOW_SECONDS + 86_400);
    let seeded = pending_payout_bond(
        order_id,
        taker_pk(),
        10_000,
        5_000,
        slashed_at,
        Some("lnbc1pSOMETHING"),
        None,
    );
    let mut current = create_bond(&pool, seeded).await.unwrap();

    for expected_attempts in 1..=3 {
        // Each round feeds the freshly reloaded row back into the handler.
        on_send_payment_failure(
            &pool,
            &current,
            3,
            CLAIM_WINDOW_SECONDS,
            PaymentFailureKind::Terminal,
            "transient",
        )
        .await
        .unwrap();

        current = sqlx::query_as::<_, Bond>("SELECT * FROM bonds WHERE id = ?")
            .bind(current.id)
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(current.payout_attempts, expected_attempts);
        if expected_attempts == 1 {
            // After the first failure the bond is still awaiting payout.
            assert_eq!(current.state, BondState::PendingPayout.to_string());
        }
    }

    assert_eq!(current.state, BondState::Failed.to_string());
}
/// With `slashed_at` set to the current time (inside the claim window), the
/// third terminal failure does not fail the bond. It stays `PendingPayout`
/// with the invoice, routing fee, payment hash and last-request timestamp
/// cleared and `payout_attempts` back at 0, while `slashed_at` and
/// `invoice_request_attempts` keep their seeded values.
#[tokio::test]
async fn send_payment_failure_within_window_reprompts_winner() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let slashed_at = Utc::now().timestamp();
    let mut seeded = pending_payout_bond(
        order_id,
        taker_pk(),
        10_000,
        5_000,
        slashed_at,
        Some("lnbc1pSTALE"),
        Some(slashed_at),
    );
    seeded.payout_routing_fee_sats = Some(50);
    seeded.payout_payment_hash = Some("deadbeef".to_string());
    seeded.invoice_request_attempts = 1;
    let bond = create_bond(&pool, seeded).await.unwrap();

    // First failure: counted, invoice still in place.
    on_send_payment_failure(
        &pool,
        &bond,
        3,
        CLAIM_WINDOW_SECONDS,
        PaymentFailureKind::Terminal,
        "transient",
    )
    .await
    .unwrap();
    let first: Bond = sqlx::query_as("SELECT * FROM bonds WHERE id = ?")
        .bind(bond.id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(first.payout_attempts, 1);
    assert_eq!(first.state, BondState::PendingPayout.to_string());
    assert_eq!(first.payout_invoice.as_deref(), Some("lnbc1pSTALE"));

    // Second failure: counted.
    on_send_payment_failure(
        &pool,
        &first,
        3,
        CLAIM_WINDOW_SECONDS,
        PaymentFailureKind::Terminal,
        "transient",
    )
    .await
    .unwrap();
    let second: Bond = sqlx::query_as("SELECT * FROM bonds WHERE id = ?")
        .bind(bond.id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(second.payout_attempts, 2);

    // Third failure: payout fields are reset instead of failing the bond.
    on_send_payment_failure(
        &pool,
        &second,
        3,
        CLAIM_WINDOW_SECONDS,
        PaymentFailureKind::Terminal,
        "transient",
    )
    .await
    .unwrap();
    let third: Bond = sqlx::query_as("SELECT * FROM bonds WHERE id = ?")
        .bind(bond.id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(third.state, BondState::PendingPayout.to_string());
    assert!(third.payout_invoice.is_none());
    assert!(third.payout_routing_fee_sats.is_none());
    assert!(third.payout_payment_hash.is_none());
    assert!(third.last_invoice_request_at.is_none());
    assert_eq!(third.payout_attempts, 0);
    assert_eq!(third.slashed_at, Some(slashed_at));
    assert_eq!(third.invoice_request_attempts, 1);
}
/// Indeterminate failures must never discard the payout invoice or payment
/// hash. Three of them raise `payout_attempts` to 3 while the bond stays
/// `PendingPayout`; a fourth leaves the counter at 3 and the invoice and
/// hash untouched.
#[tokio::test]
async fn send_payment_indeterminate_failure_keeps_invoice() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let slashed_at = Utc::now().timestamp();
    let mut bond = pending_payout_bond(
        order_id,
        taker_pk(),
        10_000,
        5_000,
        slashed_at,
        Some("lnbc1pINFLIGHT"),
        Some(slashed_at),
    );
    bond.payout_routing_fee_sats = Some(50);
    bond.payout_payment_hash = Some("cafebabe".to_string());
    let bond = create_bond(&pool, bond).await.unwrap();

    let mut current = bond;
    for _ in 0..3 {
        // Pass the latest persisted row so the handler sees the current
        // attempt counter.
        on_send_payment_failure(
            &pool,
            &current,
            3,
            CLAIM_WINDOW_SECONDS,
            PaymentFailureKind::Indeterminate,
            "stream timed out",
        )
        .await
        .unwrap();
        current = sqlx::query_as::<_, Bond>("SELECT * FROM bonds WHERE id = ?")
            .bind(current.id)
            .fetch_one(&pool)
            .await
            .unwrap();
    }
    assert_eq!(current.state, BondState::PendingPayout.to_string());
    assert_eq!(current.payout_invoice.as_deref(), Some("lnbc1pINFLIGHT"));
    assert_eq!(current.payout_payment_hash.as_deref(), Some("cafebabe"));
    assert_eq!(current.payout_attempts, 3);

    // One more indeterminate failure once the counter has reached 3.
    on_send_payment_failure(
        &pool,
        &current,
        3,
        CLAIM_WINDOW_SECONDS,
        PaymentFailureKind::Indeterminate,
        "stream eof",
    )
    .await
    .unwrap();
    let after: Bond = sqlx::query_as("SELECT * FROM bonds WHERE id = ?")
        .bind(current.id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(after.state, BondState::PendingPayout.to_string());
    assert_eq!(after.payout_attempts, 3);
    assert_eq!(after.payout_invoice.as_deref(), Some("lnbc1pINFLIGHT"));
    assert_eq!(after.payout_payment_hash.as_deref(), Some("cafebabe"));
}
/// `finalize_node_only` moves a `PendingPayout` bond to `Slashed`. The bond
/// is seeded with `10_000` for both numeric arguments of
/// `pending_payout_bond` (presumably the full amount is retained by the
/// node; confirm against the helper).
#[tokio::test]
async fn finalize_node_only_transitions_to_slashed() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let seeded = pending_payout_bond(
        order_id,
        taker_pk(),
        10_000,
        10_000,
        Utc::now().timestamp(),
        None,
        None,
    );
    let bond = create_bond(&pool, seeded).await.unwrap();

    finalize_node_only(&pool, &bond).await.unwrap();

    let (stored_state,): (String,) = sqlx::query_as("SELECT state FROM bonds WHERE id = ?")
        .bind(bond.id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(stored_state, BondState::Slashed.to_string());
}
/// Claim window used by the tests in this module: 15 * 86_400 seconds
/// (15 days). Passed as the window argument to `on_send_payment_failure`
/// and `apply_payout_invoice`.
const CLAIM_WINDOW_SECONDS: i64 = 15 * 86_400;
/// A `PendingPayout` bond with no invoice accepts a submitted one: the
/// outcome is `Persisted`, the invoice is stored, the state is unchanged,
/// and `invoice_request_attempts` drops from 2 back to 0.
#[tokio::test]
async fn apply_payout_invoice_persists_on_pending_payout_null_invoice() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let mut seeded = pending_payout_bond(order_id, taker_pk(), 10_000, 5_000, now, None, None);
    seeded.invoice_request_attempts = 2;
    let bond = create_bond(&pool, seeded).await.unwrap();

    let outcome = apply_payout_invoice(&pool, &bond, "lnbc1pNEW", now, CLAIM_WINDOW_SECONDS)
        .await
        .unwrap();
    assert_eq!(outcome, InvoiceApplyOutcome::Persisted);

    let stored: Bond = sqlx::query_as("SELECT * FROM bonds WHERE id = ?")
        .bind(bond.id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(stored.state, BondState::PendingPayout.to_string());
    assert_eq!(stored.payout_invoice.as_deref(), Some("lnbc1pNEW"));
    assert_eq!(stored.invoice_request_attempts, 0);
}
/// A `PendingPayout` bond that already holds an invoice rejects a second
/// submission and keeps both its state and the original invoice.
#[tokio::test]
async fn apply_payout_invoice_rejects_pending_payout_when_invoice_already_set() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let seeded = pending_payout_bond(
        order_id,
        taker_pk(),
        10_000,
        5_000,
        now,
        Some("lnbc1pOLD"),
        None,
    );
    let bond = create_bond(&pool, seeded).await.unwrap();

    let outcome = apply_payout_invoice(&pool, &bond, "lnbc1pNEW", now, CLAIM_WINDOW_SECONDS)
        .await
        .unwrap();
    assert_eq!(outcome, InvoiceApplyOutcome::Rejected);

    let (stored_state, stored_invoice): (String, Option<String>) =
        sqlx::query_as("SELECT state, payout_invoice FROM bonds WHERE id = ?")
            .bind(bond.id)
            .fetch_one(&pool)
            .await
            .unwrap();
    assert_eq!(stored_state, BondState::PendingPayout.to_string());
    assert_eq!(stored_invoice.as_deref(), Some("lnbc1pOLD"));
}
/// A `Failed` bond slashed one day ago (inside the claim window) is revived
/// by a fresh invoice: outcome `Resurrected`, state back to `PendingPayout`,
/// new invoice stored, both attempt counters reset to 0, `slashed_at` kept.
#[tokio::test]
async fn apply_payout_invoice_resurrects_failed_within_window() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let slashed_at = now - 86_400;
    let mut seeded = pending_payout_bond(
        order_id,
        taker_pk(),
        10_000,
        5_000,
        slashed_at,
        Some("lnbc1pBAD"),
        None,
    );
    seeded.state = BondState::Failed.to_string();
    seeded.payout_attempts = 5;
    seeded.invoice_request_attempts = 3;
    let bond = create_bond(&pool, seeded).await.unwrap();

    let outcome = apply_payout_invoice(&pool, &bond, "lnbc1pFRESH", now, CLAIM_WINDOW_SECONDS)
        .await
        .unwrap();
    assert_eq!(outcome, InvoiceApplyOutcome::Resurrected);

    let stored: Bond = sqlx::query_as("SELECT * FROM bonds WHERE id = ?")
        .bind(bond.id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(stored.state, BondState::PendingPayout.to_string());
    assert_eq!(stored.payout_invoice.as_deref(), Some("lnbc1pFRESH"));
    assert_eq!(stored.payout_attempts, 0);
    assert_eq!(stored.invoice_request_attempts, 0);
    assert_eq!(stored.slashed_at, Some(slashed_at));
}
/// A `Failed` bond slashed one second beyond the claim window rejects a
/// fresh invoice; state, stored invoice and `payout_attempts` are unchanged.
#[tokio::test]
async fn apply_payout_invoice_rejects_failed_past_claim_window() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let slashed_at = now - CLAIM_WINDOW_SECONDS - 1;
    let mut seeded = pending_payout_bond(
        order_id,
        taker_pk(),
        10_000,
        5_000,
        slashed_at,
        Some("lnbc1pBAD"),
        None,
    );
    seeded.state = BondState::Failed.to_string();
    seeded.payout_attempts = 5;
    let bond = create_bond(&pool, seeded).await.unwrap();

    let outcome = apply_payout_invoice(&pool, &bond, "lnbc1pFRESH", now, CLAIM_WINDOW_SECONDS)
        .await
        .unwrap();
    assert_eq!(outcome, InvoiceApplyOutcome::Rejected);

    let (stored_state, stored_invoice, stored_attempts): (String, Option<String>, i64) =
        sqlx::query_as("SELECT state, payout_invoice, payout_attempts FROM bonds WHERE id = ?")
            .bind(bond.id)
            .fetch_one(&pool)
            .await
            .unwrap();
    assert_eq!(stored_state, BondState::Failed.to_string());
    assert_eq!(stored_invoice.as_deref(), Some("lnbc1pBAD"));
    assert_eq!(stored_attempts, 5);
}
/// Boundary case: when exactly `CLAIM_WINDOW_SECONDS` have elapsed since
/// `slashed_at`, a `Failed` bond still rejects the invoice.
#[tokio::test]
async fn apply_payout_invoice_rejects_failed_at_exact_window_boundary() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let slashed_at = now - CLAIM_WINDOW_SECONDS;
    let mut seeded = pending_payout_bond(
        order_id,
        taker_pk(),
        10_000,
        5_000,
        slashed_at,
        Some("lnbc1pBAD"),
        None,
    );
    seeded.state = BondState::Failed.to_string();
    let bond = create_bond(&pool, seeded).await.unwrap();

    let outcome = apply_payout_invoice(&pool, &bond, "lnbc1pFRESH", now, CLAIM_WINDOW_SECONDS)
        .await
        .unwrap();
    assert_eq!(outcome, InvoiceApplyOutcome::Rejected);
}
/// A `Failed` bond whose `slashed_at` is `None` rejects a fresh invoice.
#[tokio::test]
async fn apply_payout_invoice_rejects_failed_without_slashed_at() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let mut seeded = pending_payout_bond(
        order_id,
        taker_pk(),
        10_000,
        5_000,
        now,
        Some("lnbc1pBAD"),
        None,
    );
    seeded.state = BondState::Failed.to_string();
    // Overwrite the timestamp the helper was given so the row has none.
    seeded.slashed_at = None;
    let bond = create_bond(&pool, seeded).await.unwrap();

    let outcome = apply_payout_invoice(&pool, &bond, "lnbc1pFRESH", now, CLAIM_WINDOW_SECONDS)
        .await
        .unwrap();
    assert_eq!(outcome, InvoiceApplyOutcome::Rejected);
}
/// Two resurrection attempts made from the same stale `Failed` snapshot:
/// the first returns `Resurrected`, the second `Rejected`, and the stored
/// invoice is the first one. The two calls run sequentially here, each
/// receiving the same pre-resurrection snapshot.
#[tokio::test]
async fn apply_payout_invoice_concurrent_resurrections_only_one_wins() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let mut seeded = pending_payout_bond(
        order_id,
        taker_pk(),
        10_000,
        5_000,
        now,
        Some("lnbc1pBAD"),
        None,
    );
    seeded.state = BondState::Failed.to_string();
    seeded.payout_attempts = 5;
    let bond = create_bond(&pool, seeded).await.unwrap();

    let first_outcome =
        apply_payout_invoice(&pool, &bond, "lnbc1pFRESH_A", now, CLAIM_WINDOW_SECONDS)
            .await
            .unwrap();
    assert_eq!(first_outcome, InvoiceApplyOutcome::Resurrected);

    let second_outcome =
        apply_payout_invoice(&pool, &bond, "lnbc1pFRESH_B", now, CLAIM_WINDOW_SECONDS)
            .await
            .unwrap();
    assert_eq!(second_outcome, InvoiceApplyOutcome::Rejected);

    let (stored_state, stored_invoice): (String, Option<String>) =
        sqlx::query_as("SELECT state, payout_invoice FROM bonds WHERE id = ?")
            .bind(bond.id)
            .fetch_one(&pool)
            .await
            .unwrap();
    assert_eq!(stored_state, BondState::PendingPayout.to_string());
    assert_eq!(stored_invoice.as_deref(), Some("lnbc1pFRESH_A"));
}
/// A bond can be resurrected more than once. Starting from `Failed`, a fresh
/// invoice revives it with `payout_attempts` at 0; three terminal failures
/// with a claim window of `0` drive it back to `Failed`; a further invoice
/// then revives it again with the newest invoice stored and the counter
/// reset.
#[tokio::test]
async fn apply_payout_invoice_resurrects_after_re_failure() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let mut bond = pending_payout_bond(
        order_id,
        taker_pk(),
        10_000,
        5_000,
        now,
        Some("lnbc1pA"),
        None,
    );
    bond.state = BondState::Failed.to_string();
    bond.payout_attempts = 5;
    let bond = create_bond(&pool, bond).await.unwrap();

    // First resurrection.
    let outcome = apply_payout_invoice(&pool, &bond, "lnbc1pB", now, CLAIM_WINDOW_SECONDS)
        .await
        .unwrap();
    assert_eq!(outcome, InvoiceApplyOutcome::Resurrected);
    let bond_after_b: Bond = sqlx::query_as("SELECT * FROM bonds WHERE id = ?")
        .bind(bond.id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(bond_after_b.state, BondState::PendingPayout.to_string());
    assert_eq!(bond_after_b.payout_attempts, 0);

    // Fail the payout three times with a zero-length claim window so the
    // bond ends up `Failed` again (asserted below).
    let mut current = bond_after_b;
    for _ in 0..3 {
        on_send_payment_failure(
            &pool,
            &current,
            3,
            0,
            PaymentFailureKind::Terminal,
            "transient",
        )
        .await
        .unwrap();
        current = sqlx::query_as::<_, Bond>("SELECT * FROM bonds WHERE id = ?")
            .bind(bond.id)
            .fetch_one(&pool)
            .await
            .unwrap();
    }
    assert_eq!(current.state, BondState::Failed.to_string());

    // Second resurrection, using the real claim window.
    let outcome = apply_payout_invoice(&pool, &current, "lnbc1pC", now, CLAIM_WINDOW_SECONDS)
        .await
        .unwrap();
    assert_eq!(outcome, InvoiceApplyOutcome::Resurrected);
    let final_bond: Bond = sqlx::query_as("SELECT * FROM bonds WHERE id = ?")
        .bind(bond.id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(final_bond.state, BondState::PendingPayout.to_string());
    assert_eq!(final_bond.payout_invoice.as_deref(), Some("lnbc1pC"));
    assert_eq!(final_bond.payout_attempts, 0);
}
/// For each of `Slashed`, `Released`, `Forfeited`, `Locked` and `Requested`,
/// `apply_payout_invoice` returns `Rejected` and the stored state is left
/// as it was.
#[tokio::test]
async fn apply_payout_invoice_rejects_other_states() {
    let pool = setup_pool().await;
    let now = Utc::now().timestamp();

    let rejected_states = [
        BondState::Slashed,
        BondState::Released,
        BondState::Forfeited,
        BondState::Locked,
        BondState::Requested,
    ];
    for state in rejected_states {
        // Each state gets its own order and bond row.
        let order_id = Uuid::new_v4();
        insert_order(&pool, order_id, maker_pk(), taker_pk()).await;
        let mut seeded = pending_payout_bond(
            order_id,
            taker_pk(),
            10_000,
            5_000,
            now,
            Some("lnbc1p"),
            None,
        );
        seeded.state = state.to_string();
        let bond = create_bond(&pool, seeded).await.unwrap();

        let outcome =
            apply_payout_invoice(&pool, &bond, "lnbc1pFRESH", now, CLAIM_WINDOW_SECONDS)
                .await
                .unwrap();
        assert_eq!(outcome, InvoiceApplyOutcome::Rejected, "state {state}");

        let (stored_state,): (String,) = sqlx::query_as("SELECT state FROM bonds WHERE id = ?")
            .bind(bond.id)
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(stored_state, state.to_string());
    }
}
/// Resurrecting a `Failed` bond must wipe the payment hash left over from
/// the previous invoice while storing the new invoice.
#[tokio::test]
async fn apply_payout_invoice_resurrection_clears_payout_payment_hash() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let mut seeded = pending_payout_bond(
        order_id,
        taker_pk(),
        10_000,
        5_000,
        now,
        Some("lnbc1pOLD"),
        None,
    );
    seeded.state = BondState::Failed.to_string();
    seeded.payout_attempts = 5;
    // A 64-character placeholder stands in for the stale hash.
    seeded.payout_payment_hash = Some("a".repeat(64));
    let bond = create_bond(&pool, seeded).await.unwrap();

    let outcome = apply_payout_invoice(&pool, &bond, "lnbc1pFRESH", now, CLAIM_WINDOW_SECONDS)
        .await
        .unwrap();
    assert_eq!(outcome, InvoiceApplyOutcome::Resurrected);

    let stored: Bond = sqlx::query_as("SELECT * FROM bonds WHERE id = ?")
        .bind(bond.id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(stored.state, BondState::PendingPayout.to_string());
    assert_eq!(stored.payout_invoice.as_deref(), Some("lnbc1pFRESH"));
    assert!(
        stored.payout_payment_hash.is_none(),
        "resurrection must clear stale payout_payment_hash, got {:?}",
        stored.payout_payment_hash
    );
}
/// On the plain `Persisted` path (a `PendingPayout` bond with no invoice)
/// the payment hash, which was never set, is still `None` afterwards.
#[tokio::test]
async fn apply_payout_invoice_persists_leaves_payout_payment_hash_untouched() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let seeded = pending_payout_bond(order_id, taker_pk(), 10_000, 5_000, now, None, None);
    let bond = create_bond(&pool, seeded).await.unwrap();

    let outcome = apply_payout_invoice(&pool, &bond, "lnbc1pFRESH", now, CLAIM_WINDOW_SECONDS)
        .await
        .unwrap();
    assert_eq!(outcome, InvoiceApplyOutcome::Persisted);

    let stored: Bond = sqlx::query_as("SELECT * FROM bonds WHERE id = ?")
        .bind(bond.id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert!(stored.payout_payment_hash.is_none());
}
/// `slash_after_success` moves a `PendingPayout` bond that holds an invoice
/// to the `Slashed` state.
#[tokio::test]
async fn slash_after_success_transitions_pending_to_slashed() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let seeded = pending_payout_bond(
        order_id,
        taker_pk(),
        10_000,
        5_000,
        Utc::now().timestamp(),
        Some("lnbc1pPAID"),
        None,
    );
    let bond = create_bond(&pool, seeded).await.unwrap();

    slash_after_success(&pool, &bond, 5_000).await.unwrap();

    let (stored_state,): (String,) = sqlx::query_as("SELECT state FROM bonds WHERE id = ?")
        .bind(bond.id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(stored_state, BondState::Slashed.to_string());
}
/// Calling `slash_after_success` with a stale `PendingPayout` snapshot of a
/// row that is already `Slashed` succeeds and leaves the row `Slashed`.
#[tokio::test]
async fn slash_after_success_is_idempotent_when_already_slashed() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let mut seeded = pending_payout_bond(
        order_id,
        taker_pk(),
        10_000,
        5_000,
        Utc::now().timestamp(),
        Some("lnbc1pPAID"),
        None,
    );
    seeded.state = BondState::Slashed.to_string();
    let bond = create_bond(&pool, seeded).await.unwrap();

    // In-memory copy that still claims the pre-slash state.
    let mut stale = bond.clone();
    stale.state = BondState::PendingPayout.to_string();

    slash_after_success(&pool, &stale, 5_000).await.unwrap();

    let (stored_state,): (String,) = sqlx::query_as("SELECT state FROM bonds WHERE id = ?")
        .bind(bond.id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(stored_state, BondState::Slashed.to_string());
}
/// Returns the stringified second tuple element (the recipient key, as the
/// callers use it) of every entry in `MESSAGE_QUEUES.queue_order_msg` whose
/// inner message kind matches both `order_id` and `action`.
async fn ack_recipients(order_id: Uuid, action: Action) -> Vec<String> {
    use crate::config::MESSAGE_QUEUES;

    let queue = MESSAGE_QUEUES.queue_order_msg.read().await;
    queue
        .iter()
        .filter_map(|(msg, pk)| {
            let inner = msg.get_inner_message_kind();
            let is_match = inner.id == Some(order_id) && inner.action == action;
            is_match.then(|| pk.to_string())
        })
        .collect()
}
/// `notify_invoice_received` enqueues exactly one `BondInvoiceAccepted`
/// message for the order, addressed to the recipient key it was given
/// (the maker key here).
#[tokio::test]
async fn notify_invoice_received_acks_submitter() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let seeded = pending_payout_bond(order_id, taker_pk(), 10_000, 5_000, now, None, None);
    let bond = create_bond(&pool, seeded).await.unwrap();
    let recipient = PublicKey::from_str(maker_pk()).unwrap();

    let acks_before = ack_recipients(order_id, Action::BondInvoiceAccepted)
        .await
        .len();
    notify_invoice_received(&pool, &bond, recipient, 5_000).await;
    let acks = ack_recipients(order_id, Action::BondInvoiceAccepted).await;

    assert_eq!(acks.len() - acks_before, 1);
    let expected_recipient = maker_pk().to_string();
    assert!(acks.contains(&expected_recipient));
}
/// `notify_payout_completed` on a bond seeded with the taker key enqueues
/// exactly one `BondPayoutCompleted` message, addressed to the maker key.
#[tokio::test]
async fn notify_payout_completed_acks_counterparty() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let seeded = pending_payout_bond(order_id, taker_pk(), 10_000, 5_000, now, None, None);
    let bond = create_bond(&pool, seeded).await.unwrap();

    let acks_before = ack_recipients(order_id, Action::BondPayoutCompleted)
        .await
        .len();
    notify_payout_completed(&pool, &bond, 5_000).await;
    let acks = ack_recipients(order_id, Action::BondPayoutCompleted).await;

    assert_eq!(acks.len() - acks_before, 1);
    let expected_recipient = maker_pk().to_string();
    assert!(acks.contains(&expected_recipient));
}
/// `slash_after_success` both moves the bond to `Slashed` and enqueues one
/// `BondPayoutCompleted` message addressed to the maker key.
#[tokio::test]
async fn slash_after_success_notifies_winner() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let seeded = pending_payout_bond(
        order_id,
        taker_pk(),
        10_000,
        5_000,
        now,
        Some("lnbc1pPAID"),
        None,
    );
    let bond = create_bond(&pool, seeded).await.unwrap();

    let acks_before = ack_recipients(order_id, Action::BondPayoutCompleted)
        .await
        .len();
    slash_after_success(&pool, &bond, 5_000).await.unwrap();
    let acks = ack_recipients(order_id, Action::BondPayoutCompleted).await;

    let (stored_state,): (String,) = sqlx::query_as("SELECT state FROM bonds WHERE id = ?")
        .bind(bond.id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(stored_state, BondState::Slashed.to_string());
    assert_eq!(acks.len() - acks_before, 1);
    let expected_recipient = maker_pk().to_string();
    assert!(acks.contains(&expected_recipient));
}
/// `finalize_node_only` moves the bond to `Slashed` without enqueueing any
/// `BondPayoutCompleted` message for the order.
#[tokio::test]
async fn finalize_node_only_does_not_notify_winner() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id, maker_pk(), taker_pk()).await;

    let now = Utc::now().timestamp();
    let seeded = pending_payout_bond(order_id, taker_pk(), 10_000, 10_000, now, None, None);
    let bond = create_bond(&pool, seeded).await.unwrap();

    let acks_before = ack_recipients(order_id, Action::BondPayoutCompleted)
        .await
        .len();
    finalize_node_only(&pool, &bond).await.unwrap();
    let acks = ack_recipients(order_id, Action::BondPayoutCompleted).await;

    let (stored_state,): (String,) = sqlx::query_as("SELECT state FROM bonds WHERE id = ?")
        .bind(bond.id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(stored_state, BondState::Slashed.to_string());
    // The queue holds the same number of completion messages as before.
    assert_eq!(acks.len(), acks_before);
}
}