pub mod context;
pub mod add_invoice; pub mod admin_add_solver; pub mod admin_cancel; pub mod admin_settle; pub mod admin_take_dispute; pub mod bond; pub mod cancel; pub mod dev_fee; pub mod dispute; pub mod fiat_sent; pub mod last_trade_index;
pub mod order; pub mod orders; pub mod rate_user; pub mod release; pub mod restore_session; pub mod take_buy; pub mod take_sell; pub mod trade_pubkey;
use crate::app::add_invoice::add_invoice_action;
use crate::app::admin_add_solver::admin_add_solver_action;
use crate::app::admin_cancel::admin_cancel_action;
use crate::app::admin_settle::admin_settle_action;
use crate::app::admin_take_dispute::admin_take_dispute_action;
use crate::app::bond::add_bond_invoice_action;
use crate::app::cancel::cancel_action;
use crate::app::context::AppContext;
use crate::app::dispute::dispute_action;
use crate::app::fiat_sent::fiat_sent_action;
use crate::app::last_trade_index::last_trade_index;
use crate::app::order::order_action;
use crate::app::orders::orders_action;
use crate::app::rate_user::update_user_reputation_action;
use crate::app::release::release_action;
use crate::app::restore_session::restore_session_action;
use crate::app::take_buy::take_buy_action;
use crate::app::take_sell::take_sell_action;
use crate::app::trade_pubkey::trade_pubkey_action;
use crate::db::add_new_user;
use crate::db::is_user_present;
use crate::lightning::LndConnector;
use crate::util::enqueue_cant_do_msg;
use mostro_core::error::CantDoReason;
use mostro_core::error::MostroError;
use mostro_core::error::ServiceError;
use mostro_core::message::{Action, Message};
use mostro_core::nip59::{unwrap_message, UnwrappedMessage};
use mostro_core::user::User;
use nostr_sdk::prelude::*;
/// Emits a `tracing` warning for an internal [`ServiceError`] raised while
/// handling `action`.
///
/// Variants that carry a text payload contribute that text as the "inner
/// message"; every other variant is logged with the placeholder
/// `"No message"`.
fn warning_msg(action: &Action, err: ServiceError) {
    let detail = match &err {
        // All of these variants wrap a string describing the failure.
        ServiceError::EnvVarError(text)
        | ServiceError::EncryptionError(text)
        | ServiceError::DecryptionError(text)
        | ServiceError::IOError(text)
        | ServiceError::UnexpectedError(text)
        | ServiceError::LnNodeError(text)
        | ServiceError::LnPaymentError(text)
        | ServiceError::DbAccessError(text)
        | ServiceError::NostrError(text)
        | ServiceError::HoldInvoiceError(text) => text.to_string(),
        _ => "No message".to_string(),
    };

    tracing::warn!(
        "Error in {} with context {} - inner message {}",
        action,
        err,
        detail
    );
}
/// Reacts to an error produced while processing a message.
///
/// * `MostroError::MostroCantDo` — a cant-do message carrying the cause is
///   enqueued for `event.sender`, tagged with the request id and id taken
///   from `inner_message`.
/// * `MostroError::MostroInternalErr` — the error is only logged through
///   [`warning_msg`]; nothing is enqueued.
async fn manage_errors(
    e: MostroError,
    inner_message: Message,
    event: UnwrappedMessage,
    action: &Action,
) {
    let cause = match e {
        MostroError::MostroInternalErr(service_err) => {
            warning_msg(action, service_err);
            return;
        }
        MostroError::MostroCantDo(cause) => cause,
    };

    let kind = inner_message.get_inner_message_kind();
    enqueue_cant_do_msg(kind.request_id, kind.id, cause, event.sender).await;
}
/// Validates the trade index (and, where applicable, the inner signature) of
/// an incoming message.
///
/// Only `NewOrder`, `TakeBuy` and `TakeSell` are checked; any other action
/// returns `Ok(())` immediately.
///
/// The user is looked up by `event.identity`:
///
/// * **Lookup succeeds** — when the message carries a trade index of 1 or
///   more, the event must contain an inner signature, the index must be
///   strictly greater than the user's stored `last_trade_index`, and the
///   signature must verify against the JSON form of `msg` and `event.sender`.
///   A stale index additionally enqueues a cant-do reply via
///   [`manage_errors`] before the error is returned.
/// * **Lookup fails** (handled here as an unknown user) — when the message
///   carries a trade index it must be 1 or more; if the identity key differs
///   from the sender key a new user row is created with that index as its
///   `last_trade_index`.
///
/// # Errors
///
/// * `MostroCantDo(InvalidSignature)` — inner signature missing or invalid.
/// * `MostroCantDo(InvalidTradeIndex)` — index not greater than the stored
///   one, or not positive for an unknown user.
/// * `MostroCantDo(CantCreateUser)` — inserting the new user failed.
/// * `MostroInternalErr(MessageSerializationError)` — `msg` could not be
///   serialized for signature verification.
async fn check_trade_index(
    ctx: &AppContext,
    event: &UnwrappedMessage,
    msg: &Message,
) -> Result<(), MostroError> {
    let pool = ctx.pool();
    let message_kind = msg.get_inner_message_kind();

    if !matches!(
        message_kind.action,
        Action::NewOrder | Action::TakeBuy | Action::TakeSell
    ) {
        return Ok(());
    }

    match is_user_present(pool, event.identity.to_string()).await {
        Ok(user) => {
            // Indexes below 1 (including an absent index) skip validation here.
            if let index @ 1.. = message_kind.trade_index() {
                let sig = event.signature.ok_or_else(|| {
                    tracing::error!("Trade-index message missing inner signature");
                    MostroError::MostroCantDo(CantDoReason::InvalidSignature)
                })?;

                if index <= user.last_trade_index {
                    tracing::info!("Invalid trade index");
                    // This is the one failure in this function that is
                    // reported back to the sender from here.
                    manage_errors(
                        MostroError::MostroCantDo(CantDoReason::InvalidTradeIndex),
                        msg.clone(),
                        event.clone(),
                        &message_kind.action,
                    )
                    .await;
                    return Err(MostroError::MostroCantDo(CantDoReason::InvalidTradeIndex));
                }

                let msg_json = match msg.as_json() {
                    Ok(m) => m,
                    Err(e) => {
                        tracing::error!(
                            "Failed to serialize message for signature verification: {}",
                            e
                        );
                        return Err(MostroError::MostroInternalErr(
                            ServiceError::MessageSerializationError,
                        ));
                    }
                };

                if !Message::verify_signature(msg_json, event.sender, sig) {
                    tracing::info!("Invalid signature");
                    return Err(MostroError::MostroCantDo(CantDoReason::InvalidSignature));
                }
            }
            Ok(())
        }
        Err(_) => {
            if let Some(last_trade_index) = message_kind.trade_index {
                // Valid trade indexes start at 1. Rejecting every value below
                // 1 (not just 0) keeps a negative index from being persisted
                // as the new user's `last_trade_index`.
                if last_trade_index < 1 {
                    return Err(MostroError::MostroCantDo(CantDoReason::InvalidTradeIndex));
                }

                // A user row is only created when the identity key is distinct
                // from the key that sent the message.
                if event.identity != event.sender {
                    let new_user: User = User {
                        pubkey: event.identity.to_string(),
                        last_trade_index,
                        ..Default::default()
                    };
                    if let Err(e) = add_new_user(pool, new_user).await {
                        tracing::error!("Error creating new user: {}", e);
                        return Err(MostroError::MostroCantDo(CantDoReason::CantCreateUser));
                    }
                }
            }
            Ok(())
        }
    }
}
/// Dispatches every action that does not need the Lightning client to its
/// handler.
///
/// Handler errors are converted into the boxed error type of this function's
/// `Result`. `PayInvoice` is rejected with `MostroCantDo(InvalidAction)`.
/// Any action without an explicit arm is only logged at info level and
/// reported as success.
async fn handle_message_action_no_ln(
    action: &Action,
    msg: Message,
    event: &UnwrappedMessage,
    my_keys: &Keys,
    ctx: &AppContext,
) -> Result<()> {
    match action {
        // Order lifecycle.
        Action::NewOrder => order_action(ctx, msg, event, my_keys)
            .await
            .map_err(Into::into),
        Action::TakeSell => take_sell_action(ctx, msg, event, my_keys)
            .await
            .map_err(Into::into),
        Action::TakeBuy => take_buy_action(ctx, msg, event, my_keys)
            .await
            .map_err(Into::into),
        Action::FiatSent => fiat_sent_action(ctx, msg, event, my_keys)
            .await
            .map_err(Into::into),

        // Invoices.
        Action::AddInvoice => add_invoice_action(ctx, msg, event, my_keys)
            .await
            .map_err(Into::into),
        Action::AddBondInvoice => add_bond_invoice_action(ctx, msg, event, my_keys)
            .await
            .map_err(Into::into),
        Action::PayInvoice => Err(MostroError::MostroCantDo(CantDoReason::InvalidAction).into()),

        // Disputes, ratings and admin operations.
        Action::Dispute => dispute_action(ctx, msg, event, my_keys)
            .await
            .map_err(Into::into),
        Action::RateUser => update_user_reputation_action(ctx, msg, event, my_keys)
            .await
            .map_err(Into::into),
        Action::AdminAddSolver => admin_add_solver_action(ctx, msg, event, my_keys)
            .await
            .map_err(Into::into),
        Action::AdminTakeDispute => admin_take_dispute_action(ctx, msg, event, my_keys)
            .await
            .map_err(Into::into),

        // Session and key queries.
        Action::LastTradeIndex => last_trade_index(ctx, msg, event, my_keys)
            .await
            .map_err(Into::into),
        Action::TradePubkey => trade_pubkey_action(ctx, msg, event)
            .await
            .map_err(Into::into),
        Action::RestoreSession => restore_session_action(ctx, event)
            .await
            .map_err(Into::into),
        Action::Orders => orders_action(ctx, msg, event).await.map_err(Into::into),

        _ => {
            tracing::info!("Received message with action {:?}", action);
            Ok(())
        }
    }
}
/// Top-level action dispatcher.
///
/// The four actions that need the Lightning client (`Release`, `Cancel`,
/// `AdminCancel`, `AdminSettle`) are handled here; everything else is
/// forwarded to [`handle_message_action_no_ln`].
async fn handle_message_action(
    action: &Action,
    msg: Message,
    event: &UnwrappedMessage,
    my_keys: &Keys,
    ln_client: &mut LndConnector,
    ctx: &AppContext,
) -> Result<()> {
    match action {
        Action::Release => release_action(ctx, msg, event, my_keys, ln_client)
            .await
            .map_err(Into::into),
        Action::Cancel => cancel_action(ctx, msg, event, my_keys, ln_client)
            .await
            .map_err(Into::into),
        Action::AdminCancel => admin_cancel_action(ctx, msg, event, my_keys, ln_client)
            .await
            .map_err(Into::into),
        Action::AdminSettle => admin_settle_action(ctx, msg, event, my_keys, ln_client)
            .await
            .map_err(Into::into),
        // No Lightning client required.
        _ => handle_message_action_no_ln(action, msg, event, my_keys, ctx).await,
    }
}
pub async fn run(ctx: AppContext, ln_client: &mut LndConnector) -> Result<()> {
let my_keys = ctx.keys();
let client = ctx.nostr_client();
let pow = ctx.settings().mostro.pow;
loop {
let mut notifications = client.notifications();
while let Ok(notification) = notifications.recv().await {
if let RelayPoolNotification::Event { event, .. } = notification {
if !event.check_pow(pow) {
tracing::info!("Not POW verified event!");
continue;
}
if let Kind::GiftWrap = event.kind {
if event.verify().is_err() {
tracing::warn!("Error in event verification")
};
let unwrapped = match unwrap_message(&event, my_keys).await {
Ok(Some(u)) => u,
Ok(None) => continue,
Err(e) => {
tracing::warn!("Error unwrapping NIP-59 message: {}", e);
continue;
}
};
let since_time = chrono::Utc::now()
.checked_sub_signed(chrono::Duration::seconds(10))
.unwrap()
.timestamp() as u64;
if unwrapped.created_at.as_secs() < since_time {
continue;
}
let message = unwrapped.message.clone();
if unwrapped.identity != unwrapped.sender && unwrapped.signature.is_none() {
tracing::warn!(
"Missing inner signature: identity {} differs from trade key {}",
unwrapped.identity,
unwrapped.sender
);
continue;
}
let inner_message = message.get_inner_message_kind();
if let Err(e) = check_trade_index(&ctx, &unwrapped, &message).await {
tracing::warn!("Error checking trade index: {}", e);
continue;
}
if inner_message.verify() {
if let Some(action) = message.inner_action() {
if let Err(e) = handle_message_action(
&action,
message.clone(),
&unwrapped,
my_keys,
ln_client,
&ctx,
)
.await
{
match e.downcast::<MostroError>() {
Ok(err) => {
manage_errors(*err, message, unwrapped, &action).await;
}
Err(e) => {
tracing::error!("Unexpected error type: {}", e);
warning_msg(
&action,
ServiceError::UnexpectedError(e.to_string()),
);
}
}
}
}
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use mostro_core::message::Action;
use nostr_sdk::secp256k1::schnorr::Signature;
use nostr_sdk::{Keys, Kind as NostrKind, Timestamp};
/// Returns a newly generated key pair (`Keys::generate()`) for use in tests.
fn create_test_keys() -> Keys {
Keys::generate()
}
/// Builds an order-kind [`Message`] for tests with a random UUID, a fixed
/// request id of `1`, the given `action`, the optional `trade_index`
/// (widened to `i64`) and no payload.
fn create_test_message(action: Action, trade_index: Option<u32>) -> Message {
    let order_id = Some(uuid::Uuid::new_v4());
    let request_id = Some(1);
    let index = trade_index.map(i64::from);
    Message::new_order(order_id, request_id, index, action, None)
}
/// Builds an [`UnwrappedMessage`] for tests.
///
/// The identity and sender keys come from two independently generated key
/// pairs, no inner signature is attached, the payload is a `NewOrder`
/// message without a trade index, and the timestamp is "now".
fn create_test_unwrapped_message() -> UnwrappedMessage {
    let identity_pubkey = create_test_keys().public_key();
    let trade_pubkey = create_test_keys().public_key();

    UnwrappedMessage {
        message: create_test_message(Action::NewOrder, None),
        signature: None,
        sender: trade_pubkey,
        identity: identity_pubkey,
        created_at: Timestamp::now(),
    }
}
/// Smoke test: `warning_msg` must accept every string-carrying
/// `ServiceError` variant, plus one payload-free variant, without panicking.
#[test]
fn test_warning_msg_all_error_types() {
    let action = Action::NewOrder;

    let errors = [
        ServiceError::EnvVarError("env error".to_string()),
        ServiceError::EncryptionError("encryption error".to_string()),
        ServiceError::DecryptionError("decryption error".to_string()),
        ServiceError::IOError("io error".to_string()),
        ServiceError::UnexpectedError("unexpected error".to_string()),
        ServiceError::LnNodeError("ln node error".to_string()),
        ServiceError::LnPaymentError("ln payment error".to_string()),
        ServiceError::DbAccessError("db access error".to_string()),
        ServiceError::NostrError("nostr error".to_string()),
        ServiceError::HoldInvoiceError("hold invoice error".to_string()),
        // Exercises the catch-all "No message" branch.
        ServiceError::MessageSerializationError,
    ];

    for err in errors {
        warning_msg(&action, err);
    }
}
/// Smoke test: the cant-do branch of `manage_errors` completes without
/// panicking.
#[tokio::test]
async fn test_manage_errors_cant_do() {
    let action = Action::NewOrder;

    manage_errors(
        MostroError::MostroCantDo(CantDoReason::InvalidSignature),
        create_test_message(Action::NewOrder, None),
        create_test_unwrapped_message(),
        &action,
    )
    .await;
}
#[tokio::test]
async fn test_manage_errors_internal_error() {
let message = create_test_message(Action::NewOrder, None);
let event = create_test_unwrapped_message();
let action = Action::NewOrder;
let error =
MostroError::MostroInternalErr(ServiceError::UnexpectedError("test error".to_string()));
manage_errors(error, message, event, &action).await;
}
/// Tests for `check_trade_index`, run against an in-memory SQLite pool.
mod check_trade_index_tests {
use super::*;
use crate::app::context::test_utils::{test_settings, TestContextBuilder};
use sqlx::SqlitePool;
use std::sync::Arc;
/// Builds an `AppContext` over an in-memory SQLite pool with the test settings.
///
/// No migrations are applied to this pool.
/// NOTE(review): the connection string here is ":memory:" while
/// `handle_message_action_tests` uses "sqlite::memory:" — confirm both forms
/// are intended.
async fn create_test_ctx() -> AppContext {
let pool = Arc::new(SqlitePool::connect(":memory:").await.unwrap());
TestContextBuilder::new()
.with_pool(pool)
.with_settings(test_settings())
.build()
}
/// `FiatSent` is not one of the actions `check_trade_index` validates, so the
/// call must return `Ok`.
#[tokio::test]
async fn test_check_trade_index_non_trading_action() {
let ctx = create_test_ctx().await;
let event = create_test_unwrapped_message();
let message = create_test_message(Action::FiatSent, None);
let result = check_trade_index(&ctx, &event, &message).await;
assert!(result.is_ok());
}
/// A `NewOrder` message that carries no trade index is expected to pass.
#[tokio::test]
async fn test_check_trade_index_trading_action_no_index() {
let ctx = create_test_ctx().await;
let event = create_test_unwrapped_message();
let message = create_test_message(Action::NewOrder, None);
let result = check_trade_index(&ctx, &event, &message).await;
assert!(result.is_ok());
}
/// Calls `check_trade_index` with a `NewOrder` message carrying trade index 1.
///
/// NOTE(review): the assertion below is a tautology (`is_ok() || is_err()` is
/// always true), so this test only proves the call does not panic. Replace it
/// with the concrete expected outcome once the behaviour against an
/// un-migrated database is decided.
#[tokio::test]
async fn test_check_trade_index_with_valid_index() {
let ctx = create_test_ctx().await;
let event = create_test_unwrapped_message();
let message = create_test_message(Action::NewOrder, Some(1));
let result = check_trade_index(&ctx, &event, &message).await;
assert!(result.is_ok() || result.is_err());
}
}
mod handle_message_action_tests {
use super::*;
use crate::app::context::test_utils::{test_settings, TestContextBuilder};
use sqlx::SqlitePool;
use std::sync::Arc;
fn create_restore_session_message() -> Message {
Message::new_restore(None)
}
#[tokio::test]
async fn routes_last_trade_index_to_handler_and_propagates_error() {
let pool = Arc::new(SqlitePool::connect("sqlite::memory:").await.unwrap());
sqlx::migrate!("./migrations")
.run(pool.as_ref())
.await
.unwrap();
let ctx = TestContextBuilder::new()
.with_pool(pool)
.with_settings(test_settings())
.build();
let my_keys = create_test_keys();
let event = create_test_unwrapped_message();
let msg = create_test_message(Action::LastTradeIndex, None);
let result =
handle_message_action_no_ln(&Action::LastTradeIndex, msg, &event, &my_keys, &ctx)
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn routes_restore_session_to_handler_and_returns_ok() {
let pool = Arc::new(SqlitePool::connect("sqlite::memory:").await.unwrap());
sqlx::migrate!("./migrations")
.run(pool.as_ref())
.await
.unwrap();
let ctx = TestContextBuilder::new()
.with_pool(pool)
.with_settings(test_settings())
.build();
let my_keys = create_test_keys();
let event = create_test_unwrapped_message();
let msg = create_restore_session_message();
let result =
handle_message_action_no_ln(&Action::RestoreSession, msg, &event, &my_keys, &ctx)
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn routes_orders_to_handler_and_propagates_error() {
let pool = Arc::new(SqlitePool::connect("sqlite::memory:").await.unwrap());
sqlx::migrate!("./migrations")
.run(pool.as_ref())
.await
.unwrap();
let ctx = TestContextBuilder::new()
.with_pool(pool)
.with_settings(test_settings())
.build();
let my_keys = create_test_keys();
let event = create_test_unwrapped_message();
let msg = create_test_message(Action::Orders, None);
let result =
handle_message_action_no_ln(&Action::Orders, msg, &event, &my_keys, &ctx).await;
assert!(result.is_err());
}
#[tokio::test]
async fn routes_payinvoice_to_typed_invalid_action_error() {
let pool = Arc::new(SqlitePool::connect("sqlite::memory:").await.unwrap());
sqlx::migrate!("./migrations")
.run(pool.as_ref())
.await
.unwrap();
let ctx = TestContextBuilder::new()
.with_pool(pool)
.with_settings(test_settings())
.build();
let my_keys = create_test_keys();
let event = create_test_unwrapped_message();
let msg = create_test_message(Action::PayInvoice, None);
let result =
handle_message_action_no_ln(&Action::PayInvoice, msg, &event, &my_keys, &ctx).await;
assert!(matches!(
result,
Err(e)
if e.downcast_ref::<MostroError>()
== Some(&MostroError::MostroCantDo(CantDoReason::InvalidAction))
));
}
}
/// Stand-alone checks that mirror the validation conditions used in `run`.
///
/// NOTE(review): none of these tests call production code; they assert on
/// locally built values only, so they cannot detect a regression in `run`.
mod message_validation_tests {
use super::*;
/// Compares public keys of two generated key pairs.
///
/// NOTE(review): the first assertion compares `keys.public_key()` with itself
/// and is therefore always true.
#[test]
fn test_signature_verification_logic() {
let keys = create_test_keys();
let sender_keys = create_test_keys();
let sender_matches_rumor = keys.public_key() == keys.public_key();
assert!(sender_matches_rumor);
let sender_differs = sender_keys.public_key() != keys.public_key();
assert!(sender_differs);
}
/// Re-computes the 10-second freshness cutoff the same way `run` does and
/// checks that a timestamp 20 s old falls before it while one 5 s old does not.
#[test]
fn test_timestamp_validation() {
let current_time = chrono::Utc::now().timestamp() as u64;
let old_time = current_time - 20; let recent_time = current_time - 5;
let since_time = chrono::Utc::now()
.checked_sub_signed(chrono::Duration::seconds(10))
.unwrap()
.timestamp() as u64;
assert!(old_time < since_time);
assert!(recent_time >= since_time);
}
/// NOTE(review): asserts on two boolean constants; no proof-of-work check is
/// actually executed.
#[test]
fn test_pow_verification_logic() {
let meets_pow = true; let fails_pow = false;
assert!(meets_pow);
assert!(!fails_pow);
}
}
/// Structural checks around event kinds and message parsing.
mod event_processing_tests {
use super::*;
/// Matches a `GiftWrap` kind against itself.
///
/// NOTE(review): `kind` is a local constant, so the `_` arm can never run and
/// the test asserts nothing about production code.
#[test]
fn test_gift_wrap_processing_structure() {
let kind = NostrKind::GiftWrap;
match kind {
NostrKind::GiftWrap => {
// Expected kind; nothing to check.
}
_ => unreachable!("Only GiftWrap events are considered in this test scope"),
}
}
/// Parses a JSON `(Message, Option<Signature>)` tuple with a `null` signature.
///
/// NOTE(review): a parse failure is silently accepted in the `Err` arm and the
/// `if let Message::Order(_)` has an empty body, so this test passes even when
/// deserialization is broken. Consider asserting that parsing succeeds and
/// that the variant is `Message::Order`.
#[test]
fn test_message_parsing_structure() {
let test_content = r#"[{"order":{"version":1,"request_id":1,"trade_index":null,"id":"550e8400-e29b-41d4-a716-446655440000","action":"new-order","payload":null}}, null]"#;
let result = serde_json::from_str::<(Message, Option<Signature>)>(test_content);
match result {
Ok((message, signature)) => {
assert!(signature.is_none());
if let Message::Order(_) = message {}
}
Err(_) => {
// Parse errors are currently tolerated here.
}
}
}
}
}