use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use chrono::Utc;
use mostro_core::error::{
CantDoReason,
MostroError::{self, MostroCantDo},
};
use mostro_core::message::{BondResolution, Message, Payload};
use mostro_core::order::Order;
use sqlx::{Pool, Sqlite};
use tracing::{info, warn};
use uuid::Uuid;
use super::db::find_active_bonds_for_order;
use super::flow::release_bond;
use super::math::compute_node_share;
use super::model::Bond;
use super::types::{BondSlashReason, BondState};
use crate::config::settings::Settings;
use crate::lightning::LndConnector;
pub trait SettleLightning {
fn settle_hold_invoice<'a>(
&'a mut self,
preimage: &'a str,
) -> Pin<Box<dyn Future<Output = Result<(), MostroError>> + Send + 'a>>;
}
impl SettleLightning for LndConnector {
fn settle_hold_invoice<'a>(
&'a mut self,
preimage: &'a str,
) -> Pin<Box<dyn Future<Output = Result<(), MostroError>> + Send + 'a>> {
Box::pin(async move {
LndConnector::settle_hold_invoice(self, preimage)
.await
.map(|_| ())
})
}
}
pub(super) fn is_already_settled_error(err: &MostroError) -> bool {
let s = err.to_string().to_lowercase();
s.contains("already settled")
|| s.contains("invoice already settled")
|| s.contains("code=alreadyexists")
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Side {
Seller,
Buyer,
}
pub fn extract_bond_resolution(msg: &Message) -> BondResolution {
match &msg.get_inner_message_kind().payload {
Some(Payload::BondResolution(br)) => br.clone(),
_ => BondResolution {
slash_seller: false,
slash_buyer: false,
},
}
}
pub async fn validate_bond_resolution(
pool: &Pool<Sqlite>,
order: &Order,
resolution: &BondResolution,
) -> Result<(), MostroError> {
if !resolution.slash_seller && !resolution.slash_buyer {
return Ok(());
}
let bonds = find_active_bonds_for_order(pool, order.id).await?;
if resolution.slash_seller && resolve_locked_bond(order, &bonds, Side::Seller).is_none() {
return Err(MostroCantDo(CantDoReason::InvalidPayload));
}
if resolution.slash_buyer && resolve_locked_bond(order, &bonds, Side::Buyer).is_none() {
return Err(MostroCantDo(CantDoReason::InvalidPayload));
}
Ok(())
}
pub async fn apply_bond_resolution<L: SettleLightning + Send>(
pool: &Pool<Sqlite>,
ln_client: &mut L,
order: &Order,
resolution: &BondResolution,
reason: BondSlashReason,
) -> Result<(), MostroError> {
let bonds = find_active_bonds_for_order(pool, order.id).await?;
if bonds.is_empty() {
return Ok(());
}
let mut slashed_ids: HashSet<Uuid> = HashSet::new();
if resolution.slash_seller {
if let Some(bond) = resolve_locked_bond(order, &bonds, Side::Seller) {
slashed_ids.insert(bond.id);
}
}
if resolution.slash_buyer {
if let Some(bond) = resolve_locked_bond(order, &bonds, Side::Buyer) {
slashed_ids.insert(bond.id);
}
}
let node_share_pct = Settings::get_bond().map_or(0.0, |c| c.slash_node_share_pct);
for bond in bonds.iter() {
if slashed_ids.contains(&bond.id) {
slash_one(pool, ln_client, bond, reason, node_share_pct).await;
} else {
if let Err(e) = release_bond(pool, bond).await {
warn!(
bond_id = %bond.id,
order_id = %order.id,
"apply_bond_resolution: release_bond failed: {}", e
);
}
}
}
Ok(())
}
async fn slash_one<L: SettleLightning + Send>(
pool: &Pool<Sqlite>,
ln_client: &mut L,
bond: &Bond,
reason: BondSlashReason,
node_share_pct: f64,
) {
let preimage = match bond.preimage.as_deref() {
Some(p) => p,
None => {
warn!(
bond_id = %bond.id,
order_id = %bond.order_id,
"slash: bond has no preimage — cannot settle HTLC; leaving Locked for operator review"
);
return;
}
};
if let Err(e) = ln_client.settle_hold_invoice(preimage).await {
if is_already_settled_error(&e) {
info!(
bond_id = %bond.id,
order_id = %bond.order_id,
"slash: HTLC already settled (idempotent retry); proceeding to CAS"
);
} else {
warn!(
bond_id = %bond.id,
order_id = %bond.order_id,
"slash: settle_hold_invoice failed: {e} — leaving bond Locked for admin retry"
);
return;
}
}
let node_share_sats = compute_node_share(bond.amount_sats, node_share_pct);
let now = Utc::now().timestamp();
let result = sqlx::query(
"UPDATE bonds \
SET state = ?, slashed_reason = ?, slashed_at = ?, node_share_sats = ? \
WHERE id = ? AND state = ?",
)
.bind(BondState::PendingPayout.to_string())
.bind(reason.to_string())
.bind(now)
.bind(node_share_sats)
.bind(bond.id)
.bind(BondState::Locked.to_string())
.execute(pool)
.await;
match result {
Ok(r) if r.rows_affected() == 1 => {
info!(
bond_id = %bond.id,
order_id = %bond.order_id,
reason = %reason,
node_share_sats,
counterparty_share_sats = bond.amount_sats - node_share_sats,
"Bond HTLC settled and row transitioned to PendingPayout"
);
}
Ok(_) => {
warn!(
bond_id = %bond.id,
order_id = %bond.order_id,
current_state = %bond.state,
"slash CAS no-op (bond state changed concurrently); HTLC was settled"
);
}
Err(e) => {
warn!(
bond_id = %bond.id,
order_id = %bond.order_id,
"slash CAS DB error: {} (HTLC was settled)", e
);
}
}
}
fn resolve_locked_bond<'a>(order: &Order, bonds: &'a [Bond], side: Side) -> Option<&'a Bond> {
let target_pubkey = match side {
Side::Seller => order.seller_pubkey.as_deref()?,
Side::Buyer => order.buyer_pubkey.as_deref()?,
};
let locked = BondState::Locked.to_string();
bonds
.iter()
.find(|b| b.pubkey == target_pubkey && b.state == locked)
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use mostro_core::error::{MostroError, ServiceError};
use mostro_core::message::{Action, Message, Payload};
use mostro_core::order::{Kind, Order, Status};
use sqlx::sqlite::SqlitePoolOptions;
use sqlx::{Pool, Sqlite};
use super::*;
use crate::app::bond::model::Bond;
use crate::app::bond::types::{BondRole, BondSlashReason, BondState};
#[derive(Default)]
struct StubSettle {
calls: Mutex<Vec<String>>,
fail_with: Mutex<Option<String>>,
}
impl StubSettle {
fn new() -> Arc<Self> {
Arc::new(Self::default())
}
fn calls(&self) -> Vec<String> {
self.calls.lock().unwrap().clone()
}
fn fail_next_with(&self, msg: &str) {
*self.fail_with.lock().unwrap() = Some(msg.to_string());
}
}
impl SettleLightning for Arc<StubSettle> {
fn settle_hold_invoice<'a>(
&'a mut self,
preimage: &'a str,
) -> Pin<Box<dyn Future<Output = Result<(), MostroError>> + Send + 'a>> {
Box::pin(async move {
self.calls.lock().unwrap().push(preimage.to_string());
if let Some(msg) = self.fail_with.lock().unwrap().take() {
return Err(MostroError::MostroInternalErr(ServiceError::LnNodeError(
msg,
)));
}
Ok(())
})
}
}
async fn setup_pool() -> Pool<Sqlite> {
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect(":memory:")
.await
.expect("open in-memory sqlite");
sqlx::query(include_str!(
"../../../migrations/20221222153301_orders.sql"
))
.execute(&pool)
.await
.expect("orders migration");
for stmt in include_str!("../../../migrations/20251126120000_dev_fee.sql")
.split(';')
.map(str::trim)
.filter(|s| !s.is_empty() && !s.lines().all(|l| l.trim_start().starts_with("--")))
{
sqlx::query(stmt)
.execute(&pool)
.await
.expect("dev_fee migration");
}
sqlx::query(include_str!(
"../../../migrations/20260423120000_anti_abuse_bond.sql"
))
.execute(&pool)
.await
.expect("bonds migration");
sqlx::query(include_str!(
"../../../migrations/20260518120000_bond_payout_payment_hash.sql"
))
.execute(&pool)
.await
.expect("bond_payout_payment_hash migration");
pool
}
async fn insert_order_row(pool: &Pool<Sqlite>, order: &Order) {
sqlx::query(
r#"INSERT INTO orders (
id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at,
seller_pubkey, buyer_pubkey
) VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?, ?, ?, ?, ?)"#,
)
.bind(order.id)
.bind(&order.kind)
.bind(order.id.simple().to_string())
.bind(&order.status)
.bind(&order.payment_method)
.bind(order.amount)
.bind(&order.fiat_code)
.bind(order.fiat_amount)
.bind(order.created_at)
.bind(order.expires_at)
.bind(order.seller_pubkey.as_deref())
.bind(order.buyer_pubkey.as_deref())
.execute(pool)
.await
.expect("insert order");
}
fn fixture_order(kind: Kind, seller_pk: &str, buyer_pk: &str) -> Order {
Order {
id: Uuid::new_v4(),
kind: kind.to_string(),
status: Status::Dispute.to_string(),
seller_pubkey: Some(seller_pk.to_string()),
buyer_pubkey: Some(buyer_pk.to_string()),
amount: 100_000,
fiat_code: "USD".to_string(),
fiat_amount: 10,
payment_method: "lightning".to_string(),
created_at: Utc::now().timestamp(),
expires_at: Utc::now().timestamp() + 3600,
..Order::default()
}
}
fn stub_preimage() -> String {
"00".repeat(32)
}
async fn insert_bond(
pool: &Pool<Sqlite>,
order_id: Uuid,
pubkey: &str,
state: BondState,
) -> Bond {
let mut b = Bond::new_requested(order_id, pubkey.to_string(), BondRole::Taker, 10_000);
b.state = state.to_string();
b.preimage = Some(stub_preimage());
b.hash = None;
sqlx_crud::Crud::create(b.clone(), pool).await.unwrap();
b
}
fn taker_pk() -> &'static str {
"1111111111111111111111111111111111111111111111111111111111111111"
}
fn maker_pk() -> &'static str {
"2222222222222222222222222222222222222222222222222222222222222222"
}
fn order_msg_with(payload: Option<Payload>) -> Message {
Message::new_order(
Some(Uuid::new_v4()),
None,
None,
Action::AdminSettle,
payload,
)
}
#[test]
fn extract_returns_default_when_payload_absent() {
let msg = order_msg_with(None);
let br = extract_bond_resolution(&msg);
assert!(!br.slash_seller);
assert!(!br.slash_buyer);
}
#[test]
fn extract_returns_default_for_unrelated_payload_shapes() {
let msg = order_msg_with(Some(Payload::TextMessage("hi".into())));
let br = extract_bond_resolution(&msg);
assert!(!br.slash_seller);
assert!(!br.slash_buyer);
}
#[test]
fn extract_returns_payload_when_present() {
let payload = Payload::BondResolution(BondResolution {
slash_seller: true,
slash_buyer: false,
});
let msg = order_msg_with(Some(payload));
let br = extract_bond_resolution(&msg);
assert!(br.slash_seller);
assert!(!br.slash_buyer);
}
#[tokio::test]
async fn validate_null_payload_passes_with_no_bonds() {
let pool = setup_pool().await;
let order = fixture_order(Kind::Sell, maker_pk(), taker_pk());
insert_order_row(&pool, &order).await;
let res = BondResolution {
slash_seller: false,
slash_buyer: false,
};
validate_bond_resolution(&pool, &order, &res).await.unwrap();
}
#[tokio::test]
async fn validate_slash_buyer_passes_when_buyer_has_locked_bond() {
let pool = setup_pool().await;
let order = fixture_order(Kind::Sell, maker_pk(), taker_pk());
insert_order_row(&pool, &order).await;
insert_bond(&pool, order.id, taker_pk(), BondState::Locked).await;
let res = BondResolution {
slash_seller: false,
slash_buyer: true,
};
validate_bond_resolution(&pool, &order, &res).await.unwrap();
}
#[tokio::test]
async fn validate_slash_seller_on_sell_apply_to_take_rejects() {
let pool = setup_pool().await;
let order = fixture_order(Kind::Sell, maker_pk(), taker_pk());
insert_order_row(&pool, &order).await;
insert_bond(&pool, order.id, taker_pk(), BondState::Locked).await;
let res = BondResolution {
slash_seller: true,
slash_buyer: false,
};
let err = validate_bond_resolution(&pool, &order, &res)
.await
.unwrap_err();
assert!(
matches!(err, MostroCantDo(CantDoReason::InvalidPayload)),
"expected CantDo(InvalidPayload), got {err:?}"
);
}
#[tokio::test]
async fn validate_rejects_when_bond_table_is_empty() {
let pool = setup_pool().await;
let order = fixture_order(Kind::Sell, maker_pk(), taker_pk());
insert_order_row(&pool, &order).await;
let res = BondResolution {
slash_seller: false,
slash_buyer: true,
};
let err = validate_bond_resolution(&pool, &order, &res)
.await
.unwrap_err();
assert!(matches!(err, MostroCantDo(CantDoReason::InvalidPayload)));
}
#[tokio::test]
async fn apply_null_payload_releases_all_active_bonds() {
let pool = setup_pool().await;
let order = fixture_order(Kind::Sell, maker_pk(), taker_pk());
insert_order_row(&pool, &order).await;
let bond = insert_bond(&pool, order.id, taker_pk(), BondState::Locked).await;
let res = BondResolution {
slash_seller: false,
slash_buyer: false,
};
apply_bond_resolution(
&pool,
&mut StubSettle::new(),
&order,
&res,
BondSlashReason::LostDispute,
)
.await
.unwrap();
let row: (String,) = sqlx::query_as("SELECT state FROM bonds WHERE id = ?")
.bind(bond.id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(
row.0,
BondState::Released.to_string(),
"null payload must release, not slash"
);
}
#[tokio::test]
async fn apply_slash_buyer_on_sell_order_transitions_taker_bond() {
let pool = setup_pool().await;
let order = fixture_order(Kind::Sell, maker_pk(), taker_pk());
insert_order_row(&pool, &order).await;
let bond = insert_bond(&pool, order.id, taker_pk(), BondState::Locked).await;
let res = BondResolution {
slash_seller: false,
slash_buyer: true,
};
apply_bond_resolution(
&pool,
&mut StubSettle::new(),
&order,
&res,
BondSlashReason::LostDispute,
)
.await
.unwrap();
let row: (String, Option<String>, Option<i64>, Option<i64>) = sqlx::query_as(
"SELECT state, slashed_reason, slashed_at, node_share_sats \
FROM bonds WHERE id = ?",
)
.bind(bond.id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0, BondState::PendingPayout.to_string());
assert_eq!(row.1.as_deref(), Some("lost-dispute"));
assert!(row.2.unwrap() > 0, "slashed_at must be set");
assert_eq!(row.3, Some(0));
}
#[tokio::test]
async fn apply_is_idempotent_on_already_pending_payout() {
let pool = setup_pool().await;
let order = fixture_order(Kind::Sell, maker_pk(), taker_pk());
insert_order_row(&pool, &order).await;
let bond = insert_bond(&pool, order.id, taker_pk(), BondState::Locked).await;
let res = BondResolution {
slash_seller: false,
slash_buyer: true,
};
apply_bond_resolution(
&pool,
&mut StubSettle::new(),
&order,
&res,
BondSlashReason::LostDispute,
)
.await
.unwrap();
let first: (String, Option<i64>, Option<i64>) =
sqlx::query_as("SELECT state, slashed_at, node_share_sats FROM bonds WHERE id = ?")
.bind(bond.id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(first.0, BondState::PendingPayout.to_string());
std::thread::sleep(std::time::Duration::from_secs(1));
apply_bond_resolution(
&pool,
&mut StubSettle::new(),
&order,
&res,
BondSlashReason::LostDispute,
)
.await
.unwrap();
let second: (String, Option<i64>, Option<i64>) =
sqlx::query_as("SELECT state, slashed_at, node_share_sats FROM bonds WHERE id = ?")
.bind(bond.id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(
second.0,
BondState::PendingPayout.to_string(),
"second apply must not transition the bond out of PendingPayout"
);
assert_eq!(
first, second,
"second apply must not rebump state / slashed_at / node_share_sats"
);
}
#[tokio::test]
async fn apply_with_no_bond_rows_is_noop() {
let pool = setup_pool().await;
let order = fixture_order(Kind::Sell, maker_pk(), taker_pk());
insert_order_row(&pool, &order).await;
let res = BondResolution {
slash_seller: false,
slash_buyer: false,
};
apply_bond_resolution(
&pool,
&mut StubSettle::new(),
&order,
&res,
BondSlashReason::LostDispute,
)
.await
.unwrap();
let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM bonds")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count.0, 0);
}
#[tokio::test]
async fn slash_one_settles_exactly_one_htlc() {
let pool = setup_pool().await;
let order = fixture_order(Kind::Sell, maker_pk(), taker_pk());
insert_order_row(&pool, &order).await;
let bond = insert_bond(&pool, order.id, taker_pk(), BondState::Locked).await;
let mut ln = StubSettle::new();
let res = BondResolution {
slash_seller: false,
slash_buyer: true,
};
apply_bond_resolution(&pool, &mut ln, &order, &res, BondSlashReason::LostDispute)
.await
.unwrap();
assert_eq!(
ln.calls(),
vec![stub_preimage()],
"slash path must settle exactly the slashed bond's HTLC"
);
let state: (String,) = sqlx::query_as("SELECT state FROM bonds WHERE id = ?")
.bind(bond.id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(state.0, BondState::PendingPayout.to_string());
}
#[tokio::test]
async fn slash_both_settles_both_htlcs() {
let pool = setup_pool().await;
let order = fixture_order(Kind::Sell, maker_pk(), taker_pk());
insert_order_row(&pool, &order).await;
let buyer_bond = insert_bond(&pool, order.id, taker_pk(), BondState::Locked).await;
let seller_bond = insert_bond(&pool, order.id, maker_pk(), BondState::Locked).await;
let mut ln = StubSettle::new();
let res = BondResolution {
slash_seller: true,
slash_buyer: true,
};
apply_bond_resolution(&pool, &mut ln, &order, &res, BondSlashReason::LostDispute)
.await
.unwrap();
assert_eq!(
ln.calls().len(),
2,
"both slashed bonds must have their HTLCs settled immediately"
);
for b in [&buyer_bond, &seller_bond] {
let state: (String,) = sqlx::query_as("SELECT state FROM bonds WHERE id = ?")
.bind(b.id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(state.0, BondState::PendingPayout.to_string());
}
}
#[tokio::test]
async fn non_slashed_bond_is_released_not_settled() {
let pool = setup_pool().await;
let order = fixture_order(Kind::Sell, maker_pk(), taker_pk());
insert_order_row(&pool, &order).await;
let bond = insert_bond(&pool, order.id, taker_pk(), BondState::Locked).await;
let mut ln = StubSettle::new();
let res = BondResolution {
slash_seller: false,
slash_buyer: false,
};
apply_bond_resolution(&pool, &mut ln, &order, &res, BondSlashReason::LostDispute)
.await
.unwrap();
assert!(
ln.calls().is_empty(),
"non-slashed (released) bonds must not invoke settle_hold_invoice"
);
let state: (String,) = sqlx::query_as("SELECT state FROM bonds WHERE id = ?")
.bind(bond.id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(state.0, BondState::Released.to_string());
}
#[tokio::test]
async fn slash_treats_already_settled_as_success() {
let pool = setup_pool().await;
let order = fixture_order(Kind::Sell, maker_pk(), taker_pk());
insert_order_row(&pool, &order).await;
let bond = insert_bond(&pool, order.id, taker_pk(), BondState::Locked).await;
let mut ln = StubSettle::new();
ln.fail_next_with("code=AlreadyExists: invoice already settled");
let res = BondResolution {
slash_seller: false,
slash_buyer: true,
};
apply_bond_resolution(&pool, &mut ln, &order, &res, BondSlashReason::LostDispute)
.await
.unwrap();
let state: (String,) = sqlx::query_as("SELECT state FROM bonds WHERE id = ?")
.bind(bond.id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(
state.0,
BondState::PendingPayout.to_string(),
"already-settled error must not block the CAS"
);
}
#[tokio::test]
async fn slash_settle_transport_failure_leaves_bond_locked() {
let pool = setup_pool().await;
let order = fixture_order(Kind::Sell, maker_pk(), taker_pk());
insert_order_row(&pool, &order).await;
let bond = insert_bond(&pool, order.id, taker_pk(), BondState::Locked).await;
let mut ln = StubSettle::new();
ln.fail_next_with("code=Unavailable: connection refused");
let res = BondResolution {
slash_seller: false,
slash_buyer: true,
};
apply_bond_resolution(&pool, &mut ln, &order, &res, BondSlashReason::LostDispute)
.await
.unwrap();
let state: (String,) = sqlx::query_as("SELECT state FROM bonds WHERE id = ?")
.bind(bond.id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(
state.0,
BondState::Locked.to_string(),
"transient settle failure must leave the bond Locked for admin retry"
);
}
}