use std::str::FromStr;
use std::sync::Arc;
use chrono::Utc;
use fedimint_tonic_lnd::lnrpc::invoice::InvoiceState;
use mostro_core::error::{MostroError, MostroError::MostroInternalErr, ServiceError};
use mostro_core::prelude::*;
use nostr_sdk::nostr::hashes::hex::FromHex;
use nostr_sdk::prelude::*;
use sqlx::{Pool, Sqlite};
use sqlx_crud::Crud;
use tokio::sync::mpsc::channel;
use tracing::{info, warn};
use uuid::Uuid;
use crate::config::settings::Settings;
use crate::lightning::{InvoiceMessage, LndConnector};
use crate::util::{
bytes_to_string, enqueue_order_msg, get_keys, set_waiting_invoice_status, show_hold_invoice,
};
use super::db::{create_bond, find_active_bonds, find_active_bonds_for_order, find_bond_by_hash};
use super::math::compute_bond_amount;
use super::model::Bond;
use super::types::{BondRole, BondState};
/// Reports whether a taker must post a bond.
///
/// True only when a bond configuration is present, is enabled, and its
/// `apply_to` setting covers the taker side.
pub fn taker_bond_required() -> bool {
    match Settings::get_bond() {
        Some(bond_cfg) => bond_cfg.enabled && bond_cfg.apply_to.applies_to_taker(),
        None => false,
    }
}
/// Reports whether a maker must post a bond.
///
/// True only when a bond configuration is present, is enabled, and its
/// `apply_to` setting covers the maker side.
pub fn maker_bond_required() -> bool {
    match Settings::get_bond() {
        Some(bond_cfg) => bond_cfg.enabled && bond_cfg.apply_to.applies_to_maker(),
        None => false,
    }
}
/// Returns `true` when at least one bond in `bonds` is a taker bond in the
/// `Locked` state.
///
/// Bonds with any other role (e.g. a locked maker bond) or any other state
/// do not count. An empty slice yields `false`.
pub fn trade_committed_by_locked_taker_bond(bonds: &[Bond]) -> bool {
    let locked_state = BondState::Locked.to_string();
    let taker_role = BondRole::Taker.to_string();
    for bond in bonds {
        if bond.state == locked_state && bond.role == taker_role {
            return true;
        }
    }
    false
}
/// Taker-side details captured when a take is requested.
///
/// `request_taker_bond` stores these values on the bond row, and
/// `promote_taker_context_to_order` later copies them onto the order once
/// the bond is locked.
#[derive(Clone, Debug)]
pub struct TakerContext {
    /// Stored as the bond's `taker_identity`; later becomes the order's
    /// master buyer/seller pubkey depending on the order kind.
    pub identity: String,
    /// Trade index used when messaging the taker; stored as `taker_trade_index`.
    pub trade_index: i64,
    /// Optional invoice stored as `taker_invoice`; copied to the order's
    /// `buyer_invoice` for sell orders.
    pub buyer_invoice: Option<String>,
    /// Fiat amount for this take; stored as `taker_fiat_amount`.
    pub fiat_amount: i64,
    /// Amount the bond size is computed from; stored as `taker_amount`.
    pub amount: i64,
    /// Fee value stored as `taker_fee`.
    pub fee: i64,
    /// Dev-fee value stored as `taker_dev_fee`.
    pub dev_fee: i64,
}
/// Starts the bond flow for a taker of `order`.
///
/// Steps, in order:
/// 1. Read the bond configuration (an `UnexpectedError` is returned when it
///    is missing) and size the bond from `taker_ctx.amount`.
/// 2. Create a hold invoice and persist a `Requested` taker bond row that
///    carries the invoice hash/preimage/payment request plus the taker
///    context.
/// 3. Subscribe to the invoice; if that fails the bond is released again and
///    the subscribe error is returned.
/// 4. Queue a `PayBondInvoice` message for the taker.
/// 5. Try to move the order from `Pending` to `WaitingTakerBond` with a
///    compare-and-swap; only when that claims the row is the order event
///    republished and its new `event_id` stored. Failures of the swap, the
///    republish, or the `event_id` write are logged, not returned.
///
/// # Errors
/// Returns an error when the configuration is missing, LND cannot be reached,
/// the hold invoice or bond row cannot be created, the order kind is invalid,
/// the subscription fails, or the Mostro keys cannot be loaded.
pub async fn request_taker_bond(
    pool: &Pool<Sqlite>,
    order: &Order,
    taker_pubkey: PublicKey,
    request_id: Option<u64>,
    taker_ctx: TakerContext,
) -> Result<Bond, MostroError> {
    let Some(cfg) = Settings::get_bond() else {
        return Err(MostroInternalErr(ServiceError::UnexpectedError(
            "anti_abuse_bond block is missing while bond was deemed required".into(),
        )));
    };
    let amount = compute_bond_amount(taker_ctx.amount, cfg);
    let memo = format!("mostro bond order_id={}", order.id);

    let mut lnd = LndConnector::new().await?;
    let created = lnd.create_hold_invoice(&memo, amount).await;
    let (invoice_resp, preimage, hash) =
        created.map_err(|e| MostroInternalErr(ServiceError::HoldInvoiceError(e.to_string())))?;

    // Build the row in memory first, then persist it in one go.
    let mut draft = Bond::new_requested(order.id, taker_pubkey.to_string(), BondRole::Taker, amount);
    draft.hash = Some(bytes_to_string(&hash));
    draft.preimage = Some(bytes_to_string(&preimage));
    draft.payment_request = Some(invoice_resp.payment_request.clone());
    draft.taker_identity = Some(taker_ctx.identity.clone());
    draft.taker_trade_index = Some(taker_ctx.trade_index);
    draft.taker_invoice = taker_ctx.buyer_invoice.clone();
    draft.taker_fiat_amount = Some(taker_ctx.fiat_amount);
    draft.taker_amount = Some(taker_ctx.amount);
    draft.taker_fee = Some(taker_ctx.fee);
    draft.taker_dev_fee = Some(taker_ctx.dev_fee);
    let bond = create_bond(pool, draft).await?;
    info!(
        "Bond requested: bond_id={} order_id={} role={} amount_sats={}",
        bond.id, order.id, bond.role, bond.amount_sats
    );

    let order_kind = order.get_order_kind().map_err(MostroInternalErr)?;
    let bond_small = SmallOrder::new(
        Some(order.id),
        Some(order_kind),
        Some(Status::Pending),
        amount,
        order.fiat_code.clone(),
        order.min_amount,
        order.max_amount,
        taker_ctx.fiat_amount,
        order.payment_method.clone(),
        order.premium,
        None,
        None,
        None,
        None,
        None,
    );

    // Without a live subscription the payment would go unnoticed, so undo the row.
    let subscribed = bond_invoice_subscribe(hash, request_id).await;
    if let Err(e) = subscribed {
        warn!(
            bond_id = %bond.id,
            order_id = %bond.order_id,
            "request_taker_bond: subscribe failed ({}); rolling back bond row",
            e
        );
        let _ = release_bond(pool, &bond).await;
        return Err(e);
    }

    let payload = Payload::PaymentRequest(Some(bond_small), invoice_resp.payment_request, None);
    enqueue_order_msg(
        request_id,
        Some(order.id),
        Action::PayBondInvoice,
        Some(payload),
        taker_pubkey,
        Some(taker_ctx.trade_index),
    )
    .await;

    // Only the request that actually flips Pending -> WaitingTakerBond republishes.
    let swap = sqlx::query("UPDATE orders SET status = ? WHERE id = ? AND status = ?")
        .bind(Status::WaitingTakerBond.to_string())
        .bind(order.id)
        .bind(Status::Pending.to_string())
        .execute(pool)
        .await;
    let claimed = match swap {
        Ok(done) => done.rows_affected() == 1,
        Err(e) => {
            warn!(
                order_id = %order.id,
                "request_taker_bond: WaitingTakerBond compare-and-swap failed: {}", e
            );
            false
        }
    };
    if !claimed {
        return Ok(bond);
    }

    let my_keys = get_keys()?;
    match crate::util::update_order_event(&my_keys, Status::WaitingTakerBond, order).await {
        Err(e) => {
            warn!(
                order_id = %order.id,
                "request_taker_bond: WaitingTakerBond republish failed: {}", e
            );
        }
        Ok(updated) => {
            let stored = sqlx::query("UPDATE orders SET event_id = ? WHERE id = ?")
                .bind(&updated.event_id)
                .bind(order.id)
                .execute(pool)
                .await;
            if let Err(e) = stored {
                warn!(
                    order_id = %order.id,
                    "request_taker_bond: failed to persist event_id after WaitingTakerBond republish: {}", e
                );
            }
        }
    }
    Ok(bond)
}
/// Starts the bond flow for the maker of `order`.
///
/// Sizes the bond from `notional_sats`, creates a hold invoice, persists a
/// `Requested` maker bond row, subscribes to the invoice, and queues a
/// `PayBondInvoice` message for `maker_pubkey`. If the subscription cannot
/// be set up the bond is released again and the subscribe error is returned.
/// This function does not change the order's status.
///
/// # Errors
/// Returns an error when the bond configuration is missing, LND cannot be
/// reached, the hold invoice or bond row cannot be created, the order kind
/// is invalid, or the subscription fails.
pub async fn request_maker_bond(
    pool: &Pool<Sqlite>,
    order: &Order,
    maker_pubkey: PublicKey,
    notional_sats: i64,
    request_id: Option<u64>,
    trade_index: Option<i64>,
) -> Result<Bond, MostroError> {
    let Some(cfg) = Settings::get_bond() else {
        return Err(MostroInternalErr(ServiceError::UnexpectedError(
            "anti_abuse_bond block is missing while maker bond was deemed required".into(),
        )));
    };
    let amount = compute_bond_amount(notional_sats, cfg);
    let memo = format!("mostro bond order_id={}", order.id);

    let mut lnd = LndConnector::new().await?;
    let created = lnd.create_hold_invoice(&memo, amount).await;
    let (invoice_resp, preimage, hash) =
        created.map_err(|e| MostroInternalErr(ServiceError::HoldInvoiceError(e.to_string())))?;

    let mut draft = Bond::new_requested(order.id, maker_pubkey.to_string(), BondRole::Maker, amount);
    draft.hash = Some(bytes_to_string(&hash));
    draft.preimage = Some(bytes_to_string(&preimage));
    draft.payment_request = Some(invoice_resp.payment_request.clone());
    let bond = create_bond(pool, draft).await?;
    info!(
        "Maker bond requested: bond_id={} order_id={} amount_sats={}",
        bond.id, order.id, bond.amount_sats
    );

    let order_kind = order.get_order_kind().map_err(MostroInternalErr)?;
    let bond_small = SmallOrder::new(
        Some(order.id),
        Some(order_kind),
        Some(Status::Pending),
        amount,
        order.fiat_code.clone(),
        order.min_amount,
        order.max_amount,
        order.fiat_amount,
        order.payment_method.clone(),
        order.premium,
        None,
        None,
        None,
        None,
        None,
    );

    // Without a live subscription the payment would go unnoticed, so undo the row.
    let subscribed = bond_invoice_subscribe(hash, request_id).await;
    if let Err(e) = subscribed {
        warn!(
            bond_id = %bond.id,
            order_id = %bond.order_id,
            "request_maker_bond: subscribe failed ({}); rolling back bond row",
            e
        );
        let _ = release_bond(pool, &bond).await;
        return Err(e);
    }

    let payload = Payload::PaymentRequest(Some(bond_small), invoice_resp.payment_request, None);
    enqueue_order_msg(
        request_id,
        Some(order.id),
        Action::PayBondInvoice,
        Some(payload),
        maker_pubkey,
        trade_index,
    )
    .await;
    Ok(bond)
}
/// Classification of a failed `cancel_hold_invoice` call, as produced by
/// `classify_cancel_error`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CancelOutcome {
    /// The error text says the invoice is already gone or cancelled;
    /// `release_bond` proceeds to mark the bond released.
    AlreadyDone,
    /// Any other failure; `release_bond` returns the error and leaves the
    /// bond row unchanged.
    Transient,
}
/// Decides whether a cancel failure means "nothing left to cancel" or a
/// failure worth retrying.
///
/// The error is rendered to text, lower-cased, and searched for a fixed set
/// of substrings. Any hit yields [`CancelOutcome::AlreadyDone`]; everything
/// else is [`CancelOutcome::Transient`].
fn classify_cancel_error(err: &MostroError) -> CancelOutcome {
    // Lower-case markers: the rendered error is lower-cased before matching.
    const ALREADY_DONE_MARKERS: [&str; 7] = [
        "code=notfound",
        "code=alreadyexists",
        "already cancelled",
        "already canceled",
        "unable to locate invoice",
        "invoice not found",
        "no such invoice",
    ];
    let message = err.to_string().to_lowercase();
    let already_done = ALREADY_DONE_MARKERS
        .iter()
        .any(|marker| message.contains(*marker));
    if already_done {
        CancelOutcome::AlreadyDone
    } else {
        CancelOutcome::Transient
    }
}
/// Cancels a bond's hold invoice (when it has one) and marks the bond row
/// `Released`.
///
/// * A bond already in a terminal state is left untouched and `Ok(())` is
///   returned.
/// * When the bond has a hash, the hold invoice is cancelled through LND.
///   A cancel error classified as [`CancelOutcome::AlreadyDone`] is treated
///   as success; a [`CancelOutcome::Transient`] error, or a failure to
///   connect to LND, is returned and the row is not modified.
/// * A bond without a hash skips LND entirely.
///
/// # Errors
/// Returns an error when the stored state cannot be parsed, when LND fails
/// as described above, or when the row update fails.
pub async fn release_bond(pool: &Pool<Sqlite>, bond: &Bond) -> Result<(), MostroError> {
    let parsed = BondState::from_str(&bond.state).map_err(|e| {
        MostroInternalErr(ServiceError::UnexpectedError(format!(
            "Bond {} has unparseable state {:?}: {}",
            bond.id, bond.state, e
        )))
    })?;
    if parsed.is_terminal() {
        return Ok(());
    }

    if let Some(hash) = bond.hash.as_ref() {
        let mut ln = match LndConnector::new().await {
            Ok(conn) => conn,
            Err(e) => {
                warn!(
                    bond_id = %bond.id,
                    order_id = %bond.order_id,
                    outcome = "transient",
                    "could not connect to LND for cancel ({}); leaving bond {} for retry",
                    e, bond.state
                );
                return Err(e);
            }
        };
        if let Err(e) = ln.cancel_hold_invoice(hash).await {
            if classify_cancel_error(&e) == CancelOutcome::Transient {
                warn!(
                    bond_id = %bond.id,
                    order_id = %bond.order_id,
                    outcome = "transient",
                    "cancel_hold_invoice failed transiently ({}); leaving bond {} for retry",
                    e, bond.state
                );
                return Err(e);
            }
            // Remaining case: the invoice is already gone, so carry on.
            info!(
                bond_id = %bond.id,
                order_id = %bond.order_id,
                "cancel_hold_invoice reports already-done ({}); marking Released",
                e
            );
        }
    }

    let mut released = bond.clone();
    released.state = BondState::Released.to_string();
    released.released_at = Some(Utc::now().timestamp());
    // Copy the ids out first; they are logged after `update` has taken `released`.
    let (bond_id, order_id) = (released.id, released.order_id);
    released
        .update(pool)
        .await
        .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
    info!(
        "Bond {} released for order {} (was state={})",
        bond_id, order_id, bond.state
    );
    Ok(())
}
/// Releases every active bond of `order_id`, maker bonds included.
///
/// Individual release failures are logged by the underlying helper and do
/// not fail the call; only the bond lookup can produce an error.
pub async fn release_bonds_for_order(
    pool: &Pool<Sqlite>,
    order_id: Uuid,
) -> Result<(), MostroError> {
    let retain_makers = false;
    release_active_bonds(pool, order_id, retain_makers).await
}
/// Releases the active bonds of `order_id`, optionally sparing maker bonds.
///
/// With `retain_makers` set, bonds whose role is `Maker` are skipped. A
/// failure to release one bond is logged and the loop continues.
///
/// # Errors
/// Only the lookup of active bonds can fail the call.
async fn release_active_bonds(
    pool: &Pool<Sqlite>,
    order_id: Uuid,
    retain_makers: bool,
) -> Result<(), MostroError> {
    let bonds = find_active_bonds_for_order(pool, order_id).await?;
    let maker_role = BondRole::Maker.to_string();
    let to_release = bonds
        .iter()
        .filter(|candidate| !(retain_makers && candidate.role == maker_role));
    for bond in to_release {
        match release_bond(pool, bond).await {
            Ok(()) => {}
            Err(e) => warn!("Failed to release bond {}: {}", bond.id, e),
        }
    }
    Ok(())
}
/// Infallible wrapper around [`release_bonds_for_order`]: any error is
/// logged as a warning prefixed with `context` instead of being returned.
pub async fn release_bonds_for_order_or_warn(
    pool: &Pool<Sqlite>,
    order_id: Uuid,
    context: &'static str,
) {
    match release_bonds_for_order(pool, order_id).await {
        Ok(()) => {}
        Err(e) => warn!("{context}: bond release failed for {}: {}", order_id, e),
    }
}
/// Releases the active bonds of `order_id` while leaving maker bonds alone.
///
/// Never returns an error: a failure is logged as a warning prefixed with
/// `context`.
pub async fn release_taker_bonds_for_order_or_warn(
    pool: &Pool<Sqlite>,
    order_id: Uuid,
    context: &'static str,
) {
    let retain_makers = true;
    match release_active_bonds(pool, order_id, retain_makers).await {
        Ok(()) => {}
        Err(e) => warn!("{context}: bond release failed for {}: {}", order_id, e),
    }
}
pub async fn bond_invoice_subscribe(
hash: Vec<u8>,
request_id: Option<u64>,
) -> Result<(), MostroError> {
let mut ln_client = LndConnector::new().await?;
let (tx, mut rx) = channel::<InvoiceMessage>(100);
tokio::spawn(async move {
if let Err(e) = ln_client.subscribe_invoice(hash, tx).await {
warn!("Bond invoice subscriber ended with error: {e}");
}
});
let pool = crate::config::settings::get_db_pool();
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
let hash_hex = bytes_to_string(msg.hash.as_ref());
match msg.state {
InvoiceState::Accepted => {
if let Err(e) = on_bond_invoice_accepted(&hash_hex, &pool, request_id).await {
warn!("Bond invoice accepted handler error: {e}");
}
}
InvoiceState::Canceled => {
if let Err(e) = on_bond_invoice_canceled(&hash_hex, &pool).await {
warn!("Bond invoice canceled handler error: {e}");
}
}
InvoiceState::Settled => {
info!("Bond hash {hash_hex}: invoice settled");
}
InvoiceState::Open => {
info!("Bond hash {hash_hex}: invoice open (waiting for payment)");
}
}
}
});
Ok(())
}
/// Re-creates invoice subscriptions for every active bond that has a hash.
///
/// Bonds without a hash are skipped silently. A malformed hash or a failed
/// subscription is logged and the remaining bonds are still processed.
/// Subscriptions are created without a request id.
///
/// # Errors
/// Only the lookup of active bonds can fail the call.
pub async fn resubscribe_active_bonds(pool: &Arc<Pool<Sqlite>>) -> Result<(), MostroError> {
    let bonds = find_active_bonds(pool.as_ref()).await?;
    for bond in bonds {
        let Some(hash_hex) = bond.hash.as_ref() else {
            continue;
        };
        let hash_bytes = match Vec::<u8>::from_hex(hash_hex) {
            Ok(bytes) => bytes,
            Err(e) => {
                warn!("Bond {} has malformed hash: {}", bond.id, e);
                continue;
            }
        };
        match bond_invoice_subscribe(hash_bytes, None).await {
            Ok(()) => info!("Resubscribed bond {} (state={})", bond.id, bond.state),
            Err(e) => warn!("Failed to resubscribe bond {}: {}", bond.id, e),
        }
    }
    Ok(())
}
async fn on_bond_invoice_accepted(
hash: &str,
pool: &Pool<Sqlite>,
request_id: Option<u64>,
) -> Result<(), MostroError> {
let bond = match find_bond_by_hash(pool, hash).await? {
Some(b) => b,
None => {
warn!("Bond invoice accepted for unknown hash {hash}");
return Ok(());
}
};
if bond.role == BondRole::Maker.to_string() {
return on_maker_bond_accepted(&bond, hash, pool, request_id).await;
}
let now = Utc::now().timestamp();
let result = sqlx::query(
"UPDATE bonds SET state = ?, locked_at = ? \
WHERE id = ? AND state = ? \
AND NOT EXISTS ( \
SELECT 1 FROM bonds b2 \
WHERE b2.order_id = ? AND b2.state = ? AND b2.role = ? AND b2.id != ? \
)",
)
.bind(BondState::Locked.to_string())
.bind(now)
.bind(bond.id)
.bind(BondState::Requested.to_string())
.bind(bond.order_id)
.bind(BondState::Locked.to_string())
.bind(BondRole::Taker.to_string())
.bind(bond.id)
.execute(pool)
.await
.map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
let current = match find_bond_by_hash(pool, hash).await? {
Some(b) => b,
None => return Ok(()),
};
let current_state = match BondState::from_str(¤t.state) {
Ok(s) => s,
Err(e) => {
warn!(
"Bond {} has unparseable state {:?}: {} — skipping resume",
current.id, current.state, e
);
return Ok(());
}
};
if result.rows_affected() == 0 && current_state != BondState::Locked {
info!(
"Bond {} lost concurrent-bonds race (current state={}) — releasing and notifying taker",
current.id, current.state
);
if !current_state.is_terminal() {
if let Err(e) = release_bond(pool, ¤t).await {
warn!(
bond_id = %current.id,
"release_bond on race-loser failed: {}", e
);
}
}
notify_loser(¤t).await;
return Ok(());
}
if result.rows_affected() == 1 {
info!("Bond {} locked for order {}", bond.id, bond.order_id);
let losers = match find_active_bonds_for_order(pool, current.order_id).await {
Ok(rows) => rows,
Err(e) => {
warn!(
order_id = %current.order_id,
"could not enumerate losing bonds after lock: {}", e
);
Vec::new()
}
};
for loser in losers
.iter()
.filter(|b| b.id != current.id && b.state == BondState::Requested.to_string())
{
if let Err(e) = release_bond(pool, loser).await {
warn!(
bond_id = %loser.id,
"failed to release losing concurrent bond: {}", e
);
}
notify_loser(loser).await;
}
}
if current_state != BondState::Locked {
return Ok(());
}
let order = Order::by_id(pool, current.order_id)
.await
.map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?
.ok_or_else(|| {
MostroInternalErr(ServiceError::UnexpectedError(format!(
"Bond {} references missing order {}",
current.id, current.order_id
)))
})?;
if order.status != Status::Pending.to_string()
&& order.status != Status::WaitingTakerBond.to_string()
{
info!(
"Bond {} accepted but order {} is in status {} — skipping resume",
current.id, order.id, order.status
);
return Ok(());
}
let order = promote_taker_context_to_order(pool, order, ¤t).await?;
let my_keys = get_keys()?;
resume_take_after_bond(pool, order, &my_keys, request_id).await
}
/// Handles an `Accepted` invoice update for a maker bond.
///
/// Moves the bond from `Requested` to `Locked` with a compare-and-swap
/// `UPDATE`, re-reads it by `hash`, and — provided it is `Locked` and the
/// order is still `WaitingMakerBond` — resumes publication of the order.
/// A repeated firing for an already-locked bond updates no rows but still
/// reaches the order-status check.
///
/// A vanished bond, an unparseable state, a bond that is not `Locked`, or an
/// order in any other status ends the handler with `Ok(())`.
///
/// # Errors
/// Returns an error on database failures, when the bond references a missing
/// order, when the Mostro keys cannot be loaded, or when the publish step
/// fails.
async fn on_maker_bond_accepted(
    bond: &Bond,
    hash: &str,
    pool: &Pool<Sqlite>,
    request_id: Option<u64>,
) -> Result<(), MostroError> {
    let now = Utc::now().timestamp();
    let result =
        sqlx::query("UPDATE bonds SET state = ?, locked_at = ? WHERE id = ? AND state = ?")
            .bind(BondState::Locked.to_string())
            .bind(now)
            .bind(bond.id)
            .bind(BondState::Requested.to_string())
            .execute(pool)
            .await
            .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
    // Re-read so the checks below use the row as it is now.
    let current = match find_bond_by_hash(pool, hash).await? {
        Some(b) => b,
        None => return Ok(()),
    };
    let current_state = match BondState::from_str(&current.state) {
        Ok(s) => s,
        Err(e) => {
            warn!(
                "Maker bond {} has unparseable state {:?}: {} — skipping publish",
                current.id, current.state, e
            );
            return Ok(());
        }
    };
    if current_state != BondState::Locked {
        info!(
            "Maker bond {} no longer Locked (state={}) — skipping publish",
            current.id, current.state
        );
        return Ok(());
    }
    // Log the lock only for the call that actually performed it.
    if result.rows_affected() == 1 {
        info!(
            "Maker bond {} locked for order {}",
            current.id, current.order_id
        );
    }
    let order = Order::by_id(pool, current.order_id)
        .await
        .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?
        .ok_or_else(|| {
            MostroInternalErr(ServiceError::UnexpectedError(format!(
                "Maker bond {} references missing order {}",
                current.id, current.order_id
            )))
        })?;
    if order.status != Status::WaitingMakerBond.to_string() {
        info!(
            "Maker bond {} accepted but order {} is in status {} — skipping publish",
            current.id, order.id, order.status
        );
        return Ok(());
    }
    let my_keys = get_keys()?;
    crate::util::resume_publish_after_maker_bond(pool, &my_keys, order, request_id).await
}
async fn notify_loser(bond: &Bond) {
if let Ok(taker_pk) = PublicKey::from_str(&bond.pubkey) {
enqueue_order_msg(
None,
Some(bond.order_id),
Action::Canceled,
None,
taker_pk,
None,
)
.await;
}
}
/// Copies the taker details stored on `bond` onto `order` and persists it.
///
/// * Buy order: the bond owner becomes the seller (pubkey, master pubkey,
///   trade index).
/// * Sell order: the bond owner becomes the buyer (pubkey, master pubkey,
///   trade index) and the stored invoice becomes the order's buyer invoice.
///
/// Fiat amount, amount, fee and dev fee are overwritten only when the bond
/// carries a value for them. The order timestamp is refreshed before the
/// update.
///
/// # Errors
/// Fails when the order kind is invalid or the database update fails.
async fn promote_taker_context_to_order(
    pool: &Pool<Sqlite>,
    mut order: Order,
    bond: &Bond,
) -> Result<Order, MostroError> {
    let taker_pubkey = Some(bond.pubkey.clone());
    let taker_master = bond.taker_identity.clone();
    match order.get_order_kind().map_err(MostroInternalErr)? {
        mostro_core::order::Kind::Sell => {
            order.buyer_pubkey = taker_pubkey;
            order.master_buyer_pubkey = taker_master;
            order.trade_index_buyer = bond.taker_trade_index;
            order.buyer_invoice = bond.taker_invoice.clone();
        }
        mostro_core::order::Kind::Buy => {
            order.seller_pubkey = taker_pubkey;
            order.master_seller_pubkey = taker_master;
            order.trade_index_seller = bond.taker_trade_index;
        }
    }

    // Keep the order's own value wherever the bond has none.
    order.fiat_amount = bond.taker_fiat_amount.unwrap_or(order.fiat_amount);
    order.amount = bond.taker_amount.unwrap_or(order.amount);
    order.fee = bond.taker_fee.unwrap_or(order.fee);
    order.dev_fee = bond.taker_dev_fee.unwrap_or(order.dev_fee);

    order.set_timestamp_now();
    let saved = order.update(pool).await;
    saved.map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))
}
/// Handles a `Canceled` update for the hold invoice with the given `hash`.
///
/// An unknown hash or a bond already in a terminal state is ignored. A bond
/// whose state cannot be parsed is treated as not terminal. Otherwise the
/// bond is marked `Released`, and the order is given the chance to drop from
/// `WaitingTakerBond` back to `Pending`; a failure of that second step is
/// only logged.
///
/// # Errors
/// Fails when the bond lookup or the bond update fails.
async fn on_bond_invoice_canceled(hash: &str, pool: &Pool<Sqlite>) -> Result<(), MostroError> {
    let Some(bond) = find_bond_by_hash(pool, hash).await? else {
        return Ok(());
    };
    let already_terminal = matches!(BondState::from_str(&bond.state), Ok(s) if s.is_terminal());
    if already_terminal {
        return Ok(());
    }

    let mut released = bond.clone();
    released.state = BondState::Released.to_string();
    released.released_at = Some(Utc::now().timestamp());
    released
        .update(pool)
        .await
        .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
    info!(
        "Bond {} marked Released after LND cancel (order {})",
        bond.id, bond.order_id
    );

    match maybe_drop_waiting_taker_bond(pool, bond.order_id).await {
        Ok(()) => {}
        Err(e) => {
            warn!(
                order_id = %bond.order_id,
                "on_bond_invoice_canceled: failed to flip status back to Pending: {}", e
            );
        }
    }
    Ok(())
}
/// Moves an order from `WaitingTakerBond` back to `Pending`, but only when
/// no taker bond of that order is still `Requested` or `Locked`.
///
/// Status check and bond check happen inside one `UPDATE` statement.
/// Returns `true` when exactly one row was changed.
///
/// # Errors
/// Fails when the statement cannot be executed.
pub(crate) async fn drop_waiting_taker_bond_to_pending(
    pool: &Pool<Sqlite>,
    order_id: Uuid,
) -> Result<bool, MostroError> {
    let statement = sqlx::query(
        "UPDATE orders SET status = ? \
         WHERE id = ? AND status = ? \
         AND NOT EXISTS ( \
         SELECT 1 FROM bonds \
         WHERE order_id = ? AND state IN (?, ?) AND role = ? \
         )",
    )
    // SET status = ?
    .bind(Status::Pending.to_string())
    // WHERE id = ? AND status = ?
    .bind(order_id)
    .bind(Status::WaitingTakerBond.to_string())
    // subquery: order_id = ? AND state IN (?, ?) AND role = ?
    .bind(order_id)
    .bind(BondState::Requested.to_string())
    .bind(BondState::Locked.to_string())
    .bind(BondRole::Taker.to_string());

    let outcome = statement
        .execute(pool)
        .await
        .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
    Ok(outcome.rows_affected() == 1)
}
/// Drops an order back to `Pending` when no active taker bond remains, and
/// republishes its order event.
///
/// Nothing happens when the status swap does not apply or the order can no
/// longer be found. After a successful swap the order event is republished
/// with `Pending` status and the new `event_id` is stored; failures of the
/// republish or of the `event_id` write are logged and do not fail the call.
///
/// # Errors
/// Fails when the status swap or the order lookup hits a database error, or
/// when the Mostro keys cannot be loaded.
pub(crate) async fn maybe_drop_waiting_taker_bond(
    pool: &Pool<Sqlite>,
    order_id: Uuid,
) -> Result<(), MostroError> {
    let flipped = drop_waiting_taker_bond_to_pending(pool, order_id).await?;
    if !flipped {
        return Ok(());
    }

    let looked_up = Order::by_id(pool, order_id)
        .await
        .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
    let Some(fresh) = looked_up else {
        return Ok(());
    };

    let my_keys = get_keys()?;
    match crate::util::update_order_event(&my_keys, Status::Pending, &fresh).await {
        Err(e) => {
            warn!(
                order_id = %order_id,
                "maybe_drop_waiting_taker_bond: Pending republish failed: {}", e
            );
        }
        Ok(updated) => {
            let stored = sqlx::query("UPDATE orders SET event_id = ? WHERE id = ?")
                .bind(&updated.event_id)
                .bind(order_id)
                .execute(pool)
                .await;
            if let Err(e) = stored {
                warn!(
                    order_id = %order_id,
                    "maybe_drop_waiting_taker_bond: failed to persist event_id after Pending republish: {}", e
                );
            }
        }
    }
    info!(
        "Order {} dropped back to Pending after last bond released",
        order_id
    );
    Ok(())
}
/// Continues the take flow once the taker bond is locked.
///
/// * Buy order: shows the hold invoice without a buyer payment request.
/// * Sell order with a buyer invoice: shows the hold invoice, passing that
///   invoice along.
/// * Sell order without a buyer invoice: sets the waiting-invoice status,
///   republishes the order event as `WaitingBuyerInvoice`, and stores the
///   updated order.
///
/// # Errors
/// Fails when the order kind or either pubkey is invalid, or when any of the
/// steps above fails.
async fn resume_take_after_bond(
    pool: &Pool<Sqlite>,
    mut order: Order,
    my_keys: &Keys,
    request_id: Option<u64>,
) -> Result<(), MostroError> {
    let kind = order.get_order_kind().map_err(MostroInternalErr)?;
    let buyer_pubkey = order.get_buyer_pubkey().map_err(MostroInternalErr)?;
    let seller_pubkey = order.get_seller_pubkey().map_err(MostroInternalErr)?;

    // Work out which payment request (if any) goes with the hold invoice;
    // the one path that needs no hold invoice returns from inside the match.
    let payment_request = match kind {
        mostro_core::order::Kind::Buy => None,
        mostro_core::order::Kind::Sell => match order.buyer_invoice.clone() {
            Some(invoice) => Some(invoice),
            None => {
                set_waiting_invoice_status(&mut order, buyer_pubkey, request_id)
                    .await
                    .map_err(|_| MostroInternalErr(ServiceError::UpdateOrderStatusError))?;
                let republished =
                    crate::util::update_order_event(my_keys, Status::WaitingBuyerInvoice, &order)
                        .await
                        .map_err(|e| MostroInternalErr(ServiceError::NostrError(e.to_string())))?;
                republished
                    .update(pool)
                    .await
                    .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
                return Ok(());
            }
        },
    };

    show_hold_invoice(
        my_keys,
        payment_request,
        &buyer_pubkey,
        &seller_pubkey,
        order,
        request_id,
    )
    .await
}
#[cfg(test)]
mod tests {
use super::*;
use crate::app::bond::types::BondRole;
use sqlx::sqlite::SqlitePoolOptions;
// Builds a single-connection in-memory SQLite pool and applies the
// migrations these tests rely on, in the same order as listed below.
async fn setup_pool() -> Pool<Sqlite> {
    // Runs a migration file statement by statement, skipping chunks that
    // are empty or consist only of `--` comment lines.
    async fn run_each_statement(pool: &Pool<Sqlite>, script: &str, label: &str) {
        let statements = script
            .split(';')
            .map(str::trim)
            .filter(|s| !s.is_empty() && !s.lines().all(|l| l.trim_start().starts_with("--")));
        for stmt in statements {
            sqlx::query(stmt).execute(pool).await.expect(label);
        }
    }

    let pool = SqlitePoolOptions::new()
        .max_connections(1)
        .connect(":memory:")
        .await
        .expect("open in-memory sqlite");

    sqlx::query(include_str!(
        "../../../migrations/20221222153301_orders.sql"
    ))
    .execute(&pool)
    .await
    .expect("orders migration");

    run_each_statement(
        &pool,
        include_str!("../../../migrations/20251126120000_dev_fee.sql"),
        "dev_fee migration",
    )
    .await;

    sqlx::query(include_str!(
        "../../../migrations/20260423120000_anti_abuse_bond.sql"
    ))
    .execute(&pool)
    .await
    .expect("bonds migration");

    sqlx::query(include_str!(
        "../../../migrations/20260518120000_bond_payout_payment_hash.sql"
    ))
    .execute(&pool)
    .await
    .expect("bond_payout_payment_hash migration");

    run_each_statement(
        &pool,
        include_str!("../../../migrations/20260530120000_cashu_escrow_fields.sql"),
        "cashu escrow migration",
    )
    .await;

    pool
}
// Inserts a minimal pending buy order with the given id; the event id is the
// hyphen-less rendering of that id.
async fn insert_order(pool: &Pool<Sqlite>, id: Uuid) {
    let event_id = id.simple().to_string();
    let insert = sqlx::query(
        r#"INSERT INTO orders (
id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at
) VALUES (?, 'buy', ?, 'pending', 0, 'ln', 1000, 'USD', 10, 0, 0)"#,
    )
    .bind(id)
    .bind(event_id);
    insert.execute(pool).await.expect("insert order");
}
// Test fixture: a 1_500-sat taker bond for `order_id` in the given state,
// with pubkey "aaaa…" and hash "cccc…" (64 characters each).
fn make_bond(order_id: Uuid, state: BondState) -> Bond {
    let pubkey = "a".repeat(64);
    let mut bond = Bond::new_requested(order_id, pubkey, BondRole::Taker, 1_500);
    bond.hash = Some("c".repeat(64));
    bond.state = state.to_string();
    bond
}
// Runs the same guarded Requested -> Locked statement as the taker-bond
// accept handler and returns the number of rows it changed.
async fn try_lock(pool: &Pool<Sqlite>, bond: &Bond) -> u64 {
    let locked = BondState::Locked.to_string();
    let statement = sqlx::query(
        "UPDATE bonds SET state = ?, locked_at = ? \
         WHERE id = ? AND state = ? \
         AND NOT EXISTS ( \
         SELECT 1 FROM bonds b2 \
         WHERE b2.order_id = ? AND b2.state = ? AND b2.role = ? AND b2.id != ? \
         )",
    )
    .bind(locked.clone())
    .bind(Utc::now().timestamp())
    .bind(bond.id)
    .bind(BondState::Requested.to_string())
    .bind(bond.order_id)
    .bind(locked)
    .bind(BondRole::Taker.to_string())
    .bind(bond.id);
    let outcome = statement.execute(pool).await.unwrap();
    outcome.rows_affected()
}
// Releasing an already-released bond must leave state and timestamp alone.
#[tokio::test]
async fn release_bond_is_idempotent_for_terminal_states() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id).await;

    let fixture = make_bond(order_id, BondState::Released);
    let stored = create_bond(&pool, fixture).await.unwrap();

    release_bond(&pool, &stored).await.unwrap();

    let hash = "c".repeat(64);
    let reloaded = find_bond_by_hash(&pool, &hash).await.unwrap().unwrap();
    assert_eq!(reloaded.state, "released");
    assert_eq!(reloaded.released_at, stored.released_at);
}
// The order-wide release never consults the bond configuration, so a
// hash-less Requested bond is released here without any config set up.
#[tokio::test]
async fn release_bonds_for_order_runs_regardless_of_feature_flag() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id).await;

    let mut hashless = make_bond(order_id, BondState::Requested);
    hashless.hash = None;
    create_bond(&pool, hashless).await.unwrap();

    release_bonds_for_order(&pool, order_id).await.unwrap();

    let still_active = find_active_bonds_for_order(&pool, order_id).await.unwrap();
    assert!(
        still_active.is_empty(),
        "bond must be released even with feature disabled"
    );
}
// A bond without a hash skips the LND cancel and is released directly.
#[tokio::test]
async fn release_bond_without_hash_marks_released() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id).await;

    let mut hashless = make_bond(order_id, BondState::Requested);
    hashless.hash = None;
    let stored = create_bond(&pool, hashless).await.unwrap();

    release_bond(&pool, &stored).await.unwrap();

    let still_active = find_active_bonds_for_order(&pool, order_id).await.unwrap();
    assert!(still_active.is_empty(), "bond should no longer be active");
}
// The test asserts that no taker bond is required when no bond config is set.
#[test]
fn taker_bond_required_is_false_without_config() {
    let required = taker_bond_required();
    assert!(!required);
}
// The test asserts that no maker bond is required when no bond config is set.
#[test]
fn maker_bond_required_is_false_without_config() {
    let required = maker_bond_required();
    assert!(!required);
}
// Only a Locked *taker* bond commits the trade; a Locked maker bond does not.
#[test]
fn locked_maker_bond_does_not_commit_the_trade() {
    let order_id = Uuid::new_v4();
    let mut maker = Bond::new_requested(order_id, "a".repeat(64), BondRole::Maker, 1_000);
    maker.state = BondState::Locked.to_string();
    let taker = Bond::new_requested(order_id, "b".repeat(64), BondRole::Taker, 1_000);

    // Index 0 is the maker bond, index 1 the taker bond.
    let mut bonds = vec![maker, taker];
    assert!(
        !trade_committed_by_locked_taker_bond(&bonds),
        "a Locked maker bond + a Requested taker bond must not commit the trade"
    );

    bonds[1].state = BondState::Locked.to_string();
    assert!(
        trade_committed_by_locked_taker_bond(&bonds),
        "a Locked taker bond commits the trade"
    );
}
// With no bonds at all there is nothing that could commit the trade.
#[test]
fn empty_bond_set_does_not_commit_the_trade() {
    let no_bonds: [Bond; 0] = [];
    let committed = trade_committed_by_locked_taker_bond(&no_bonds);
    assert!(!committed);
}
// The maker lock statement succeeds once; a second firing changes nothing.
#[tokio::test]
async fn maker_bond_lock_is_singleton_and_idempotent() {
    // Same statement the maker accept handler runs; returns rows changed.
    async fn lock_maker(pool: &Pool<Sqlite>, bond: &Bond) -> u64 {
        let outcome =
            sqlx::query("UPDATE bonds SET state = ?, locked_at = ? WHERE id = ? AND state = ?")
                .bind(BondState::Locked.to_string())
                .bind(Utc::now().timestamp())
                .bind(bond.id)
                .bind(BondState::Requested.to_string())
                .execute(pool)
                .await
                .unwrap();
        outcome.rows_affected()
    }

    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id).await;

    let hash = "e".repeat(64);
    let mut draft = Bond::new_requested(order_id, "d".repeat(64), BondRole::Maker, 1_000);
    draft.hash = Some(hash.clone());
    let stored = create_bond(&pool, draft).await.unwrap();

    assert_eq!(lock_maker(&pool, &stored).await, 1, "first lock wins");
    assert_eq!(
        lock_maker(&pool, &stored).await,
        0,
        "duplicate firing is a no-op"
    );

    let reloaded = find_bond_by_hash(&pool, &hash).await.unwrap().unwrap();
    assert_eq!(reloaded.state, BondState::Locked.to_string());
    assert_eq!(reloaded.role, BondRole::Maker.to_string());
}
// Locking the maker bond must leave a sibling taker bond in Requested.
#[tokio::test]
async fn maker_bond_lock_does_not_touch_other_bonds() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id).await;

    let mut maker_draft = Bond::new_requested(order_id, "d".repeat(64), BondRole::Maker, 1_000);
    maker_draft.hash = Some("e".repeat(64));
    let maker = create_bond(&pool, maker_draft).await.unwrap();

    let sibling_hash = "0".repeat(64);
    let mut sibling_draft = Bond::new_requested(order_id, "f".repeat(64), BondRole::Taker, 1_000);
    sibling_draft.hash = Some(sibling_hash.clone());
    let sibling = create_bond(&pool, sibling_draft).await.unwrap();

    sqlx::query("UPDATE bonds SET state = ?, locked_at = ? WHERE id = ? AND state = ?")
        .bind(BondState::Locked.to_string())
        .bind(Utc::now().timestamp())
        .bind(maker.id)
        .bind(BondState::Requested.to_string())
        .execute(&pool)
        .await
        .unwrap();

    let sibling_after = find_bond_by_hash(&pool, &sibling_hash)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(
        sibling_after.state,
        BondState::Requested.to_string(),
        "unrelated bond must stay Requested"
    );
    assert_eq!(sibling_after.id, sibling.id);
}
// Of two Requested taker bonds on one order, only the first may lock.
#[tokio::test]
async fn lock_race_guard_admits_only_one_winner() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id).await;

    let mut first = make_bond(order_id, BondState::Requested);
    first.hash = Some("a".repeat(64));
    first.pubkey = "a".repeat(64);
    let bond_a = create_bond(&pool, first).await.unwrap();

    let mut second = make_bond(order_id, BondState::Requested);
    second.hash = Some("b".repeat(64));
    second.pubkey = "b".repeat(64);
    let bond_b = create_bond(&pool, second).await.unwrap();

    assert_eq!(try_lock(&pool, &bond_a).await, 1);
    assert_eq!(try_lock(&pool, &bond_b).await, 0);

    let active = find_active_bonds_for_order(&pool, order_id).await.unwrap();
    let locked = BondState::Locked.to_string();
    let requested = BondState::Requested.to_string();
    assert!(active
        .iter()
        .any(|b| b.id == bond_a.id && b.state == locked));
    assert!(active
        .iter()
        .any(|b| b.id == bond_b.id && b.state == requested));
}
// The lock guard only looks at taker bonds, so a Locked maker bond on the
// same order must not stop the taker from locking.
#[tokio::test]
async fn locked_maker_bond_does_not_block_taker_lock_race() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id).await;

    let maker_hash = "d".repeat(64);
    let mut maker_draft = Bond::new_requested(order_id, "m".repeat(64), BondRole::Maker, 1_000);
    maker_draft.state = BondState::Locked.to_string();
    maker_draft.hash = Some(maker_hash.clone());
    let maker = create_bond(&pool, maker_draft).await.unwrap();

    let taker_hash = "e".repeat(64);
    let mut taker_draft = make_bond(order_id, BondState::Requested);
    taker_draft.pubkey = "t".repeat(64);
    taker_draft.hash = Some(taker_hash.clone());
    let taker = create_bond(&pool, taker_draft).await.unwrap();

    assert_eq!(
        try_lock(&pool, &taker).await,
        1,
        "taker must win the lock race despite the Locked maker bond"
    );

    let locked = BondState::Locked.to_string();
    let maker_after = find_bond_by_hash(&pool, &maker_hash)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(maker_after.id, maker.id);
    assert_eq!(maker_after.state, locked);

    let taker_after = find_bond_by_hash(&pool, &taker_hash)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(taker_after.id, taker.id);
    assert_eq!(taker_after.state, locked);
}
// A Locked maker bond is not a taker bond, so it must not keep the order
// parked in WaitingTakerBond.
#[tokio::test]
async fn maybe_drop_waiting_taker_bond_ignores_locked_maker_bond() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id).await;

    let park = sqlx::query("UPDATE orders SET status = ? WHERE id = ?")
        .bind(Status::WaitingTakerBond.to_string())
        .bind(order_id);
    park.execute(&pool).await.unwrap();

    let mut maker = Bond::new_requested(order_id, "m".repeat(64), BondRole::Maker, 1_000);
    maker.hash = Some("d".repeat(64));
    maker.state = BondState::Locked.to_string();
    create_bond(&pool, maker).await.unwrap();

    let dropped = drop_waiting_taker_bond_to_pending(&pool, order_id)
        .await
        .unwrap();
    assert!(
        dropped,
        "order must drop to Pending: the Locked maker bond must not count as an active taker bond"
    );

    let status_now: String = sqlx::query_scalar("SELECT status FROM orders WHERE id = ?")
        .bind(order_id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(status_now, Status::Pending.to_string());
}
// While a taker bond is still Requested the order must stay parked.
#[tokio::test]
async fn drop_waiting_taker_bond_held_by_active_taker_bond() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id).await;

    let park = sqlx::query("UPDATE orders SET status = ? WHERE id = ?")
        .bind(Status::WaitingTakerBond.to_string())
        .bind(order_id);
    park.execute(&pool).await.unwrap();

    let mut pending_taker = make_bond(order_id, BondState::Requested);
    pending_taker.hash = Some("e".repeat(64));
    pending_taker.pubkey = "t".repeat(64);
    create_bond(&pool, pending_taker).await.unwrap();

    let dropped = drop_waiting_taker_bond_to_pending(&pool, order_id)
        .await
        .unwrap();
    assert!(!dropped, "an active taker bond must keep the order parked");

    let status_now: String = sqlx::query_scalar("SELECT status FROM orders WHERE id = ?")
        .bind(order_id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(status_now, Status::WaitingTakerBond.to_string());
}
// Several Requested bonds may exist for the same order at the same time.
#[tokio::test]
async fn concurrent_requested_bonds_coexist() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id).await;

    for tag in "abc".chars() {
        // The same 64-character filler serves as pubkey and as hash.
        let filler = tag.to_string().repeat(64);
        let mut draft = make_bond(order_id, BondState::Requested);
        draft.pubkey = filler.clone();
        draft.hash = Some(filler);
        create_bond(&pool, draft).await.unwrap();
    }

    let active = find_active_bonds_for_order(&pool, order_id).await.unwrap();
    assert_eq!(active.len(), 3);
    let requested = BondState::Requested.to_string();
    assert!(active.iter().all(|b| b.state == requested));
}
// With a Requested taker bond still around, the drop must change nothing.
#[tokio::test]
async fn maybe_drop_waiting_taker_bond_noop_when_other_bonds_active() {
    let pool = setup_pool().await;
    let order_id = Uuid::new_v4();
    insert_order(&pool, order_id).await;

    let park = sqlx::query("UPDATE orders SET status = ? WHERE id = ?")
        .bind(Status::WaitingTakerBond.to_string())
        .bind(order_id);
    park.execute(&pool).await.unwrap();

    let mut hashless = make_bond(order_id, BondState::Requested);
    hashless.hash = None;
    create_bond(&pool, hashless).await.unwrap();

    maybe_drop_waiting_taker_bond(&pool, order_id)
        .await
        .expect("noop when other bonds active");

    let reloaded = Order::by_id(&pool, order_id).await.unwrap().unwrap();
    assert_eq!(reloaded.status, Status::WaitingTakerBond.to_string());
}
/// `maybe_drop_waiting_taker_bond` must not touch an order that was never parked.
///
/// The order keeps whatever status `insert_order` gave it (the final assertion expects
/// `Pending`); the call must succeed and the status must be unchanged afterwards.
#[tokio::test]
async fn maybe_drop_waiting_taker_bond_noop_when_order_not_in_waiting_status() {
    let db = setup_pool().await;
    let id = Uuid::new_v4();
    insert_order(&db, id).await;

    let outcome = maybe_drop_waiting_taker_bond(&db, id).await;
    outcome.expect("noop on non-WaitingTakerBond order");

    let reloaded = Order::by_id(&db, id).await.unwrap().unwrap();
    let expected = Status::Pending.to_string();
    assert_eq!(reloaded.status, expected);
}
/// A `Locked` bond on a parked order must stop `maybe_drop_waiting_taker_bond` from
/// flipping the order back to `Pending`.
///
/// The order is parked in `WaitingTakerBond`, a hash-less `Locked` bond is stored, and
/// the call is expected to succeed while leaving the status as it was.
#[tokio::test]
async fn maybe_drop_does_not_revert_concurrent_lock() {
    let db = setup_pool().await;
    let id = Uuid::new_v4();
    insert_order(&db, id).await;

    let parked = Status::WaitingTakerBond.to_string();
    sqlx::query("UPDATE orders SET status = ? WHERE id = ?")
        .bind(parked.clone())
        .bind(id)
        .execute(&db)
        .await
        .unwrap();

    // Stands in for a bond that reached `Locked` before the drop attempt runs.
    let mut racing_bond = make_bond(id, BondState::Locked);
    racing_bond.hash = None;
    create_bond(&db, racing_bond).await.unwrap();

    let outcome = maybe_drop_waiting_taker_bond(&db, id).await;
    outcome.expect("noop in the presence of a Locked bond");

    let reloaded = Order::by_id(&db, id).await.unwrap().unwrap();
    assert_eq!(
        reloaded.status, parked,
        "the CAS must NOT flip back to Pending while a Locked bond races"
    );
}
/// The status-guarded `Pending -> WaitingTakerBond` UPDATE must be a no-op once the order
/// has already left `Pending`.
///
/// The order is first moved to `WaitingPayment`; the guarded UPDATE is then issued and
/// must affect zero rows, leaving `WaitingPayment` in place.
#[tokio::test]
async fn pending_to_waiting_taker_bond_cas_refuses_to_overwrite_concurrent_transition() {
    let db = setup_pool().await;
    let id = Uuid::new_v4();
    insert_order(&db, id).await;

    // Simulate another code path having already advanced the order.
    let advanced = Status::WaitingPayment.to_string();
    sqlx::query("UPDATE orders SET status = ? WHERE id = ?")
        .bind(advanced.clone())
        .bind(id)
        .execute(&db)
        .await
        .unwrap();

    // Compare-and-set: only rows whose status is still `Pending` may be updated.
    let cas = sqlx::query("UPDATE orders SET status = ? WHERE id = ? AND status = ?")
        .bind(Status::WaitingTakerBond.to_string())
        .bind(id)
        .bind(Status::Pending.to_string())
        .execute(&db)
        .await
        .unwrap();
    let touched = cas.rows_affected();
    assert_eq!(
        touched, 0,
        "CAS must refuse to flip a no-longer-Pending order"
    );

    let reloaded = Order::by_id(&db, id).await.unwrap().unwrap();
    assert_eq!(
        reloaded.status, advanced,
        "the concurrent transition must NOT be reverted"
    );
}
/// The status-guarded `Pending -> WaitingTakerBond` UPDATE must succeed when nothing has
/// moved the order in the meantime.
///
/// Relies on `insert_order` leaving the order in `Pending` (the guard in the UPDATE only
/// matches that status); exactly one row must change and the new status must be stored.
#[tokio::test]
async fn pending_to_waiting_taker_bond_cas_flips_when_status_unchanged() {
    let db = setup_pool().await;
    let id = Uuid::new_v4();
    insert_order(&db, id).await;

    let target = Status::WaitingTakerBond.to_string();
    let cas = sqlx::query("UPDATE orders SET status = ? WHERE id = ? AND status = ?")
        .bind(target.clone())
        .bind(id)
        .bind(Status::Pending.to_string())
        .execute(&db)
        .await
        .unwrap();
    let touched = cas.rows_affected();
    assert_eq!(touched, 1);

    let reloaded = Order::by_id(&db, id).await.unwrap().unwrap();
    assert_eq!(reloaded.status, target);
}
/// `maybe_drop_waiting_taker_bond` must return `Ok` for an order id that has no row.
#[tokio::test]
async fn maybe_drop_waiting_taker_bond_noop_when_order_missing() {
    let db = setup_pool().await;
    // Deliberately never inserted: the id refers to no order in the database.
    let unknown_id = Uuid::new_v4();

    let outcome = maybe_drop_waiting_taker_bond(&db, unknown_id).await;
    outcome.expect("noop on missing order");
}
/// Test helper: wraps `msg` in a `ServiceError::LnNodeError` inside `MostroInternalErr`,
/// the error shape the `classify_cancel_error` tests feed to the classifier.
fn ln_err(msg: &str) -> MostroError {
    let detail = String::from(msg);
    let node_error = ServiceError::LnNodeError(detail);
    MostroInternalErr(node_error)
}
/// Error texts carrying the `NotFound` or `AlreadyExists` code must classify as
/// `CancelOutcome::AlreadyDone`.
#[test]
fn classify_already_done_by_grpc_code() {
    let coded_messages = [
        "code=NotFound message=...",
        "code=AlreadyExists message=duplicate",
    ];
    for text in coded_messages {
        let verdict = classify_cancel_error(&ln_err(text));
        assert_eq!(verdict, CancelOutcome::AlreadyDone);
    }
}
/// With an uninformative `Unknown` code, the message text alone must be enough to
/// classify these errors as `CancelOutcome::AlreadyDone`.
#[test]
fn classify_already_done_by_lnd_message() {
    // Both spellings of "cancelled" are covered, plus the "missing invoice" phrasings.
    let already_done_messages = [
        "code=Unknown message=invoice with that hash already cancelled",
        "code=Unknown message=invoice with that hash already canceled",
        "code=Unknown message=unable to locate invoice",
        "code=Unknown message=invoice not found for hash",
        "code=Unknown message=no such invoice",
    ];
    already_done_messages.into_iter().for_each(|msg| {
        let verdict = classify_cancel_error(&ln_err(msg));
        assert_eq!(
            verdict,
            CancelOutcome::AlreadyDone,
            "expected AlreadyDone for: {msg}"
        );
    });
}
/// Transport-level failures and unrecognised messages must classify as
/// `CancelOutcome::Transient`.
#[test]
fn classify_transient_for_transport_and_unknown() {
    // The last entry has no `code=` prefix at all.
    let transient_messages = [
        "code=Unavailable message=connection refused",
        "code=DeadlineExceeded message=timeout",
        "code=Internal message=server crashed",
        "code=Unknown message=something we don't recognise",
        "transport error",
    ];
    transient_messages.into_iter().for_each(|msg| {
        let verdict = classify_cancel_error(&ln_err(msg));
        assert_eq!(
            verdict,
            CancelOutcome::Transient,
            "expected Transient for: {msg}"
        );
    });
}
}