use crate::config::constants::{DEV_FEE_AUDIT_EVENT_KIND, DEV_FEE_LIGHTNING_ADDRESS};
use crate::config::settings::{get_db_pool, Settings};
use crate::config::*;
use crate::db;
use crate::db::is_user_present;
use crate::flow;
use crate::lightning;
use crate::lightning::invoice::is_valid_invoice;
use crate::lightning::LndConnector;
use crate::lnurl::HTTP_CLIENT;
use crate::messages;
use crate::models::Yadio;
use crate::nip33::{create_platform_tag_values, new_order_event, new_rating_event, order_to_tags};
use crate::NOSTR_CLIENT;
use chrono::Duration;
use fedimint_tonic_lnd::lnrpc::invoice::InvoiceState;
use mostro_core::prelude::*;
use nostr_sdk::prelude::*;
use sqlx::Pool;
use sqlx::QueryBuilder;
use sqlx::Sqlite;
use sqlx::SqlitePool;
use sqlx_crud::Crud;
use std::collections::HashMap;
use std::fmt::Write;
use std::str::FromStr;
use std::thread;
use tokio::sync::mpsc::channel;
use tracing::info;
use uuid::Uuid;
/// Shape of the price API `/currencies` response: keyed by fiat currency code
/// (the values are presumably display names — confirm against the API).
pub type FiatNames = std::collections::HashMap<String, String>;
/// Maximum number of attempts `get_market_quote` makes before giving up.
const MAX_RETRY: u16 = 4;
/// Local shorthand for the order kind enum defined in `mostro_core`.
type OrderKind = mostro_core::order::Kind;
/// Resolves the base URL used for Yadio price API requests.
///
/// Looks up the Yadio entry among the configured price providers (when price
/// settings exist) and passes its URL and enabled flag, together with the
/// legacy `bitcoin_price_api_url` setting, to `select_yadio_base_url`.
fn yadio_base_url() -> String {
    let provider = match Settings::get_price() {
        Some(price) => {
            let key = crate::price::ProviderId::Yadio.to_string();
            price
                .providers
                .get(&key)
                .map(|entry| (entry.url.as_str(), entry.enabled))
        }
        None => None,
    };
    let legacy_url = Settings::get_mostro().bitcoin_price_api_url.clone();
    select_yadio_base_url(provider, &legacy_url)
}
/// Returns `raw` with surrounding whitespace removed and every trailing `/`
/// stripped, so callers can append `/path` segments safely.
fn normalize_base_url(raw: &str) -> String {
    let trimmed = raw.trim();
    let without_trailing_slashes = trimmed.trim_end_matches('/');
    String::from(without_trailing_slashes)
}
/// Chooses between the provider URL and the legacy URL.
///
/// The provider URL wins only when the provider is present, enabled, and its
/// normalized URL is non-empty; otherwise the normalized `legacy` URL is
/// returned.
fn select_yadio_base_url(provider: Option<(&str, bool)>, legacy: &str) -> String {
    provider
        .filter(|&(_, enabled)| enabled)
        .map(|(url, _)| normalize_base_url(url))
        .filter(|url| !url.is_empty())
        .unwrap_or_else(|| normalize_base_url(legacy))
}
/// Performs one attempt at fetching a quote from the price API.
///
/// First downloads `<base>/currencies` and checks that `fiat_code` is listed.
/// When it is not, returns `Ok((None, false))` without issuing the quote
/// request. Otherwise issues a GET for `req_string` and returns
/// `Ok((Some(response), true))`.
///
/// # Errors
/// - `NoAPIResponse` when either HTTP request fails to send.
/// - `MalformedAPIRes` when the currency list cannot be decoded.
pub async fn retries_yadio_request(
    req_string: &str,
    fiat_code: &str,
) -> Result<(Option<reqwest::Response>, bool), MostroError> {
    let currencies_url = format!("{}/currencies", yadio_base_url());
    let currencies_response = HTTP_CLIENT
        .get(currencies_url)
        .send()
        .await
        .map_err(|_| MostroInternalErr(ServiceError::NoAPIResponse))?;
    let known_currencies = currencies_response
        .json::<FiatNames>()
        .await
        .map_err(|_| MostroInternalErr(ServiceError::MalformedAPIRes))?;

    if !known_currencies.contains_key(fiat_code) {
        return Ok((None, false));
    }

    let quote_response = HTTP_CLIENT
        .get(req_string)
        .send()
        .await
        .map_err(|_| MostroInternalErr(ServiceError::NoAPIResponse))?;
    Ok((Some(quote_response), true))
}
/// Returns the bitcoin price for `fiat_code` by delegating to
/// `crate::price::get_bitcoin_price`; errors from that call are returned
/// unchanged.
pub fn get_bitcoin_price(fiat_code: &str) -> Result<f64, MostroError> {
crate::price::get_bitcoin_price(fiat_code)
}
pub async fn get_market_quote(
fiat_amount: &i64,
fiat_code: &str,
premium: i64,
) -> Result<i64, MostroError> {
let req_string = format!(
"{}/convert/{}/{}/BTC",
yadio_base_url(),
fiat_amount,
fiat_code
);
info!("Requesting API price: {}", req_string);
let mut req = (None, false);
let mut no_answer_api = false;
for retries_num in 1..=MAX_RETRY {
match retries_yadio_request(&req_string, fiat_code).await {
Ok(response) => {
req = response;
break;
}
Err(_e) => {
if retries_num == MAX_RETRY {
no_answer_api = true;
}
println!(
"API price request failed retrying - {} tentatives left.",
(MAX_RETRY - retries_num)
);
thread::sleep(std::time::Duration::from_secs(2));
}
};
}
if no_answer_api {
return Err(MostroError::MostroInternalErr(ServiceError::NoAPIResponse));
}
if !req.1 {
return Err(MostroError::MostroInternalErr(ServiceError::NoCurrency));
}
if req.0.is_none() {
return Err(MostroError::MostroInternalErr(
ServiceError::MalformedAPIRes,
));
}
let quote = if let Some(q) = req.0 {
q.json::<Yadio>()
.await
.map_err(|_| MostroError::MostroInternalErr(ServiceError::MessageSerializationError))?
} else {
return Err(MostroError::MostroInternalErr(
ServiceError::MalformedAPIRes,
));
};
let mut sats = quote.result * 100_000_000_f64;
if premium != 0 {
sats = sats - (premium as f64) / 100_f64 * sats;
}
Ok(sats as i64)
}
/// Returns half of the Mostro fee for `amount`, rounded to the nearest
/// integer. The fee rate is read from the Mostro settings.
pub fn get_fee(amount: i64) -> i64 {
    let fee_rate = Settings::get_mostro().fee;
    let half_fee = (fee_rate * amount as f64) / 2.0;
    half_fee.round() as i64
}
/// Returns `total_mostro_fee * percentage`, rounded to the nearest integer
/// (halves round away from zero).
pub fn calculate_dev_fee(total_mostro_fee: i64, percentage: f64) -> i64 {
    let total = total_mostro_fee as f64;
    (total * percentage).round() as i64
}
/// Computes the development fee for `total_mostro_fee` using the
/// `dev_fee_percentage` configured in the Mostro settings.
pub fn get_dev_fee(total_mostro_fee: i64) -> i64 {
    let percentage = Settings::get_mostro().dev_fee_percentage;
    calculate_dev_fee(total_mostro_fee, percentage)
}
/// Resolves the expiration timestamp (in seconds) for an order.
///
/// A requested `expire` value is capped at now + `max_expiration_days`. When
/// no value is requested, now + `expiration_hours` is used.
pub fn get_expiration_date(expire: Option<i64>) -> i64 {
    let settings = Settings::get_mostro();
    let latest_allowed = Timestamp::now().as_secs() as i64
        + Duration::days(settings.max_expiration_days.into()).num_seconds();
    match expire {
        Some(requested) => requested.min(latest_allowed),
        None => {
            Timestamp::now().as_secs() as i64
                + Duration::hours(settings.expiration_hours as i64).num_seconds()
        }
    }
}
/// Returns the expiration timestamp (in seconds) to attach to an event of the
/// given `kind`, or `None` when that kind has no expiration.
///
/// A per-kind value from the expiration settings takes precedence. Without
/// one, order, rating, dispute and dev-fee audit events fall back to
/// now + `max_expiration_days`.
pub fn get_expiration_timestamp_for_kind(kind: u16) -> Option<i64> {
    let now = Timestamp::now().as_secs() as i64;

    let configured_days =
        Settings::get_expiration().and_then(|cfg| cfg.get_expiration_for_kind(kind));
    if let Some(days) = configured_days {
        return Some(now + Duration::days(days as i64).num_seconds());
    }

    let uses_default = matches!(
        kind,
        NOSTR_ORDER_EVENT_KIND
            | NOSTR_RATING_EVENT_KIND
            | NOSTR_DISPUTE_EVENT_KIND
            | DEV_FEE_AUDIT_EVENT_KIND
    );
    if !uses_default {
        return None;
    }
    let mostro_settings = Settings::get_mostro();
    Some(now + Duration::days(mostro_settings.max_expiration_days.into()).num_seconds())
}
/// Builds the event tags for a newly created order.
///
/// Reputation data comes from the stored user keyed by `identity_pubkey`.
/// When the user lookup fails and the identity key equals the trade key,
/// zeroed reputation is used. When the lookup fails and the keys differ,
/// `InvalidPubkey` is returned.
pub async fn get_tags_for_new_order(
    new_order_db: &Order,
    pool: &SqlitePool,
    identity_pubkey: &PublicKey,
    trade_pubkey: &PublicKey,
    mostro_keys: &Keys,
) -> Result<Option<Tags>, MostroError> {
    let mostro_pubkey = mostro_keys.public_key().to_hex();
    let reputation = match is_user_present(pool, identity_pubkey.to_string()).await {
        Ok(user) => (user.total_rating, user.total_reviews, user.created_at),
        Err(_) if identity_pubkey == trade_pubkey => (0.0, 0, 0),
        Err(_) => return Err(MostroInternalErr(ServiceError::InvalidPubkey)),
    };
    order_to_tags(new_order_db, Some(reputation), Some(&mostro_pubkey))
}
/// Persists a new order and either publishes it immediately or defers
/// publication until the maker bond is requested.
///
/// Without a maker bond requirement the order is stored and handed to
/// `finalize_order_publication`. With one, the order is stored with status
/// `WaitingMakerBond` and a bond is requested; if that request fails the
/// stranded row is deleted (best effort) and the request error is returned.
#[allow(clippy::too_many_arguments)]
pub async fn publish_order(
    pool: &SqlitePool,
    keys: &Keys,
    new_order: &SmallOrder,
    initiator_pubkey: PublicKey,
    identity_pubkey: PublicKey,
    trade_pubkey: PublicKey,
    request_id: Option<u64>,
    trade_index: Option<i64>,
) -> Result<(), MostroError> {
    let mut new_order_db = prepare_new_order(
        new_order,
        initiator_pubkey,
        trade_index,
        identity_pubkey,
        trade_pubkey,
    )
    .await?;

    if !crate::app::bond::maker_bond_required() {
        // Regular path: store the order and publish it right away.
        let order = new_order_db
            .create(pool)
            .await
            .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
        info!("New order saved Id: {}", order.id);
        return finalize_order_publication(
            pool,
            keys,
            order,
            identity_pubkey,
            trade_pubkey,
            request_id,
            trade_index,
        )
        .await;
    }

    // Bond path: the notional is computed before anything is written.
    let notional = maker_bond_notional_sats(&new_order_db)?;
    new_order_db.status = Status::WaitingMakerBond.to_string();
    let order = new_order_db
        .create(pool)
        .await
        .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
    info!("New order saved (awaiting maker bond) Id: {}", order.id);

    let bond_request = crate::app::bond::request_maker_bond(
        pool,
        &order,
        trade_pubkey,
        notional,
        request_id,
        trade_index,
    )
    .await;

    match bond_request {
        Ok(_) => Ok(()),
        Err(e) => {
            tracing::warn!(
                order_id = %order.id,
                "publish_order: request_maker_bond failed ({}); deleting stranded WaitingMakerBond order",
                e
            );
            // The status guard keeps the delete from touching a row that has
            // already moved on to another state.
            let deletion = sqlx::query("DELETE FROM orders WHERE id = ? AND status = ?")
                .bind(order.id)
                .bind(Status::WaitingMakerBond.to_string())
                .execute(pool)
                .await;
            if let Err(del) = deletion {
                tracing::warn!(
                    order_id = %order.id,
                    "publish_order: failed to delete stranded order: {}", del
                );
            }
            Err(e)
        }
    }
}
/// Computes the satoshi amount the maker bond is based on.
///
/// - Range orders: the positive `max_amount` converted at the current price.
/// - Orders with a positive `amount`: that amount as-is.
/// - Otherwise: `fiat_amount` converted at the current price.
///
/// # Errors
/// `UnexpectedError` for a range order without a positive `max_amount`,
/// `NoAPIResponse` when the price is not positive, plus any error from
/// `get_bitcoin_price`.
fn maker_bond_notional_sats(order: &Order) -> Result<i64, MostroError> {
    // Converts a fiat amount into sats using the current price for the order's currency.
    let fiat_to_sats = |fiat: i64| -> Result<i64, MostroError> {
        let price = get_bitcoin_price(&order.fiat_code)?;
        if price <= 0.0 {
            return Err(MostroInternalErr(ServiceError::NoAPIResponse));
        }
        let sats = (fiat as f64 / price) * 1E8;
        Ok(sats as i64)
    };

    if order.is_range_order() {
        let max_fiat = order.max_amount.filter(|m| *m > 0).ok_or_else(|| {
            MostroInternalErr(ServiceError::UnexpectedError(
                "range order missing positive max_amount".to_string(),
            ))
        })?;
        return fiat_to_sats(max_fiat);
    }

    if order.amount > 0 {
        Ok(order.amount)
    } else {
        fiat_to_sats(order.fiat_amount)
    }
}
async fn finalize_order_publication(
pool: &SqlitePool,
keys: &Keys,
mut order: Order,
identity_pubkey: PublicKey,
trade_pubkey: PublicKey,
request_id: Option<u64>,
trade_index: Option<i64>,
) -> Result<(), MostroError> {
let order_id = order.id;
order.status = Status::Pending.to_string();
let event = if let Some(tags) =
get_tags_for_new_order(&order, pool, &identity_pubkey, &trade_pubkey, keys).await?
{
new_order_event(keys, "", order_id.to_string(), tags)
.map_err(|e| MostroInternalErr(ServiceError::NostrError(e.to_string())))?
} else {
return Err(MostroInternalErr(ServiceError::InvalidPubkey));
};
info!("Order event to be published: {event:#?}");
let event_id = event.id.to_string();
info!("Publishing Event Id: {event_id} for Order Id: {order_id}");
order.event_id = event_id;
let mut small = order.as_new_order();
small.id = Some(order_id);
order
.update(pool)
.await
.map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
enqueue_order_msg(
request_id,
Some(order_id),
Action::NewOrder,
Some(Payload::Order(small)),
trade_pubkey,
trade_index,
)
.await;
NOSTR_CLIENT
.get()
.unwrap()
.send_event(&event)
.await
.map(|_s| ())
.map_err(|err| MostroInternalErr(ServiceError::NostrError(err.to_string())))
}
/// Publishes an order whose publication was deferred while waiting for the
/// maker bond.
///
/// The status is moved from `WaitingMakerBond` to `Pending` with a guarded
/// UPDATE. If no row was changed, the order is no longer waiting and the
/// function returns `Ok(())` without publishing. Otherwise the maker's keys
/// and trade index are taken from the seller or buyer side (by order kind)
/// and passed to `finalize_order_publication`.
pub async fn resume_publish_after_maker_bond(
    pool: &SqlitePool,
    keys: &Keys,
    order: Order,
    request_id: Option<u64>,
) -> Result<(), MostroError> {
    let target_status = Status::Pending.to_string();
    let expected_status = Status::WaitingMakerBond.to_string();
    let transition = sqlx::query("UPDATE orders SET status = ? WHERE id = ? AND status = ?")
        .bind(target_status)
        .bind(order.id)
        .bind(expected_status)
        .execute(pool)
        .await
        .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;

    if transition.rows_affected() != 1 {
        info!(
            "resume_publish_after_maker_bond: order {} no longer WaitingMakerBond — skipping deferred publish",
            order.id
        );
        return Ok(());
    }

    let (trade_pubkey, identity_pubkey, trade_index) =
        match order.get_order_kind().map_err(MostroInternalErr)? {
            OrderKind::Sell => {
                let trade = order.get_seller_pubkey().map_err(MostroInternalErr)?;
                let identity = order
                    .get_master_seller_pubkey()
                    .map_err(MostroInternalErr)?;
                (trade, identity, order.trade_index_seller)
            }
            OrderKind::Buy => {
                let trade = order.get_buyer_pubkey().map_err(MostroInternalErr)?;
                let identity = order.get_master_buyer_pubkey().map_err(MostroInternalErr)?;
                (trade, identity, order.trade_index_buyer)
            }
        };

    finalize_order_publication(
        pool,
        keys,
        order,
        identity_pubkey,
        trade_pubkey,
        request_id,
        trade_index,
    )
    .await
}
/// Builds the database `Order` for an incoming `SmallOrder`.
///
/// The fee is computed only when the order carries a positive sats amount;
/// `price_from_api` is set when the amount is zero. The maker's trade key,
/// identity key and trade index are stored on the buyer or seller side
/// according to the order kind.
///
/// # Errors
/// `InvalidOrderKind` when `new_order.kind` is `None`.
async fn prepare_new_order(
    new_order: &SmallOrder,
    initiator_pubkey: PublicKey,
    trade_index: Option<i64>,
    identity_pubkey: PublicKey,
    trade_pubkey: PublicKey,
) -> Result<Order, MostroError> {
    let fee = if new_order.amount > 0 {
        get_fee(new_order.amount)
    } else {
        0
    };
    let expires_at = get_expiration_date(new_order.expires_at);

    let mut order = Order {
        id: Uuid::new_v4(),
        // Placeholder kind; overwritten below once the requested kind is known.
        kind: OrderKind::Sell.to_string(),
        status: Status::Pending.to_string(),
        creator_pubkey: initiator_pubkey.to_string(),
        payment_method: new_order.payment_method.clone(),
        amount: new_order.amount,
        fee,
        dev_fee: 0,
        dev_fee_paid: false,
        dev_fee_payment_hash: None,
        fiat_code: new_order.fiat_code.clone(),
        min_amount: new_order.min_amount,
        max_amount: new_order.max_amount,
        fiat_amount: new_order.fiat_amount,
        premium: new_order.premium,
        buyer_invoice: new_order.buyer_invoice.clone(),
        created_at: Timestamp::now().as_secs() as i64,
        expires_at,
        price_from_api: new_order.amount == 0,
        ..Default::default()
    };

    let trade_key = Some(trade_pubkey.to_string());
    let identity_key = Some(identity_pubkey.to_string());
    match new_order.kind {
        Some(OrderKind::Buy) => {
            order.kind = OrderKind::Buy.to_string();
            order.buyer_pubkey = trade_key;
            order.master_buyer_pubkey = identity_key;
            order.trade_index_buyer = trade_index;
        }
        Some(OrderKind::Sell) => {
            order.kind = OrderKind::Sell.to_string();
            order.seller_pubkey = trade_key;
            order.master_seller_pubkey = identity_key;
            order.trade_index_seller = trade_index;
        }
        None => return Err(MostroCantDo(CantDoReason::InvalidOrderKind)),
    }

    Ok(order)
}
pub async fn send_dm(
receiver_pubkey: PublicKey,
sender_keys: &Keys,
payload: &str,
expiration: Option<Timestamp>,
) -> Result<(), MostroError> {
info!(
"sender key {} - receiver key {}",
sender_keys.public_key().to_hex(),
receiver_pubkey.to_hex()
);
let message = Message::from_json(payload)
.map_err(|_| MostroInternalErr(ServiceError::MessageSerializationError))?;
let event = wrap_message(
&message,
sender_keys,
sender_keys,
receiver_pubkey,
WrapOptions {
signed: false,
expiration,
..WrapOptions::default()
},
)
.await?;
info!(
"Sending message, Event ID: {} to {} with payload: {:#?}",
event.id,
receiver_pubkey.to_hex(),
payload
);
if let Ok(client) = get_nostr_client() {
client
.send_event(&event)
.await
.map_err(|e| MostroInternalErr(ServiceError::NostrError(e.to_string())))?;
}
Ok(())
}
pub async fn publish_dev_fee_audit_event(
order: &Order,
payment_hash: &str,
) -> Result<(), MostroError> {
use std::borrow::Cow;
let ln_network = match LN_STATUS.get() {
Some(status) => status.networks.join(","),
None => "unknown".to_string(),
};
let keys = get_keys()?;
let client = get_nostr_client()?;
let mut tag_list = vec![
Tag::custom(
TagKind::Custom(Cow::Borrowed("order-id")),
vec![order.id.to_string()],
),
Tag::custom(
TagKind::Custom(Cow::Borrowed("amount")),
vec![order.dev_fee.to_string()],
),
Tag::custom(
TagKind::Custom(Cow::Borrowed("hash")),
vec![payment_hash.to_string()],
),
Tag::custom(
TagKind::Custom(Cow::Borrowed("destination")),
vec![DEV_FEE_LIGHTNING_ADDRESS.to_string()],
),
Tag::custom(TagKind::Custom(Cow::Borrowed("network")), vec![ln_network]),
Tag::custom(
TagKind::Custom(Cow::Borrowed("y")),
create_platform_tag_values(Settings::get_mostro().name.as_deref()),
),
Tag::custom(
TagKind::Custom(Cow::Borrowed("z")),
vec!["dev-fee-payment".to_string()],
),
];
if let Some(expiration_timestamp) = get_expiration_timestamp_for_kind(DEV_FEE_AUDIT_EVENT_KIND)
{
tag_list.push(Tag::expiration(Timestamp::from(
expiration_timestamp as u64,
)));
}
let tags = Tags::from_list(tag_list);
let event = EventBuilder::new(nostr_sdk::Kind::Custom(DEV_FEE_AUDIT_EVENT_KIND), "")
.tags(tags)
.sign_with_keys(&keys)
.map_err(|e| MostroInternalErr(ServiceError::NostrError(e.to_string())))?;
client
.send_event(&event)
.await
.map_err(|e| MostroInternalErr(ServiceError::NostrError(e.to_string())))?;
info!(
"📡 Published dev fee audit event for order {} - {} sats to relays",
order.id, order.dev_fee
);
Ok(())
}
/// Parses the Mostro keys from the `nsec_privkey` Nostr setting.
///
/// # Errors
/// Logs the parse failure and returns `NostrError` when the key is invalid.
pub fn get_keys() -> Result<Keys, MostroError> {
    let nostr_settings = Settings::get_nostr();
    Keys::parse(&nostr_settings.nsec_privkey).map_err(|e| {
        tracing::error!("Failed to parse nostr private key: {}", e);
        MostroInternalErr(ServiceError::NostrError(e.to_string()))
    })
}
/// Records that a rating was sent for the order referenced by `msg` and
/// queues the rating event for `user`.
///
/// The order's `buyer_sent_rate` / `seller_sent_rate` flags are only ever
/// switched on here, never cleared. The event is pushed to the rate queue
/// after the order row has been updated.
#[allow(clippy::too_many_arguments)]
pub async fn update_user_rating_event(
    user: &str,
    buyer_sent_rate: bool,
    seller_sent_rate: bool,
    tags: Tags,
    msg: &Message,
    keys: &Keys,
    pool: &SqlitePool,
) -> Result<()> {
    let mut order = get_order(msg, pool).await?;
    let event = new_rating_event(keys, "", user.to_string(), tags)?;
    info!("Sending replaceable event: {event:#?}");

    // OR-ing keeps a flag that is already set and turns it on when requested.
    order.buyer_sent_rate |= buyer_sent_rate;
    order.seller_sent_rate |= seller_sent_rate;
    order.update(pool).await?;

    let mut rate_queue = MESSAGE_QUEUES.queue_order_rate.write().await;
    rate_queue.push(event);
    Ok(())
}
/// Returns the maker's reputation tuple `(total_rating, total_reviews,
/// created_at)` for orders in `Pending` or `WaitingTakerBond` status, and
/// `None` for every other status.
///
/// The maker is the seller when `is_sell_order()` succeeds, otherwise the
/// buyer. If the user lookup fails, zeroed data is returned when the identity
/// key equals the trade key; otherwise `InvalidPubkey` is returned.
async fn get_ratings_for_pending_order(
    order_updated: &Order,
    status: Status,
) -> Result<Option<(f64, i64, i64)>, MostroError> {
    if status != Status::Pending && status != Status::WaitingTakerBond {
        return Ok(None);
    }

    let (identity_pubkey, trade_pubkey) = match order_updated.is_sell_order() {
        Ok(_) => {
            let identity = order_updated
                .get_master_seller_pubkey()
                .map_err(MostroInternalErr)?;
            let trade = order_updated
                .get_seller_pubkey()
                .map_err(MostroInternalErr)?;
            (identity, trade)
        }
        Err(_) => {
            let identity = order_updated
                .get_master_buyer_pubkey()
                .map_err(MostroInternalErr)?;
            let trade = order_updated
                .get_buyer_pubkey()
                .map_err(MostroInternalErr)?;
            (identity, trade)
        }
    };

    match is_user_present(&get_db_pool(), identity_pubkey.to_string()).await {
        Ok(user) => Ok(Some((
            user.total_rating,
            user.total_reviews,
            user.created_at,
        ))),
        Err(_) if identity_pubkey == trade_pubkey => Ok(Some((0.0, 0, 0))),
        Err(_) => Err(MostroInternalErr(ServiceError::InvalidPubkey)),
    }
}
/// Produces a copy of `order` with the new `status` and publishes the
/// matching replaceable order event.
///
/// The returned order carries the new status and, when an event was built,
/// its event id. Sending is best effort: a missing client or a failed send
/// does not make this function fail, but a failed send is logged together
/// with the underlying error.
///
/// # Errors
/// Fails when reputation data or tags cannot be produced, or when the event
/// cannot be built (`NostrError`).
pub async fn update_order_event(
    keys: &Keys,
    status: Status,
    order: &Order,
) -> Result<Order, MostroError> {
    let mut order_updated = order.clone();
    order_updated.status = status.to_string();
    let reputation_data = get_ratings_for_pending_order(&order_updated, status).await?;
    let mostro_pubkey = keys.public_key().to_hex();

    if let Some(tags) = order_to_tags(&order_updated, reputation_data, Some(&mostro_pubkey))? {
        let event = new_order_event(keys, "", order.id.to_string(), tags)
            .map_err(|e| MostroInternalErr(ServiceError::NostrError(e.to_string())))?;
        info!("Sending replaceable event: {event:#?}");
        order_updated.event_id = event.id.to_string();
        if let Ok(client) = get_nostr_client() {
            if let Err(e) = client.send_event(&event).await {
                // Report the real cause instead of assuming the order expired.
                tracing::warn!(
                    "order id : {} event could not be sent (the order may be expired): {}",
                    order_updated.id,
                    e
                );
            }
        }
    }

    info!(
        "Order Id: {} updated Nostr new Status: {}",
        order.id,
        status.to_string()
    );
    Ok(order_updated)
}
/// Creates a Nostr client with custom relay size limits, registers every
/// relay from the Nostr settings and connects.
///
/// # Errors
/// `NostrError` when a relay cannot be added.
pub async fn connect_nostr() -> Result<Client, MostroError> {
    let nostr_settings = Settings::get_nostr();

    let relay_limits = {
        let mut limits = RelayLimits::default();
        limits.messages.max_size = Some(6_000);
        limits.events.max_size = Some(6_500);
        limits
    };
    let client = ClientBuilder::default()
        .opts(ClientOptions::new().relay_limits(relay_limits))
        .build();

    for relay in &nostr_settings.relays {
        client
            .add_relay(relay)
            .await
            .map_err(|e| MostroInternalErr(ServiceError::NostrError(e.to_string())))?;
    }

    client.connect().await;
    Ok(client)
}
pub async fn show_hold_invoice(
my_keys: &Keys,
payment_request: Option<String>,
buyer_pubkey: &PublicKey,
seller_pubkey: &PublicKey,
mut order: Order,
request_id: Option<u64>,
) -> Result<(), MostroError> {
let mut ln_client = lightning::LndConnector::new().await?;
let new_amount = order.amount + order.fee;
let (invoice_response, preimage, hash) = ln_client
.create_hold_invoice(
&messages::hold_invoice_description(
&order.id.to_string(),
&order.fiat_code,
&order.fiat_amount.to_string(),
)
.map_err(|e| MostroInternalErr(ServiceError::HoldInvoiceError(e.to_string())))?,
new_amount,
)
.await
.map_err(|e| MostroInternalErr(ServiceError::HoldInvoiceError(e.to_string())))?;
if let Some(invoice) = payment_request {
order.buyer_invoice = Some(invoice);
};
order.preimage = Some(bytes_to_string(&preimage));
order.hash = Some(bytes_to_string(&hash));
order.status = Status::WaitingPayment.to_string();
order.buyer_pubkey = Some(buyer_pubkey.to_string());
order.seller_pubkey = Some(seller_pubkey.to_string());
let pool = db::connect()
.await
.map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
let order_updated = update_order_event(my_keys, Status::WaitingPayment, &order)
.await
.map_err(|e| MostroInternalErr(ServiceError::NostrError(e.to_string())))?;
order_updated
.update(&pool)
.await
.map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
let mut new_order = order.as_new_order();
new_order.status = Some(Status::WaitingPayment);
new_order.amount = new_amount;
new_order.buyer_invoice = None;
enqueue_order_msg(
request_id,
Some(order.id),
Action::PayInvoice,
Some(Payload::PaymentRequest(
Some(new_order),
invoice_response.payment_request,
None,
)),
*seller_pubkey,
order.trade_index_seller,
)
.await;
enqueue_order_msg(
request_id,
Some(order.id),
Action::WaitingSellerToPay,
None,
*buyer_pubkey,
order.trade_index_buyer,
)
.await;
let _ = invoice_subscribe(hash, request_id).await;
Ok(())
}
pub async fn invoice_subscribe(hash: Vec<u8>, request_id: Option<u64>) -> Result<(), MostroError> {
let mut ln_client_invoices = lightning::LndConnector::new().await?;
let (tx, mut rx) = channel(100);
let invoice_task = {
async move {
let _ = ln_client_invoices
.subscribe_invoice(hash, tx)
.await
.map_err(|e| e.to_string());
}
};
tokio::spawn(invoice_task);
let pool = get_db_pool();
let subs = {
async move {
while let Some(msg) = rx.recv().await {
let hash = bytes_to_string(msg.hash.as_ref());
if msg.state == InvoiceState::Accepted {
let keys = match get_keys() {
Ok(k) => k,
Err(e) => {
info!("Failed to get keys: {e}");
continue;
}
};
if let Err(e) = flow::hold_invoice_paid(&hash, request_id, &pool, &keys).await {
info!("Invoice flow error {e}");
} else {
info!("Invoice with hash {hash} accepted!");
}
} else if msg.state == InvoiceState::Settled {
if let Err(e) = flow::hold_invoice_settlement(&hash, &pool).await {
info!("Invoice flow error {e}");
}
} else if msg.state == InvoiceState::Canceled {
if let Err(e) = flow::hold_invoice_canceled(&hash, &pool).await {
info!("Invoice flow error {e}");
}
} else {
info!("Invoice with hash: {hash} subscribed!");
}
}
}
};
tokio::spawn(subs);
Ok(())
}
/// Returns `(sats_amount, fee)` where the amount is the market quote for the
/// given fiat amount and premium, and the fee is `get_fee` of that amount.
pub async fn get_market_amount_and_fee(
    fiat_amount: i64,
    fiat_code: &str,
    premium: i64,
) -> Result<(i64, i64)> {
    let sats_amount = get_market_quote(&fiat_amount, fiat_code, premium).await?;
    Ok((sats_amount, get_fee(sats_amount)))
}
/// Asks the buyer for an invoice and tells the seller the trade is waiting
/// for it.
///
/// The buyer receives an `AddInvoice` message whose order amount is
/// `order.amount - order.fee` (saturating); the seller receives a
/// `WaitingBuyerInvoice` message. Returns `order.amount`.
///
/// # Errors
/// `InvalidOrderKind` when the stored kind cannot be parsed; an internal
/// error when the seller pubkey is unavailable (raised after the buyer
/// message has already been queued).
pub async fn set_waiting_invoice_status(
    order: &mut Order,
    buyer_pubkey: PublicKey,
    request_id: Option<u64>,
) -> Result<i64> {
    let kind = OrderKind::from_str(&order.kind)
        .map_err(|_| MostroCantDo(CantDoReason::InvalidOrderKind))?;
    // Amount the buyer's invoice should be for: trade amount minus the fee.
    let buyer_final_amount = order.amount.saturating_sub(order.fee);

    let buyer_payload = SmallOrder::new(
        Some(order.id),
        Some(kind),
        Some(Status::WaitingBuyerInvoice),
        buyer_final_amount,
        order.fiat_code.clone(),
        order.min_amount,
        order.max_amount,
        order.fiat_amount,
        order.payment_method.clone(),
        order.premium,
        None,
        None,
        None,
        Some(order.created_at),
        None,
    );
    enqueue_order_msg(
        request_id,
        Some(order.id),
        Action::AddInvoice,
        Some(Payload::Order(buyer_payload)),
        buyer_pubkey,
        order.trade_index_buyer,
    )
    .await;

    let seller_pubkey = order.get_seller_pubkey().map_err(MostroInternalErr)?;
    enqueue_order_msg(
        request_id,
        Some(order.id),
        Action::WaitingBuyerInvoice,
        None,
        seller_pubkey,
        order.trade_index_seller,
    )
    .await;

    Ok(order.amount)
}
/// Queues a `Rate` message for the buyer and then for the seller of `order`.
/// Always returns `Ok(())`.
pub async fn rate_counterpart(
    buyer_pubkey: &PublicKey,
    seller_pubkey: &PublicKey,
    order: &Order,
    request_id: Option<u64>,
) -> Result<()> {
    for receiver in [buyer_pubkey, seller_pubkey] {
        enqueue_order_msg(
            request_id,
            Some(order.id),
            Action::Rate,
            None,
            *receiver,
            None,
        )
        .await;
    }
    Ok(())
}
/// Settles the seller's hold invoice using the preimage stored on `order`.
///
/// Unless `is_admin` is set, the sender of `event` must be the order's
/// seller.
///
/// # Errors
/// - `InvalidPubkey` when the seller key is unavailable or the sender is not
///   allowed.
/// - `InvalidInvoice` when the order has no preimage.
/// - Any error from settling the invoice.
#[allow(clippy::too_many_arguments)]
pub async fn settle_seller_hold_invoice(
    event: &UnwrappedMessage,
    ln_client: &mut LndConnector,
    action: Action,
    is_admin: bool,
    order: &Order,
) -> Result<(), MostroError> {
    let seller_pubkey = order
        .get_seller_pubkey()
        .map_err(|_| MostroCantDo(CantDoReason::InvalidPubkey))?
        .to_string();
    let sender_is_seller = event.sender.to_string() == seller_pubkey;
    if !(is_admin || sender_is_seller) {
        return Err(MostroCantDo(CantDoReason::InvalidPubkey));
    }

    let preimage = order
        .preimage
        .as_ref()
        .ok_or(MostroCantDo(CantDoReason::InvalidInvoice))?;
    ln_client.settle_hold_invoice(preimage).await?;
    info!("{action}: Order Id {}: hold invoice settled", order.id);
    Ok(())
}
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
pub fn bytes_to_string(bytes: &[u8]) -> String {
    let mut hex = String::new();
    for byte in bytes {
        // Writing into a `String` cannot fail, so the result is ignored.
        let _ = write!(hex, "{:02x}", byte);
    }
    hex
}
/// Queues a "can't do" message carrying `reason` for `destination_key`.
pub async fn enqueue_cant_do_msg(
    request_id: Option<u64>,
    order_id: Option<Uuid>,
    reason: CantDoReason,
    destination_key: PublicKey,
) {
    let payload = Some(Payload::CantDo(Some(reason)));
    let message = Message::cant_do(order_id, request_id, payload);
    let mut queue = MESSAGE_QUEUES.queue_order_cantdo.write().await;
    queue.push((message, destination_key));
}
/// Queues a restore-session message with `payload` for `destination_key`.
pub async fn enqueue_restore_session_msg(payload: Option<Payload>, destination_key: PublicKey) {
    let message = Message::new_restore(payload);
    let mut queue = MESSAGE_QUEUES.queue_restore_session_msg.write().await;
    queue.push((message, destination_key));
}
/// Builds an order message for `action` and queues it for `destination_key`.
pub async fn enqueue_order_msg(
    request_id: Option<u64>,
    order_id: Option<Uuid>,
    action: Action,
    payload: Option<Payload>,
    destination_key: PublicKey,
    trade_index: Option<i64>,
) {
    let message = Message::new_order(order_id, request_id, trade_index, action, payload);
    let mut queue = MESSAGE_QUEUES.queue_order_msg.write().await;
    queue.push((message, destination_key));
}
/// Returns the fiat amount the taker wants to trade.
///
/// For non-range orders this is always `order.fiat_amount`. For range orders
/// the amount comes from `msg` and must lie within
/// `order.min_amount..=order.max_amount`; a missing or out-of-range amount
/// yields `None`.
pub fn get_fiat_amount_requested(order: &Order, msg: &Message) -> Option<i64> {
    if !order.is_range_order() {
        return Some(order.fiat_amount);
    }

    let amount_buyer = msg.get_inner_message_kind().get_amount()?;
    info!("amount_buyer: {amount_buyer}");

    let requested = Some(amount_buyer);
    if requested <= order.max_amount && requested >= order.min_amount {
        requested
    } else {
        None
    }
}
/// Returns the global Nostr client, or a `NostrError` when it has not been
/// initialized yet.
pub fn get_nostr_client() -> Result<&'static Client, MostroError> {
    NOSTR_CLIENT.get().ok_or_else(|| {
        MostroInternalErr(ServiceError::NostrError(
            "Client not initialized!".to_string(),
        ))
    })
}
/// Returns the relays of the global Nostr client, or `None` when the client
/// has not been initialized.
pub async fn get_nostr_relays() -> Option<HashMap<RelayUrl, Relay>> {
    match NOSTR_CLIENT.get() {
        Some(client) => Some(client.relays().await),
        None => None,
    }
}
/// Loads the dispute whose id is carried by `msg`.
///
/// # Errors
/// - `InvalidDisputeId` when the message has no id or no dispute matches it.
/// - `DbAccessError` when the lookup fails.
pub async fn get_dispute(msg: &Message, pool: &Pool<Sqlite>) -> Result<Dispute, MostroError> {
    let dispute_id = msg
        .get_inner_message_kind()
        .id
        .ok_or(MostroInternalErr(ServiceError::InvalidDisputeId))?;
    Dispute::by_id(pool, dispute_id)
        .await
        .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?
        .ok_or(MostroInternalErr(ServiceError::InvalidDisputeId))
}
/// Loads the order whose id is carried by `msg`.
///
/// # Errors
/// - `InvalidOrderId` when the message has no id or no order matches it.
/// - `DbAccessError` when the lookup fails.
pub async fn get_order(msg: &Message, pool: &Pool<Sqlite>) -> Result<Order, MostroError> {
    let order_id = msg
        .get_inner_message_kind()
        .id
        .ok_or(MostroInternalErr(ServiceError::InvalidOrderId))?;
    Order::by_id(pool, order_id)
        .await
        .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?
        .ok_or(MostroInternalErr(ServiceError::InvalidOrderId))
}
/// Fetches the orders with the given ids that belong to `user_pubkey`
/// (matching either `master_buyer_pubkey` or `master_seller_pubkey`).
///
/// Rows are ordered by the position of their id in `orders` via an
/// `ORDER BY CASE` clause. An empty `orders` slice returns an empty vector
/// without querying. All values are passed as bind parameters.
///
/// # Errors
/// `DbAccessError` when the query fails.
pub async fn get_user_orders_by_id(
    pool: &Pool<Sqlite>,
    orders: &[Uuid],
    user_pubkey: &str,
) -> Result<Vec<Order>, MostroError> {
    if orders.is_empty() {
        return Ok(Vec::new());
    }

    let mut sql = QueryBuilder::new("SELECT * FROM orders WHERE id IN (");
    {
        let mut id_list = sql.separated(", ");
        for order_id in orders {
            id_list.push_bind(order_id);
        }
    }

    // Ownership filter.
    sql.push(") AND (master_buyer_pubkey = ");
    sql.push_bind(user_pubkey);
    sql.push(" OR master_seller_pubkey = ");
    sql.push_bind(user_pubkey);
    sql.push(")");

    // Keep the caller's ordering of ids.
    sql.push(" ORDER BY CASE id");
    for (position, order_id) in orders.iter().enumerate() {
        sql.push(" WHEN ");
        sql.push_bind(order_id);
        sql.push(" THEN ");
        sql.push_bind(position as i64);
    }
    sql.push(" END");

    sql.build_query_as::<Order>()
        .fetch_all(pool)
        .await
        .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))
}
/// Validates the payment request carried by `msg`, if any, against the
/// order's amount and fee.
///
/// Returns `Ok(None)` when the message has no payment request and
/// `Ok(Some(pr))` when the request is valid.
///
/// # Errors
/// `InvalidInvoice` when validation fails.
pub async fn validate_invoice(msg: &Message, order: &Order) -> Result<Option<String>, MostroError> {
    let pr = match msg.get_inner_message_kind().get_payment_request() {
        Some(pr) => pr,
        None => return Ok(None),
    };

    let total_buyer_fees = order.fee;
    let verdict = is_valid_invoice(
        pr.clone(),
        Some(order.amount as u64),
        Some(total_buyer_fees as u64),
    )
    .await;

    match verdict {
        Ok(_) => Ok(Some(pr)),
        Err(_) => Err(MostroCantDo(CantDoReason::InvalidInvoice)),
    }
}
/// Sends the taker's reputation to the order maker.
///
/// The taker is the seller for buy orders and the buyer for sell orders.
/// Unknown takers are reported with zeroed reputation. The message is sent
/// as `PayInvoice` to the seller (sell order in `WaitingBuyerInvoice`) or as
/// `AddInvoice` to the buyer (buy order in `WaitingPayment`). A buy order in
/// `WaitingBuyerInvoice` results in `Ok(())` without a message.
///
/// # Errors
/// - `InvalidPubkey` when the taker's identity key is missing.
/// - `NotAllowedByStatus` for any other status/kind combination.
/// - An internal error when the status or receiver key cannot be read.
pub async fn notify_taker_reputation(
    pool: &Pool<Sqlite>,
    order: &Order,
) -> Result<(), MostroError> {
    let is_buy_order = order.is_buy_order().is_ok();
    let taker_key = if is_buy_order {
        order.master_seller_pubkey.clone()
    } else {
        order.master_buyer_pubkey.clone()
    };
    let master_key = taker_key.ok_or(MostroCantDo(CantDoReason::InvalidPubkey))?;

    let reputation_data = match is_user_present(pool, master_key).await {
        Ok(user) => {
            let now = Timestamp::now().as_secs();
            // A creation time later than "now" (clock skew, bad data) must not
            // underflow the unsigned subtraction; it counts as zero days.
            let age_secs = now.saturating_sub(user.created_at as u64);
            UserInfo {
                rating: user.total_rating,
                reviews: user.total_reviews,
                operating_days: age_secs / 86400,
            }
        }
        Err(_) => UserInfo {
            rating: 0.0,
            reviews: 0,
            operating_days: 0,
        },
    };

    let order_status = order.get_order_status().map_err(MostroInternalErr)?;
    let (action, receiver) = match order_status {
        Status::WaitingBuyerInvoice => {
            if !is_buy_order {
                (
                    Action::PayInvoice,
                    order.get_seller_pubkey().map_err(MostroInternalErr)?,
                )
            } else {
                return Ok(());
            }
        }
        Status::WaitingPayment => {
            if is_buy_order {
                (
                    Action::AddInvoice,
                    order.get_buyer_pubkey().map_err(MostroInternalErr)?,
                )
            } else {
                return Err(MostroCantDo(CantDoReason::NotAllowedByStatus));
            }
        }
        _ => {
            return Err(MostroCantDo(CantDoReason::NotAllowedByStatus));
        }
    };

    enqueue_order_msg(
        None,
        Some(order.id),
        action,
        Some(Payload::Peer(Peer {
            pubkey: "".to_string(),
            reputation: Some(reputation_data),
        })),
        receiver,
        None,
    )
    .await;
    Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::bitcoin_price::BitcoinPriceManager;
use mostro_core::message::{Message, MessageKind};
use mostro_core::order::Order;
use sqlx::sqlite::SqlitePoolOptions;
use sqlx::SqlitePool;
use std::sync::Once;
use uuid::{uuid, Uuid};
// Guard so that shared test setup runs at most once per process.
static INIT: Once = Once::new();
// Shared test setup hook. The closure is currently empty, so calling this has
// no effect beyond marking `INIT` as completed.
fn initialize() {
INIT.call_once(|| {
});
}
// An enabled provider with a usable URL wins over the legacy URL.
#[test]
fn select_yadio_base_url_prefers_enabled_provider() {
    let provider = Some(("https://provider.example", true));
    let chosen = select_yadio_base_url(provider, "https://legacy.example");
    assert_eq!(chosen, "https://provider.example");
}
// Trailing slashes and surrounding whitespace are removed from whichever URL
// is selected, provider or legacy.
#[test]
fn select_yadio_base_url_strips_trailing_slash_like_provider_new() {
    let cases = [
        (
            Some(("https://api.yadio.io/", true)),
            "https://legacy.example",
            "https://api.yadio.io",
        ),
        (
            Some((" https://api.yadio.io// ", true)),
            "ignored",
            "https://api.yadio.io",
        ),
        (None, "https://legacy.example/", "https://legacy.example"),
    ];
    for (provider, legacy, expected) in cases {
        assert_eq!(select_yadio_base_url(provider, legacy), expected);
    }
}
// A disabled, blank, slash-only or missing provider falls back to the legacy URL.
#[test]
fn select_yadio_base_url_falls_back_when_provider_unusable() {
    let legacy = "https://legacy.example";
    let unusable_providers = [
        Some(("https://provider.example", false)),
        Some((" ", true)),
        Some(("/", true)),
        None,
    ];
    for provider in unusable_providers {
        assert_eq!(select_yadio_base_url(provider, legacy), legacy);
    }
}
// Creates an in-memory SQLite pool limited to a single connection and applies
// the orders, dev-fee and cashu-escrow migration files to it, in that order.
// Panics (via `unwrap`) if connecting or any migration fails.
async fn setup_orders_pool() -> SqlitePool {
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect(":memory:")
.await
.unwrap();
sqlx::query(include_str!("../migrations/20221222153301_orders.sql"))
.execute(&pool)
.await
.unwrap();
sqlx::query(include_str!("../migrations/20251126120000_dev_fee.sql"))
.execute(&pool)
.await
.unwrap();
sqlx::query(include_str!(
"../migrations/20260530120000_cashu_escrow_fields.sql"
))
.execute(&pool)
.await
.unwrap();
pool
}
// Inserts a fixture order row with the given id, creator and identity keys.
// Every other column is a fixed value: kind "buy", status "active", premium 0,
// payment method "ln", amount 1000, fiat "USD" 1000, created_at 1000,
// expires_at 2000; the event id is the id in simple (hyphen-less) form.
// Panics (via `unwrap`) if the insert fails.
async fn insert_order(
pool: &SqlitePool,
id: Uuid,
identity_buyer_pubkey: Option<&str>,
identity_seller_pubkey: Option<&str>,
creator_pubkey: &str,
) {
sqlx::query(
r#"
INSERT INTO orders (
id,
kind,
event_id,
creator_pubkey,
status,
premium,
payment_method,
amount,
fiat_code,
fiat_amount,
created_at,
expires_at,
master_buyer_pubkey,
master_seller_pubkey
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"#,
)
// Binds follow the column order of the INSERT statement above.
.bind(id)
.bind("buy")
.bind(id.simple().to_string())
.bind(creator_pubkey)
.bind("active")
.bind(0_i64)
.bind("ln")
.bind(1_000_i64)
.bind("USD")
.bind(1_000_i64)
.bind(1_000_i64)
.bind(2_000_i64)
.bind(identity_buyer_pubkey)
.bind(identity_seller_pubkey)
.execute(pool)
.await
.unwrap();
}
// Four bytes are rendered as eight lowercase hex digits.
#[test]
fn test_bytes_to_string() {
    initialize();
    let input: [u8; 4] = [0xde, 0xad, 0xbe, 0xef];
    assert_eq!(bytes_to_string(&input), "deadbeef");
}
// Pins the textual shape of the two Yadio endpoints.
// NOTE(review): both URLs are formatted locally inside this test; no
// production URL-building code is called here.
#[tokio::test]
async fn test_get_market_quote_url_construction() {
    initialize();
    let (base_url, fiat_amount, fiat_code) = ("https://api.yadio.io", 1000, "USD");

    let convert_url = format!("{base_url}/convert/{fiat_amount}/{fiat_code}/BTC");
    assert_eq!(convert_url, "https://api.yadio.io/convert/1000/USD/BTC");

    let currencies_url = format!("{base_url}/currencies");
    assert_eq!(currencies_url, "https://api.yadio.io/currencies");
}
// The global Nostr client is expected to be unset at this point.
// NOTE(review): this only holds if nothing else running in the same test
// process has already populated `NOSTR_CLIENT` — confirm test isolation.
#[tokio::test]
async fn test_get_nostr_client_failure() {
    initialize();
    assert!(NOSTR_CLIENT.get().is_none());
}
// Once a client is installed in the global slot, `get_nostr_client` must
// succeed.
#[tokio::test]
async fn test_get_nostr_client_success() {
    initialize();
    // `set` reports an error when the global already holds a client. Tests
    // share one process, so another test may have installed one first; that
    // is fine for this check, which only needs *a* client to be present.
    // Unwrapping here would make the test fail depending on execution order.
    let _ = NOSTR_CLIENT.set(Client::default());
    assert!(get_nostr_client().is_ok());
}
// An empty byte slice is expected to produce an empty string.
#[test]
fn test_bytes_to_string_empty() {
    initialize();
    let input: Vec<u8> = Vec::new();
    assert_eq!(bytes_to_string(&input), "");
}
// Sending a DM from this test environment is expected to return an error.
// NOTE(review): presumably because no usable Nostr client/relay is available
// in the test process — confirm against `send_dm`.
#[tokio::test]
async fn test_send_dm() {
    initialize();

    // Serialize a FiatSent order message to use as the DM payload.
    let order_id = uuid!("308e1272-d5f4-47e6-bd97-3504baea9c23");
    let kind = MessageKind::new(Some(order_id), None, None, Action::FiatSent, None);
    let payload = Message::Order(kind).as_json().unwrap();

    // Fresh throwaway keys for both ends of the conversation.
    let receiver_pubkey = Keys::generate().public_key();
    let sender_keys = Keys::generate();

    let outcome = send_dm(receiver_pubkey, &sender_keys, &payload, None).await;
    assert!(outcome.is_err());
}
// The message payload carries an amount of 1000 for an order whose min/max
// are 500/2000; the helper is expected to return `Some(1000)`.
#[tokio::test]
async fn test_get_fiat_amount_requested() {
    initialize();
    let order = Order {
        amount: 1000,
        min_amount: Some(500),
        max_amount: Some(2000),
        ..Default::default()
    };

    let order_id = uuid!("308e1272-d5f4-47e6-bd97-3504baea9c23");
    let kind = MessageKind::new(
        Some(order_id),
        Some(1),
        Some(1),
        Action::TakeSell,
        Some(Payload::Amount(order.amount)),
    );
    let message = Message::Order(kind);

    assert_eq!(get_fiat_amount_requested(&order, &message), Some(1000));
}
// Three orders are inserted: the user is buyer of the first, seller of the
// second and not a party to the third. Requesting all three ids for that user
// must return only the first two, in the order the ids were requested.
#[tokio::test]
async fn test_get_user_orders_by_id_filters_and_preserves_order() {
    initialize();
    let pool = setup_orders_pool().await;

    let user_pubkey = "a".repeat(64);
    let other_pubkey = "b".repeat(64);
    let (user, other) = (user_pubkey.as_str(), other_pubkey.as_str());

    let first_id = Uuid::new_v4();
    let second_id = Uuid::new_v4();
    let third_id = Uuid::new_v4();

    // Each row is (order id, buyer, seller, creator).
    let fixtures = [
        (first_id, user, other, user),
        (second_id, other, user, user),
        (third_id, other, other, other),
    ];
    for (id, buyer, seller, creator) in fixtures {
        insert_order(&pool, id, Some(buyer), Some(seller), creator).await;
    }

    // Deliberately request in a different order than the rows were inserted.
    let requested = vec![second_id, first_id, third_id];
    let orders = get_user_orders_by_id(&pool, &requested, &user_pubkey)
        .await
        .unwrap();

    assert_eq!(orders.len(), 2);
    for (order, expected_id) in orders.iter().zip([second_id, first_id]) {
        assert_eq!(order.id, expected_id);
    }
}
// Asking for no ids at all must succeed and yield an empty result.
#[tokio::test]
async fn test_get_user_orders_by_id_empty_input() {
    initialize();
    let pool = setup_orders_pool().await;
    let user_pubkey = "a".repeat(64);

    let lookup = get_user_orders_by_id(&pool, &[], &user_pubkey).await;

    assert!(lookup.unwrap().is_empty());
}
// 1_000 at a 0.30 share is expected to give exactly 300.
#[test]
fn test_get_dev_fee_basic() {
    assert_eq!(calculate_dev_fee(1_000, 0.30), 300);
}
// 333 * 0.30 = 99.9; the expected fee is 100, i.e. the fractional result is
// not simply truncated.
#[test]
fn test_get_dev_fee_rounding() {
    assert_eq!(calculate_dev_fee(333, 0.30), 100);
}
// A zero base amount must produce a zero fee.
#[test]
fn test_get_dev_fee_zero() {
    assert_eq!(calculate_dev_fee(0, 0.30), 0);
}
// 1 * 0.30 = 0.3; the expected fee for such a tiny amount is 0.
#[test]
fn test_get_dev_fee_tiny_amounts() {
    assert_eq!(calculate_dev_fee(1, 0.30), 0);
}
// With a non-zero `amount` set on the order, the expected notional is that
// amount itself (50_000).
#[test]
fn maker_bond_notional_uses_fixed_amount_directly() {
    let fixed_order = Order {
        fiat_code: "USD".to_string(),
        fiat_amount: 25,
        amount: 50_000,
        ..Default::default()
    };
    let notional = maker_bond_notional_sats(&fixed_order).unwrap();
    assert_eq!(notional, 50_000);
}
// Range order (min 10, max 100) priced at 60_000 per BTC for a test-only
// currency code. The expected notional, 166_666, matches 100 / 60_000 BTC
// expressed in sats with the fractional part dropped.
#[test]
fn maker_bond_notional_range_sizes_against_max_at_price() {
    let fiat_code = "T6RANGE";
    BitcoinPriceManager::set_price_for_test(fiat_code, 60_000.0);

    let range_order = Order {
        fiat_code: fiat_code.to_string(),
        fiat_amount: 0,
        amount: 0,
        min_amount: Some(10),
        max_amount: Some(100),
        ..Default::default()
    };
    assert!(range_order.is_range_order());

    let notional = maker_bond_notional_sats(&range_order).unwrap();
    assert_eq!(notional, 166_666);
}
// A range order whose `max_amount` is zero must be rejected with
// `ServiceError::UnexpectedError` wrapped in `MostroInternalErr`.
#[test]
fn maker_bond_notional_range_rejects_non_positive_max() {
    let range_order = Order {
        fiat_code: "T6ZERO".to_string(),
        fiat_amount: 0,
        amount: 0,
        min_amount: Some(10),
        max_amount: Some(0),
        ..Default::default()
    };
    assert!(range_order.is_range_order());

    let err = maker_bond_notional_sats(&range_order).unwrap_err();
    let is_unexpected = matches!(err, MostroInternalErr(ServiceError::UnexpectedError(_)));
    assert!(
        is_unexpected,
        "expected UnexpectedError for non-positive max_amount, got {err:?}"
    );
}
}