use crate::config::constants::DEV_FEE_LIGHTNING_ADDRESS;
use crate::db::find_unpaid_dev_fees;
use crate::lightning::invoice::decode_invoice;
use crate::lightning::LndConnector;
use crate::lnurl::resolv_ln_address;
use crate::util::{bytes_to_string, publish_dev_fee_audit_event};
use chrono::Utc;
use mostro_core::error::MostroError;
use mostro_core::error::MostroError::MostroInternalErr;
use mostro_core::error::ServiceError;
use mostro_core::order::Order;
use sqlx::SqlitePool;
use std::collections::HashSet;
use tokio::sync::mpsc::channel;
use tracing::{error, info, warn};
/// Runs one complete dev-fee maintenance pass.
///
/// The pass is made of four phases that are awaited strictly one after another:
///
/// 1. clear `PENDING-` claim markers that have gone stale,
/// 2. check orders already flagged as paid against the Lightning node,
/// 3. reconcile orders that carry a real payment hash but are still flagged unpaid,
/// 4. claim and pay orders that have no dev-fee payment attempt yet.
///
/// `confirmed` is shared by phases 2-4; each of them may add order ids to it.
#[mutants::skip]
pub async fn run_dev_fee_cycle(
    pool: &SqlitePool,
    ln_client: &mut LndConnector,
    confirmed: &mut HashSet<uuid::Uuid>,
) {
    info!("Checking for unpaid development fees");

    // Phase order matters: stale claims are released before new claims are attempted.
    cleanup_stale_pending_markers(pool).await;
    verify_confirmed_orders(pool, ln_client, confirmed).await;
    recover_partial_payments(pool, ln_client, confirmed).await;
    process_new_dev_fee_payments(pool, ln_client, confirmed).await;
}
/// Resets dev-fee claim markers (`PENDING-…`) that have been held for too long.
///
/// Every order whose `dev_fee_payment_hash` starts with `PENDING-` is inspected. A marker is
/// treated as stale when its embedded timestamp is at least `CLEANUP_TTL_SECS` old, or when no
/// timestamp can be parsed from it (legacy format). Stale markers are cleared with an `UPDATE`
/// that only matches while the stored marker is still the one that was read, so a marker that
/// changed in the meantime is left alone.
///
/// Database errors are logged and never propagated.
async fn cleanup_stale_pending_markers(pool: &SqlitePool) {
    const CLEANUP_TTL_SECS: u64 = 300;

    // `timestamp()` is signed. A plain `as u64` cast would turn a negative (pre-epoch) clock into
    // a huge value and make every timestamped marker look stale, so clamp to 0 instead.
    let now_unix = u64::try_from(Utc::now().timestamp()).unwrap_or(0);

    let pending_orders = match sqlx::query_as::<_, Order>(
        "SELECT * FROM orders WHERE dev_fee_payment_hash LIKE 'PENDING-%'",
    )
    .fetch_all(pool)
    .await
    {
        Ok(orders) => orders,
        Err(e) => {
            error!("Failed to query stale PENDING orders: {:?}", e);
            return;
        }
    };

    // Counts only rows that were really reset, so the summary line below is accurate even when
    // an update fails or loses the race against a concurrent writer.
    let mut reset_count = 0u32;

    for pending_order in pending_orders {
        let order_id = pending_order.id;
        let marker = pending_order
            .dev_fee_payment_hash
            .as_deref()
            .unwrap_or_default();

        let pending_ts = parse_pending_timestamp(marker);
        let is_stale = match pending_ts {
            Some(ts) => now_unix.saturating_sub(ts) >= CLEANUP_TTL_SECS,
            None => {
                warn!(
                    "Order {} has legacy PENDING marker without timestamp, treating as stale",
                    order_id
                );
                true
            }
        };
        if !is_stale {
            continue;
        }

        let age_display = pending_ts
            .map(|ts| format!("{}s", now_unix.saturating_sub(ts)))
            .unwrap_or_else(|| "unknown (legacy)".to_string());
        warn!(
            "Resetting stale PENDING order {} (age: {})",
            order_id, age_display
        );

        // The marker is part of the WHERE clause: the reset is skipped if it changed meanwhile.
        match sqlx::query(
            "UPDATE orders\n\
             SET dev_fee_paid = 0, dev_fee_payment_hash = NULL\n\
             WHERE id = ? AND dev_fee_payment_hash = ?",
        )
        .bind(order_id)
        .bind(marker)
        .execute(pool)
        .await
        {
            Ok(result) => {
                if result.rows_affected() > 0 {
                    reset_count += 1;
                    info!(
                        "Reset stale PENDING for order {}, will retry payment",
                        order_id
                    );
                } else {
                    warn!(
                        "Skipping stale PENDING reset for order {}: marker changed concurrently",
                        order_id
                    );
                }
            }
            Err(e) => {
                error!(
                    "Failed to reset stale PENDING for order {}: {:?}",
                    order_id, e
                );
            }
        }
    }

    if reset_count > 0 {
        warn!(
            "Reset {} stale PENDING dev fee orders (TTL: {}s)",
            reset_count, CLEANUP_TTL_SECS
        );
    }
}
/// Checks orders already flagged as paid against the Lightning node.
///
/// Selects settled/successful orders with `dev_fee_paid = 1` and a non-`PENDING-` payment hash,
/// skips the ones whose id is already in `confirmed`, and asks the node for the payment status
/// of the rest. A `Succeeded` status adds the order id to `confirmed`; a `Failed` status is only
/// logged (the database is never modified here); every other status is ignored.
async fn verify_confirmed_orders(
    pool: &SqlitePool,
    ln_client: &mut LndConnector,
    confirmed: &mut HashSet<uuid::Uuid>,
) {
    const PAID_WITH_REAL_HASH_SQL: &str = "SELECT * FROM orders\n\
        WHERE dev_fee_paid = 1\n\
        AND dev_fee_payment_hash IS NOT NULL\n\
        AND dev_fee_payment_hash NOT LIKE 'PENDING-%'\n\
        AND (status = 'settled-hold-invoice' OR status = 'success')";

    let lookup = sqlx::query_as::<_, Order>(PAID_WITH_REAL_HASH_SQL)
        .fetch_all(pool)
        .await;
    let paid_orders = match lookup {
        Err(e) => {
            error!("Failed to query confirmed dev fee orders: {:?}", e);
            return;
        }
        Ok(rows) => rows,
    };

    for paid_order in paid_orders {
        let order_id = paid_order.id;
        // Already verified in an earlier pass: no need to query the node again.
        if confirmed.contains(&order_id) {
            continue;
        }

        let state = check_dev_fee_payment_status(&paid_order, ln_client).await;
        if matches!(state, DevFeePaymentState::Succeeded) {
            confirmed.insert(order_id);
        } else if matches!(state, DevFeePaymentState::Failed) {
            warn!(
                "Dev fee payment reported as Failed for order {} (hash: {:?}), \
                 NOT resetting to avoid duplicate payment risk. \
                 Manual review may be needed.",
                order_id, paid_order.dev_fee_payment_hash
            );
        }
        // InFlight / Unknown: nothing to do in this pass.
    }
}
/// Reconciles orders that hold a real payment hash but are still flagged as unpaid.
///
/// For each settled/successful order with `dev_fee > 0`, `dev_fee_paid = 0` and a non-empty,
/// non-`PENDING-` hash, the Lightning payment status is looked up and acted upon:
///
/// * `Succeeded` - the order is marked paid (guarded by the stored hash); when a row was
///   updated the id is added to `confirmed` and an audit event is published. When no row was
///   updated, the id is added to `confirmed` only if the row now reads as paid.
/// * `Failed` - the stored hash is cleared so a later pass can retry.
/// * `InFlight` / `Unknown` - only logged; the hash is kept.
async fn recover_partial_payments(
    pool: &SqlitePool,
    ln_client: &mut LndConnector,
    confirmed: &mut HashSet<uuid::Uuid>,
) {
    const UNPAID_WITH_REAL_HASH_SQL: &str = "SELECT * FROM orders\n\
        WHERE (status = 'settled-hold-invoice' OR status = 'success')\n\
        AND dev_fee > 0\n\
        AND dev_fee_paid = 0\n\
        AND dev_fee_payment_hash IS NOT NULL\n\
        AND dev_fee_payment_hash != ''\n\
        AND dev_fee_payment_hash NOT LIKE 'PENDING-%'";
    const MARK_PAID_SQL: &str = "UPDATE orders\n\
        SET dev_fee_paid = 1\n\
        WHERE id = ? AND dev_fee_paid = 0 AND dev_fee_payment_hash = ?";
    const CLEAR_HASH_SQL: &str = "UPDATE orders\n\
        SET dev_fee_payment_hash = NULL\n\
        WHERE id = ? AND dev_fee_paid = 0 AND dev_fee_payment_hash = ?";

    let lookup = sqlx::query_as::<_, Order>(UNPAID_WITH_REAL_HASH_SQL)
        .fetch_all(pool)
        .await;
    let candidates = match lookup {
        Err(e) => {
            error!("Failed to query partial-payment orders: {:?}", e);
            return;
        }
        Ok(rows) => rows,
    };

    for candidate in candidates {
        let order_id = candidate.id;
        let stored_hash = candidate
            .dev_fee_payment_hash
            .as_deref()
            .unwrap_or_default()
            .to_owned();
        info!(
            "Order {} has existing payment hash '{}' but dev_fee_paid=0, checking LN status",
            order_id, stored_hash
        );

        match check_dev_fee_payment_status(&candidate, ln_client).await {
            DevFeePaymentState::InFlight => {
                info!(
                    "Order {} payment still in-flight (hash {}), skipping",
                    order_id, stored_hash
                );
            }
            DevFeePaymentState::Unknown => {
                warn!(
                    "Order {} payment status unknown (hash {}), skipping to avoid duplicate",
                    order_id, stored_hash
                );
            }
            DevFeePaymentState::Failed => {
                info!(
                    "Order {} existing payment failed (hash {}), clearing for retry",
                    order_id, stored_hash
                );
                let cleared = sqlx::query(CLEAR_HASH_SQL)
                    .bind(order_id)
                    .bind(&stored_hash)
                    .execute(pool)
                    .await;
                if let Err(e) = cleared {
                    error!(
                        "Failed to clear failed payment hash for order {}: {:?}",
                        order_id, e
                    );
                }
            }
            DevFeePaymentState::Succeeded => {
                info!(
                    "Order {} payment already succeeded (hash {}), marking as paid",
                    order_id, stored_hash
                );
                let marked = sqlx::query(MARK_PAID_SQL)
                    .bind(order_id)
                    .bind(&stored_hash)
                    .execute(pool)
                    .await;
                let rows_changed = match marked {
                    Ok(done) => done.rows_affected(),
                    Err(e) => {
                        error!(
                            "Failed to mark order {} as paid after confirming payment: {:?}",
                            order_id, e
                        );
                        continue;
                    }
                };

                if rows_changed == 0 {
                    // Nothing was updated: trust the row only if it already reads as paid.
                    let paid_flag = sqlx::query_scalar::<_, i32>(
                        "SELECT dev_fee_paid FROM orders WHERE id = ?",
                    )
                    .bind(order_id)
                    .fetch_one(pool)
                    .await;
                    if matches!(paid_flag, Ok(flag) if flag != 0) {
                        confirmed.insert(order_id);
                    }
                    continue;
                }

                confirmed.insert(order_id);
                // The audit event is built from a fresh copy of the row; a failed reload
                // simply skips the event.
                let reloaded = sqlx::query_as::<_, Order>("SELECT * FROM orders WHERE id = ?")
                    .bind(order_id)
                    .fetch_one(pool)
                    .await;
                if let Ok(fresh) = reloaded {
                    if let Err(e) = publish_dev_fee_audit_event(&fresh, &stored_hash).await {
                        warn!(
                            "Failed to publish audit event for recovered order {}: {:?}",
                            order_id, e
                        );
                    }
                }
            }
        }
    }
}
/// Claims and pays the dev fee for every order returned by `find_unpaid_dev_fees`.
///
/// Per order, the steps are:
///
/// 1. claim the order by writing a unique `PENDING-<uuid>-<unix ts>` marker,
/// 2. resolve an invoice for the dev fee (20 s limit); on failure the claim is released,
/// 3. replace the marker with the invoice's payment hash (only while the marker is still ours),
/// 4. reload the order and send the payment (50 s limit),
/// 5. hand the outcome to the success / failure / timeout handler.
///
/// Any step that fails moves on to the next order; nothing is propagated to the caller.
async fn process_new_dev_fee_payments(
    pool: &SqlitePool,
    ln_client: &mut LndConnector,
    confirmed: &mut HashSet<uuid::Uuid>,
) {
    const INVOICE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(20);
    const PAYMENT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(50);
    const STORE_HASH_SQL: &str = "UPDATE orders\n\
        SET dev_fee_payment_hash = ?\n\
        WHERE id = ? AND dev_fee_paid = 0 AND dev_fee_payment_hash = ?";

    let candidates = match find_unpaid_dev_fees(pool).await {
        Err(e) => {
            error!("Failed to query unpaid dev fee orders: {:?}", e);
            return;
        }
        Ok(found) => found,
    };
    info!("Found {} orders with unpaid dev fees", candidates.len());

    for candidate in candidates {
        let order_id = candidate.id;

        // Step 1: claim the order with a marker that is unique to this attempt.
        let claimed_at = Utc::now().timestamp() as u64;
        let pending_marker = format!("PENDING-{}-{}", uuid::Uuid::new_v4(), claimed_at);
        let claim = try_claim_order_for_dev_fee(pool, order_id, &pending_marker).await;
        match claim {
            Ok(true) => {
                info!("Claimed order {} with marker {}", order_id, pending_marker);
            }
            Ok(false) => {
                info!(
                    "Order {} already claimed by another cycle, skipping",
                    order_id
                );
                continue;
            }
            Err(e) => {
                error!(
                    "Failed to claim order {} for dev fee payment: {:?}",
                    order_id, e
                );
                continue;
            }
        }

        // Step 2: resolve the invoice; any failure gives the claim back.
        info!("Resolving dev fee invoice for order {}", order_id);
        let resolution =
            tokio::time::timeout(INVOICE_TIMEOUT, resolve_dev_fee_invoice(&candidate)).await;
        let (payment_request, payment_hash_hex) = match resolution {
            Ok(Ok(resolved)) => resolved,
            unresolved => {
                if let Ok(Err(e)) = unresolved {
                    error!(
                        "Failed to resolve dev fee invoice for order {}: {:?}",
                        order_id, e
                    );
                } else {
                    error!(
                        "Dev fee invoice resolution timeout (20s) for order {}",
                        order_id
                    );
                }
                release_pending_claim(pool, order_id, &pending_marker).await;
                continue;
            }
        };

        // Step 3: swap our marker for the real payment hash before any money moves.
        info!(
            "Storing payment hash {} for order {}",
            payment_hash_hex, order_id
        );
        let store = sqlx::query(STORE_HASH_SQL)
            .bind(&payment_hash_hex)
            .bind(order_id)
            .bind(&pending_marker)
            .execute(pool)
            .await;
        let hash_stored = match store {
            Ok(done) => done.rows_affected() > 0,
            Err(e) => {
                error!(
                    "Failed to store payment hash for order {}: {:?}",
                    order_id, e
                );
                release_pending_claim(pool, order_id, &pending_marker).await;
                continue;
            }
        };
        if !hash_stored {
            warn!(
                "Order {}: cannot store dev fee payment hash; PENDING marker changed concurrently",
                order_id
            );
            continue;
        }

        // Step 4: work on a fresh copy of the row that now carries the real hash.
        let reload = sqlx::query_as::<_, Order>("SELECT * FROM orders WHERE id = ?")
            .bind(order_id)
            .fetch_one(pool)
            .await;
        let stored_order = match reload {
            Ok(fresh) => {
                info!("Order {} marked with real payment hash", order_id);
                fresh
            }
            Err(e) => {
                error!(
                    "Failed to reload order {} after storing payment hash: {:?}",
                    order_id, e
                );
                continue;
            }
        };

        // Step 5: pay, then dispatch on the outcome.
        let attempt = tokio::time::timeout(
            PAYMENT_TIMEOUT,
            send_dev_fee_payment(&stored_order, &payment_request, ln_client),
        )
        .await;
        match attempt {
            Err(_) => {
                handle_payment_timeout(stored_order, pool, ln_client, confirmed).await;
            }
            Ok(Err(e)) => {
                handle_payment_failure(stored_order, pool, order_id, &e).await;
            }
            Ok(Ok(payment_hash)) => {
                handle_payment_success(stored_order, pool, confirmed, &payment_hash).await;
            }
        }
    }
}
/// Records a successful dev-fee payment.
///
/// Sets `dev_fee_paid = 1` and stores `payment_hash` for the order, but only while the row is
/// still flagged unpaid. If the hash reported by the node differs from the one held in `order`,
/// a warning is logged and the node's value is the one written.
///
/// * Update error: a multi-line CRITICAL report is logged for manual reconciliation.
/// * No row updated: the id is added to `confirmed` only if the row already reads as paid.
/// * Row updated: the id is added to `confirmed`, the row is re-read, logged, and an audit
///   event is published (a publish failure is only logged).
async fn handle_payment_success(
    mut order: Order,
    pool: &SqlitePool,
    confirmed: &mut HashSet<uuid::Uuid>,
    payment_hash: &str,
) {
    const MARK_PAID_SQL: &str = "UPDATE orders\n\
        SET dev_fee_paid = 1, dev_fee_payment_hash = ?\n\
        WHERE id = ? AND dev_fee_paid = 0";

    let order_id = order.id;
    let dev_fee_amount = order.dev_fee;

    let hash_matches = order.dev_fee_payment_hash.as_deref() == Some(payment_hash);
    if !hash_matches {
        warn!(
            "Order {}: LND returned hash '{}' differs from stored hash '{:?}', using LND's value",
            order_id, payment_hash, order.dev_fee_payment_hash
        );
        order.dev_fee_payment_hash = Some(payment_hash.to_string());
    }
    order.dev_fee_paid = true;

    info!("Payment succeeded for order {}, verifying DB", order_id);
    let write = sqlx::query(MARK_PAID_SQL)
        .bind(payment_hash)
        .bind(order_id)
        .execute(pool)
        .await;
    let rows_changed = match write {
        Ok(done) => done.rows_affected(),
        Err(e) => {
            error!(
                "CRITICAL: Dev fee PAID for order {} but DB update FAILED",
                order_id
            );
            error!(" Payment amount: {} sats", dev_fee_amount);
            error!(" Payment hash: {}", payment_hash);
            error!(" Database error: {:?}", e);
            error!(" ACTION REQUIRED: Manual reconciliation");
            return;
        }
    };

    if rows_changed == 0 {
        // The guard did not match; only trust the row if it already reads as paid.
        let paid_flag =
            sqlx::query_scalar::<_, i32>("SELECT dev_fee_paid FROM orders WHERE id = ?")
                .bind(order_id)
                .fetch_one(pool)
                .await;
        if matches!(paid_flag, Ok(flag) if flag != 0) {
            confirmed.insert(order_id);
        }
        return;
    }

    info!("Dev fee payment completed for order {}", order_id);
    info!(" Amount: {} sats, Hash: {}", dev_fee_amount, payment_hash);
    confirmed.insert(order_id);

    let reread = sqlx::query_as::<_, Order>("SELECT * FROM orders WHERE id = ?")
        .bind(order_id)
        .fetch_one(pool)
        .await;
    if let Ok(verified_order) = reread {
        info!(
            "VERIFICATION: order_id={}, dev_fee_paid={}, hash={:?}",
            verified_order.id, verified_order.dev_fee_paid, verified_order.dev_fee_payment_hash
        );
        let published = publish_dev_fee_audit_event(&verified_order, payment_hash).await;
        if let Err(e) = published {
            warn!(
                "Failed to publish audit event for order {}: {:?}",
                order_id, e
            );
        }
    }
}
/// Logs a failed dev-fee payment and leaves the stored payment hash in place.
///
/// The hash is deliberately not cleared, so a later pass can check the payment's real status
/// before anything is retried. The `UPDATE` issued here writes `dev_fee_paid = 0` only to rows
/// that already have `dev_fee_paid = 0`; `_order` is accepted but not read.
async fn handle_payment_failure(
    _order: Order,
    pool: &SqlitePool,
    order_id: uuid::Uuid,
    e: &MostroError,
) {
    const KEEP_UNPAID_SQL: &str = "UPDATE orders\n\
        SET dev_fee_paid = 0\n\
        WHERE id = ? AND dev_fee_paid = 0";

    error!(
        "Dev fee payment failed for order {} - error: {:?}",
        order_id, e
    );
    warn!(
        "Keeping payment hash for order {} to let idempotency check verify on next cycle",
        order_id
    );

    let write = sqlx::query(KEEP_UNPAID_SQL)
        .bind(order_id)
        .execute(pool)
        .await;
    match write {
        Ok(_) => {}
        Err(db_err) => {
            error!(
                "Failed to update order {} after payment failure: {:?}",
                order_id, db_err
            );
        }
    }
}
async fn handle_payment_timeout(
order: Order,
pool: &SqlitePool,
ln_client: &mut LndConnector,
confirmed: &mut HashSet<uuid::Uuid>,
) {
let order_id = order.id;
let dev_fee = order.dev_fee;
warn!(
"Dev fee payment timeout (50s) for order {} ({} sats), checking LN status",
order_id, dev_fee
);
match check_dev_fee_payment_status(&order, ln_client).await {
DevFeePaymentState::Succeeded => {
info!(
"Payment actually succeeded for order {} despite timeout",
order_id
);
if let Err(e) = sqlx::query(
"UPDATE orders
SET dev_fee_paid = 1
WHERE id = ? AND dev_fee_paid = 0 AND dev_fee_payment_hash = ?",
)
.bind(order_id)
.bind(order.dev_fee_payment_hash.as_deref().unwrap_or_default())
.execute(pool)
.await
{
error!(
"Payment succeeded but failed to update DB for order {}: {:?}",
order_id, e
);
}
confirmed.insert(order_id);
}
DevFeePaymentState::InFlight => {
warn!(
"Payment still in-flight for order {}, keeping hash",
order_id
);
}
DevFeePaymentState::Failed => {
info!(
"Payment confirmed failed for order {}, clearing hash for retry",
order_id
);
if let Err(e) = sqlx::query(
"UPDATE orders
SET dev_fee_paid = 0, dev_fee_payment_hash = NULL
WHERE id = ? AND dev_fee_paid = 0 AND dev_fee_payment_hash = ?",
)
.bind(order_id)
.bind(order.dev_fee_payment_hash.as_deref().unwrap_or_default())
.execute(pool)
.await
{
error!(
"Failed to reset after confirmed failure for order {}: {:?}",
order_id, e
);
}
}
DevFeePaymentState::Unknown => {
warn!(
"Cannot determine payment status for order {}, keeping hash to avoid duplicate",
order_id
);
}
}
}
/// Gives back a claim on an order by clearing its `PENDING-` marker.
///
/// The marker is cleared only if the stored value still equals `pending_marker`, so a claim held
/// under a different marker is left untouched. A database error is logged, not returned.
async fn release_pending_claim(pool: &SqlitePool, order_id: uuid::Uuid, pending_marker: &str) {
    const RELEASE_SQL: &str = "UPDATE orders SET dev_fee_payment_hash = NULL\n\
        WHERE id = ? AND dev_fee_payment_hash = ?";

    let released = sqlx::query(RELEASE_SQL)
        .bind(order_id)
        .bind(pending_marker)
        .execute(pool)
        .await;
    match released {
        Ok(_) => {}
        Err(e) => {
            error!(
                "Failed to release PENDING claim for order {}: {:?}",
                order_id, e
            );
        }
    }
}
/// Outcome of looking up a dev-fee payment on the Lightning node, as produced by
/// `check_dev_fee_payment_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DevFeePaymentState {
    /// The node reports the payment as succeeded.
    Succeeded,
    /// The node reports the payment as still in flight.
    InFlight,
    /// The node reports the payment as failed.
    Failed,
    /// The status could not be determined: no trackable hash, an undecodable hash, a lookup
    /// error or timeout, or a status other than the three above.
    Unknown,
}
/// Asks the Lightning node for the status of the payment stored on `order`.
///
/// Returns `Unknown` without contacting the node when the order has no hash, when the hash is a
/// `PENDING-` marker, or when the hash is not valid hex. Otherwise the node is queried with a
/// 10 s limit; a lookup error, a timeout, or any status other than succeeded / in-flight /
/// failed also maps to `Unknown`.
async fn check_dev_fee_payment_status(
    order: &Order,
    ln_client: &mut LndConnector,
) -> DevFeePaymentState {
    use fedimint_tonic_lnd::lnrpc::payment::PaymentStatus;
    use nostr_sdk::nostr::hashes::hex::FromHex;

    // Only a real hash can be looked up; claim markers are not payments.
    let trackable = order
        .dev_fee_payment_hash
        .as_ref()
        .filter(|hash| !hash.starts_with("PENDING-"));
    let payment_hash_str = match trackable {
        Some(hash) => hash.clone(),
        None => {
            warn!(
                "Order {} has no trackable payment hash, cannot verify LN status",
                order.id
            );
            return DevFeePaymentState::Unknown;
        }
    };

    let decoded: Result<Vec<u8>, _> = FromHex::from_hex(&payment_hash_str);
    let payment_hash_bytes = match decoded {
        Ok(bytes) => bytes,
        Err(e) => {
            error!(
                "Failed to decode payment hash '{}' for order {}: {}",
                payment_hash_str, order.id, e
            );
            return DevFeePaymentState::Unknown;
        }
    };

    let lookup = tokio::time::timeout(
        std::time::Duration::from_secs(10),
        ln_client.check_payment_status(&payment_hash_bytes),
    )
    .await;

    match lookup {
        Ok(Ok(PaymentStatus::Succeeded)) => DevFeePaymentState::Succeeded,
        Ok(Ok(PaymentStatus::InFlight)) => DevFeePaymentState::InFlight,
        Ok(Ok(PaymentStatus::Failed)) => DevFeePaymentState::Failed,
        Ok(Ok(_)) => DevFeePaymentState::Unknown,
        Ok(Err(e)) => {
            warn!(
                "LN status check failed for order {} (hash {}): {:?}",
                order.id, payment_hash_str, e
            );
            DevFeePaymentState::Unknown
        }
        Err(_) => {
            warn!(
                "LN status check timed out for order {} (hash {})",
                order.id, payment_hash_str
            );
            DevFeePaymentState::Unknown
        }
    }
}
/// Tries to claim an order for dev-fee payment by writing `pending_marker` into its
/// `dev_fee_payment_hash` column.
///
/// The claim is a single conditional `UPDATE`: it only matches an order that is settled or
/// successful, has `dev_fee > 0`, is still unpaid, and has a NULL or empty payment hash.
///
/// # Returns
///
/// `Ok(true)` when a row was updated, `Ok(false)` when no row matched.
///
/// # Errors
///
/// A database error is returned as `ServiceError::DbAccessError` carrying the error text.
pub(crate) async fn try_claim_order_for_dev_fee(
    pool: &SqlitePool,
    order_id: uuid::Uuid,
    pending_marker: &str,
) -> Result<bool, MostroError> {
    const CLAIM_SQL: &str = "UPDATE orders SET dev_fee_payment_hash = ?\n\
        WHERE id = ?\n\
        AND (status = 'settled-hold-invoice' OR status = 'success')\n\
        AND dev_fee > 0\n\
        AND dev_fee_paid = 0\n\
        AND (dev_fee_payment_hash IS NULL OR dev_fee_payment_hash = '')";

    let outcome = sqlx::query(CLAIM_SQL)
        .bind(pending_marker)
        .bind(order_id)
        .execute(pool)
        .await;

    match outcome {
        Ok(done) => Ok(done.rows_affected() > 0),
        Err(db_err) => Err(MostroInternalErr(ServiceError::DbAccessError(
            db_err.to_string(),
        ))),
    }
}
/// Extracts the Unix timestamp from a `PENDING-<36-char id>-<timestamp>` marker.
///
/// Returns `None` when the `PENDING-` prefix is missing, when the 36-byte id is not followed
/// by `-` and at least one more character, when the remainder does not parse as a `u64`, or
/// when the parsed value is not greater than 1_000_000_000.
fn parse_pending_timestamp(marker: &str) -> Option<u64> {
    const ID_LEN: usize = 36;
    const TS_FLOOR: u64 = 1_000_000_000;

    let body = marker.strip_prefix("PENDING-")?;

    // The id must be followed by a '-' separator.
    let separator = body.as_bytes().get(ID_LEN).copied();
    if separator != Some(b'-') {
        return None;
    }

    // Everything after the separator is the timestamp; an empty tail is rejected.
    let tail = body.get(ID_LEN + 1..)?;
    if tail.is_empty() {
        return None;
    }

    match tail.parse::<u64>() {
        Ok(ts) if ts > TS_FLOOR => Some(ts),
        _ => None,
    }
}
/// Resolves an invoice for the order's dev fee from `DEV_FEE_LIGHTNING_ADDRESS`.
///
/// # Returns
///
/// `(payment_request, payment_hash_hex)`: the invoice string and the payment hash decoded from
/// that invoice, rendered with `bytes_to_string`.
///
/// # Errors
///
/// * `ServiceError::WrongAmountError` when `order.dev_fee` is zero or negative.
/// * `ServiceError::LnAddressParseError` when resolution takes longer than 15 s or yields an
///   empty invoice.
/// * Whatever `resolv_ln_address` or `decode_invoice` return on failure.
pub async fn resolve_dev_fee_invoice(order: &Order) -> Result<(String, String), MostroError> {
    info!(
        "Resolving dev fee invoice for order {} - amount: {} sats to {}",
        order.id, order.dev_fee, DEV_FEE_LIGHTNING_ADDRESS
    );

    let amount_is_positive = order.dev_fee > 0;
    if !amount_is_positive {
        return Err(MostroInternalErr(ServiceError::WrongAmountError));
    }

    let lookup = tokio::time::timeout(
        std::time::Duration::from_secs(15),
        resolv_ln_address(DEV_FEE_LIGHTNING_ADDRESS, order.dev_fee as u64),
    )
    .await;

    // Outer layer: did the lookup finish in time?
    let resolved = match lookup {
        Ok(finished) => finished,
        Err(_) => {
            error!(
                "Dev fee LNURL resolution timeout for order {} ({} sats)",
                order.id, order.dev_fee
            );
            return Err(MostroInternalErr(ServiceError::LnAddressParseError));
        }
    };

    // Inner layer: did the lookup itself succeed?
    let payment_request = resolved.map_err(|e| {
        error!(
            "Dev fee LNURL resolution failed for order {} ({} sats): {:?}",
            order.id, order.dev_fee, e
        );
        e
    })?;

    if payment_request.is_empty() {
        error!(
            "Dev fee LNURL resolution returned empty invoice for order {} ({} sats)",
            order.id, order.dev_fee
        );
        return Err(MostroInternalErr(ServiceError::LnAddressParseError));
    }

    let decoded = decode_invoice(&payment_request)?;
    let payment_hash_hex = bytes_to_string(decoded.payment_hash().as_ref());
    info!(
        "Resolved dev fee invoice for order {} - hash: {}",
        order.id, payment_hash_hex
    );

    Ok((payment_request, payment_hash_hex))
}
/// Sends the dev-fee payment for `order` and waits for its final status.
///
/// The payment is handed to `ln_client.send_payment` (5 s limit) together with a channel of
/// capacity 100; status updates are then read from that channel for up to 25 s until one of
/// them is `Succeeded` or `Failed`. Other statuses, and updates whose status value cannot be
/// converted to `PaymentStatus`, are skipped.
///
/// # Returns
///
/// The payment hash carried by the `Succeeded` update.
///
/// # Errors
///
/// * `ServiceError::WrongAmountError` when `order.dev_fee` is zero or negative.
/// * `ServiceError::LnPaymentError` with `"send_payment timeout"`, `"result timeout"`,
///   `"channel closed"`, or `"payment failed: reason …"`.
/// * Whatever `send_payment` itself returns on failure.
pub async fn send_dev_fee_payment(
    order: &Order,
    payment_request: &str,
    ln_client: &mut LndConnector,
) -> Result<String, MostroError> {
    use fedimint_tonic_lnd::lnrpc::payment::PaymentStatus;

    info!(
        "Sending dev fee payment for order {} - amount: {} sats",
        order.id, order.dev_fee
    );

    let amount_is_positive = order.dev_fee > 0;
    if !amount_is_positive {
        return Err(MostroInternalErr(ServiceError::WrongAmountError));
    }

    let (tx, mut rx) = channel(100);

    // Phase 1: hand the payment to the node.
    let dispatch = tokio::time::timeout(
        std::time::Duration::from_secs(5),
        ln_client.send_payment(payment_request, order.dev_fee, tx),
    )
    .await;
    let dispatched = match dispatch {
        Ok(finished) => finished,
        Err(_) => {
            error!(
                "Dev fee send_payment timeout for order {} ({} sats)",
                order.id, order.dev_fee
            );
            return Err(MostroInternalErr(ServiceError::LnPaymentError(
                "send_payment timeout".to_string(),
            )));
        }
    };
    dispatched.map_err(|e| {
        error!(
            "Dev fee send_payment failed for order {} ({} sats): {:?}",
            order.id, order.dev_fee, e
        );
        e
    })?;

    // Phase 2: read status updates until a final one arrives or the channel closes.
    let await_final_status = async {
        while let Some(update) = rx.recv().await {
            match PaymentStatus::try_from(update.payment.status) {
                Ok(PaymentStatus::Succeeded) => {
                    return Ok(update.payment.payment_hash);
                }
                Ok(PaymentStatus::Failed) => {
                    error!(
                        "Dev fee payment failed for order {} ({} sats) - failure_reason: {}",
                        order.id, order.dev_fee, update.payment.failure_reason
                    );
                    return Err(MostroInternalErr(ServiceError::LnPaymentError(format!(
                        "payment failed: reason {}",
                        update.payment.failure_reason
                    ))));
                }
                _ => {}
            }
        }
        error!(
            "Dev fee payment channel closed for order {} ({} sats)",
            order.id, order.dev_fee
        );
        Err(MostroInternalErr(ServiceError::LnPaymentError(
            "channel closed".to_string(),
        )))
    };

    let awaited =
        tokio::time::timeout(std::time::Duration::from_secs(25), await_final_status).await;
    let payment_result = match awaited {
        Ok(final_status) => final_status?,
        Err(_) => {
            error!(
                "Dev fee payment result timeout for order {} ({} sats)",
                order.id, order.dev_fee
            );
            return Err(MostroInternalErr(ServiceError::LnPaymentError(
                "result timeout".to_string(),
            )));
        }
    };

    info!(
        "Dev fee payment succeeded for order {} - amount: {} sats, hash: {}",
        order.id, order.dev_fee, payment_result
    );
    Ok(payment_result)
}
// Tests for the dev-fee payment helpers. The database-backed tests run against an in-memory
// SQLite pool built from the project's migration files; the `parse_pending_timestamp` tests are
// pure unit tests.
#[cfg(test)]
mod tests {
use super::{
cleanup_stale_pending_markers, handle_payment_failure, handle_payment_success,
parse_pending_timestamp, release_pending_claim, try_claim_order_for_dev_fee,
};
use crate::config::settings::Settings;
use crate::config::MOSTRO_CONFIG;
use mostro_core::error::MostroError;
use sqlx::sqlite::SqlitePoolOptions;
use std::collections::HashSet;
// Installs default settings into the global config; the result of `set` is ignored, so
// calling this from several tests is fine.
fn init_test_settings() {
let _ = MOSTRO_CONFIG.set(Settings {
database: Default::default(),
nostr: Default::default(),
mostro: Default::default(),
lightning: Default::default(),
rpc: Default::default(),
expiration: Some(Default::default()),
anti_abuse_bond: None,
price: None,
});
}
// Builds an in-memory SQLite pool (single connection) and applies the base orders migration,
// the dev_fee migration and the cashu escrow migration, in that order.
async fn setup_orders_db() -> sqlx::SqlitePool {
init_test_settings();
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect(":memory:")
.await
.expect("Failed to create in-memory DB");
sqlx::query(include_str!("../../migrations/20221222153301_orders.sql"))
.execute(&pool)
.await
.expect("Failed to create base orders table");
sqlx::query(include_str!("../../migrations/20251126120000_dev_fee.sql"))
.execute(&pool)
.await
.expect("Failed to apply dev_fee migration");
sqlx::query(include_str!(
"../../migrations/20260530120000_cashu_escrow_fields.sql"
))
.execute(&pool)
.await
.expect("Failed to apply cashu escrow migration");
pool
}
// Inserts a minimal 'sell' order with the given status and dev-fee columns; all other
// inserted columns use fixed placeholder values.
async fn insert_test_order(
pool: &sqlx::SqlitePool,
id: uuid::Uuid,
status: &str,
dev_fee: i64,
dev_fee_paid: bool,
dev_fee_payment_hash: Option<&str>,
) {
sqlx::query(
r#"
INSERT INTO orders (id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at,
dev_fee, dev_fee_paid, dev_fee_payment_hash)
VALUES (?, 'sell', 'evt1', ?, 0, 'cash', 1000, 'USD', 100, 0, 0, ?, ?, ?)
"#,
)
.bind(id)
.bind(status)
.bind(dev_fee)
.bind(dev_fee_paid as i32)
.bind(dev_fee_payment_hash)
.execute(pool)
.await
.expect("Failed to insert test order");
}
// An unpaid order with no hash can be claimed, and the marker is persisted.
#[tokio::test]
async fn atomic_claim_succeeds_for_unclaimed_order() {
let pool = setup_orders_db().await;
let order_id = uuid::Uuid::new_v4();
insert_test_order(&pool, order_id, "success", 100, false, None).await;
let claimed = try_claim_order_for_dev_fee(&pool, order_id, "PENDING-test-1234567890")
.await
.unwrap();
assert!(claimed, "Should successfully claim an unclaimed order");
let hash: Option<String> =
sqlx::query_scalar("SELECT dev_fee_payment_hash FROM orders WHERE id = ?")
.bind(order_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(hash.as_deref(), Some("PENDING-test-1234567890"));
}
// An order that already carries another marker is not claimed and keeps that marker.
#[tokio::test]
async fn atomic_claim_fails_for_already_claimed_order() {
let pool = setup_orders_db().await;
let order_id = uuid::Uuid::new_v4();
insert_test_order(
&pool,
order_id,
"success",
100,
false,
Some("PENDING-other-cycle"),
)
.await;
let claimed = try_claim_order_for_dev_fee(&pool, order_id, "PENDING-test-1234567890")
.await
.unwrap();
assert!(
!claimed,
"Should not claim an order already claimed by another cycle"
);
let hash: Option<String> =
sqlx::query_scalar("SELECT dev_fee_payment_hash FROM orders WHERE id = ?")
.bind(order_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(hash.as_deref(), Some("PENDING-other-cycle"));
}
// An order with dev_fee_paid = 1 is never claimed.
#[tokio::test]
async fn atomic_claim_fails_for_already_paid_order() {
let pool = setup_orders_db().await;
let order_id = uuid::Uuid::new_v4();
insert_test_order(&pool, order_id, "success", 100, true, None).await;
let claimed = try_claim_order_for_dev_fee(&pool, order_id, "PENDING-test-1234567890")
.await
.unwrap();
assert!(!claimed, "Should not claim an already-paid order");
}
// Claiming an id that is not in the table reports `false` rather than an error.
#[tokio::test]
async fn atomic_claim_fails_for_nonexistent_order() {
let pool = setup_orders_db().await;
let order_id = uuid::Uuid::new_v4();
let claimed = try_claim_order_for_dev_fee(&pool, order_id, "PENDING-test-1234567890")
.await
.unwrap();
assert!(!claimed, "Should not claim a nonexistent order");
}
// An empty-string hash is treated the same as NULL for claiming purposes.
#[tokio::test]
async fn atomic_claim_with_empty_hash_succeeds() {
let pool = setup_orders_db().await;
let order_id = uuid::Uuid::new_v4();
insert_test_order(&pool, order_id, "success", 100, false, Some("")).await;
let claimed = try_claim_order_for_dev_fee(&pool, order_id, "PENDING-test-1234567890")
.await
.unwrap();
assert!(claimed, "Should claim order with empty string payment hash");
}
// Marker in the current `PENDING-<uuid>-<timestamp>` format yields its timestamp.
#[test]
fn test_parse_new_format_with_uuid() {
let marker = "PENDING-550e8400-e29b-41d4-a716-446655440000-1707700000";
assert_eq!(parse_pending_timestamp(marker), Some(1707700000));
}
// Legacy marker without a timestamp suffix yields None.
#[test]
fn test_parse_legacy_format_uuid() {
let marker = "PENDING-550e8400-e29b-41d4-a716-446655440000";
assert_eq!(parse_pending_timestamp(marker), None);
}
// Strings without the `PENDING-` prefix yield None.
#[test]
fn test_parse_not_pending() {
assert_eq!(parse_pending_timestamp("some-random-hash"), None);
assert_eq!(parse_pending_timestamp(""), None);
}
// The bare prefix, with or without the trailing dash, yields None.
#[test]
fn test_parse_plain_pending() {
assert_eq!(parse_pending_timestamp("PENDING"), None);
assert_eq!(parse_pending_timestamp("PENDING-"), None);
}
// A non-numeric timestamp suffix yields None.
#[test]
fn test_parse_invalid_timestamp() {
let marker = "PENDING-550e8400-e29b-41d4-a716-446655440000-notanumber";
assert_eq!(parse_pending_timestamp(marker), None);
}
// A numeric suffix that is not above the 1_000_000_000 floor yields None.
#[test]
fn test_parse_too_small_timestamp() {
let marker = "PENDING-550e8400-e29b-41d4-a716-446655440000-12345";
assert_eq!(parse_pending_timestamp(marker), None);
}
// A marker stamped with the current time round-trips.
#[test]
fn test_parse_current_timestamp() {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let marker = format!("PENDING-550e8400-e29b-41d4-a716-446655440000-{}", now);
assert_eq!(parse_pending_timestamp(&marker), Some(now));
}
// Cleanup leaves a marker stamped far in the future alone and resets one whose suffix ("1")
// does not parse as a plausible timestamp and is therefore treated as legacy/stale.
#[tokio::test]
async fn cleanup_stale_pending_markers_resets_only_stale() {
let pool = setup_orders_db().await;
let fresh_id = uuid::Uuid::new_v4();
let stale_id = uuid::Uuid::new_v4();
insert_test_order(
&pool,
fresh_id,
"success",
100,
false,
Some("PENDING-550e8400-e29b-41d4-a716-446655440000-9999999999"),
)
.await;
insert_test_order(
&pool,
stale_id,
"success",
100,
false,
Some("PENDING-550e8400-e29b-41d4-a716-446655440000-1"),
)
.await;
cleanup_stale_pending_markers(&pool).await;
let fresh: (i32, Option<String>) =
sqlx::query_as("SELECT dev_fee_paid, dev_fee_payment_hash FROM orders WHERE id = ?")
.bind(fresh_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(fresh.0, 0);
assert_eq!(
fresh.1.as_deref(),
Some("PENDING-550e8400-e29b-41d4-a716-446655440000-9999999999")
);
let stale: (i32, Option<String>) =
sqlx::query_as("SELECT dev_fee_paid, dev_fee_payment_hash FROM orders WHERE id = ?")
.bind(stale_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(stale.0, 0);
assert_eq!(stale.1, None);
}
// Releasing with a non-matching marker is a no-op; releasing with the stored marker clears it.
#[tokio::test]
async fn release_pending_claim_clears_matching_marker_only() {
let pool = setup_orders_db().await;
let id = uuid::Uuid::new_v4();
insert_test_order(
&pool,
id,
"success",
100,
false,
Some("PENDING-test-marker"),
)
.await;
release_pending_claim(&pool, id, "PENDING-other-marker").await;
let hash: Option<String> =
sqlx::query_scalar("SELECT dev_fee_payment_hash FROM orders WHERE id = ?")
.bind(id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(hash.as_deref(), Some("PENDING-test-marker"));
release_pending_claim(&pool, id, "PENDING-test-marker").await;
let hash_after: Option<String> =
sqlx::query_scalar("SELECT dev_fee_payment_hash FROM orders WHERE id = ?")
.bind(id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(hash_after, None);
}
// A successful payment flips dev_fee_paid to 1, stores the hash and records the order id in
// the confirmed set.
#[tokio::test]
async fn handle_payment_success_sets_paid_and_hash_in_db() {
use mostro_core::order::Order;
let pool = setup_orders_db().await;
let order_id = uuid::Uuid::new_v4();
insert_test_order(&pool, order_id, "success", 100, false, None).await;
let order = sqlx::query_as::<_, Order>("SELECT * FROM orders WHERE id = ?")
.bind(order_id)
.fetch_one(&pool)
.await
.unwrap();
let mut confirmed = HashSet::new();
handle_payment_success(order, &pool, &mut confirmed, "deadbeef").await;
let row: (i32, Option<String>) =
sqlx::query_as("SELECT dev_fee_paid, dev_fee_payment_hash FROM orders WHERE id = ?")
.bind(order_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0, 1);
assert_eq!(row.1.as_deref(), Some("deadbeef"));
assert!(confirmed.contains(&order_id));
}
// A failed payment leaves the order unpaid and keeps the stored hash untouched.
#[tokio::test]
async fn handle_payment_failure_marks_unpaid_and_preserves_hash() {
use mostro_core::order::Order;
use mostro_core::prelude::ServiceError;
let pool = setup_orders_db().await;
let order_id = uuid::Uuid::new_v4();
insert_test_order(
&pool,
order_id,
"success",
100,
false,
Some("existing-hash"),
)
.await;
let order = sqlx::query_as::<_, Order>("SELECT * FROM orders WHERE id = ?")
.bind(order_id)
.fetch_one(&pool)
.await
.unwrap();
let err = MostroError::MostroInternalErr(ServiceError::UnexpectedError(
"test failure".to_string(),
));
handle_payment_failure(order, &pool, order_id, &err).await;
let row: (i32, Option<String>) =
sqlx::query_as("SELECT dev_fee_paid, dev_fee_payment_hash FROM orders WHERE id = ?")
.bind(order_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0, 0);
assert_eq!(row.1.as_deref(), Some("existing-hash"));
}
// The success handler is given an in-memory copy taken before the status changed in the DB;
// its write must touch only the dev-fee columns and leave the newer status in place.
#[tokio::test]
async fn targeted_dev_fee_updates_do_not_overwrite_status() {
use mostro_core::order::Order;
let pool = setup_orders_db().await;
let order_id = uuid::Uuid::new_v4();
insert_test_order(
&pool,
order_id,
"settled-hold-invoice",
100,
false,
Some("PENDING-test-marker"),
)
.await;
let stale_copy = sqlx::query_as::<_, Order>("SELECT * FROM orders WHERE id = ?")
.bind(order_id)
.fetch_one(&pool)
.await
.unwrap();
sqlx::query("UPDATE orders SET status = 'success' WHERE id = ?")
.bind(order_id)
.execute(&pool)
.await
.unwrap();
let mut confirmed = HashSet::new();
handle_payment_success(stale_copy, &pool, &mut confirmed, "deadbeef").await;
let row: (String, i32, Option<String>) = sqlx::query_as(
"SELECT status, dev_fee_paid, dev_fee_payment_hash FROM orders WHERE id = ?",
)
.bind(order_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(
row.0, "success",
"status must not be overwritten by dev-fee write"
);
assert_eq!(row.1, 1);
assert_eq!(row.2.as_deref(), Some("deadbeef"));
assert!(confirmed.contains(&order_id));
}
}