use crate::config::settings::Settings;
use mostro_core::order::Kind as OrderKind;
use mostro_core::prelude::*;
use nostr_sdk::prelude::*;
use sqlx::pool::Pool;
use sqlx::sqlite::SqliteRow;
use sqlx::{Row, Sqlite, SqlitePool};
use std::fs::{set_permissions, Permissions};
use std::path::Path;
use std::sync::Arc;
use uuid::Uuid;
// Comma-separated, single-quoted SQL list of order statuses. It is interpolated
// into a `status NOT IN (...)` filter when looking up a user's orders by master key.
const EXCLUDED_ORDER_STATUSES: &str = "'expired','success','canceled','dispute','canceledbyadmin','completedbyadmin','settledbyadmin','cooperativelycanceled'";
// Comma-separated, single-quoted SQL list of dispute statuses. It is interpolated
// into a `d.status IN (...)` filter when looking up a user's disputes by master key.
const ACTIVE_DISPUTE_STATUSES: &str = "'initiated','in-progress'";
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
/// Rebuilds the `disputes` table without the deprecated token columns.
///
/// Used when `ALTER TABLE ... DROP COLUMN` is unavailable or failed: a
/// `disputes_temp` table with the reduced schema is created, the surviving
/// columns are copied over, the original table is dropped and the temporary
/// table is renamed to `disputes`.
///
/// All steps run inside one transaction, so a failure part-way through (for
/// example after the original table has been dropped) cannot leave the
/// database without a `disputes` table: the transaction is rolled back when
/// it is dropped without being committed.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` naming the step
/// that failed.
async fn rebuild_disputes_table_without_tokens(pool: &SqlitePool) -> Result<(), MostroError> {
    tracing::info!("Rebuilding disputes table without token columns (SQLite compatibility mode)");

    let mut tx = pool.begin().await.map_err(|e| {
        MostroInternalErr(ServiceError::DbAccessError(format!(
            "Failed to begin transaction: {}",
            e
        )))
    })?;

    // Step 1: create the replacement table with the reduced schema.
    sqlx::query(
        r#"
CREATE TABLE IF NOT EXISTS disputes_temp (
id char(36) primary key not null,
order_id char(36) unique not null,
status varchar(10) not null,
order_previous_status varchar(10) not null,
solver_pubkey char(64),
created_at integer not null,
taken_at integer default 0
)
"#,
    )
    .execute(&mut *tx)
    .await
    .map_err(|e| {
        MostroInternalErr(ServiceError::DbAccessError(format!(
            "Failed to create temporary disputes table: {}",
            e
        )))
    })?;

    // Step 2: copy every column that survives the migration.
    sqlx::query(
        r#"
INSERT INTO disputes_temp (id, order_id, status, order_previous_status, solver_pubkey, created_at, taken_at)
SELECT id, order_id, status, order_previous_status, solver_pubkey, created_at, taken_at
FROM disputes
"#,
    )
    .execute(&mut *tx)
    .await
    .map_err(|e| {
        MostroInternalErr(ServiceError::DbAccessError(format!(
            "Failed to copy data to temporary table: {}",
            e
        )))
    })?;

    // Step 3: drop the original table.
    sqlx::query("DROP TABLE disputes")
        .execute(&mut *tx)
        .await
        .map_err(|e| {
            MostroInternalErr(ServiceError::DbAccessError(format!(
                "Failed to drop original disputes table: {}",
                e
            )))
        })?;

    // Step 4: move the replacement table into place.
    sqlx::query("ALTER TABLE disputes_temp RENAME TO disputes")
        .execute(&mut *tx)
        .await
        .map_err(|e| {
            MostroInternalErr(ServiceError::DbAccessError(format!(
                "Failed to rename temporary table: {}",
                e
            )))
        })?;

    tx.commit().await.map_err(|e| {
        MostroInternalErr(ServiceError::DbAccessError(format!(
            "Failed to commit transaction: {}",
            e
        )))
    })?;

    tracing::info!("Successfully rebuilt disputes table without token columns");
    Ok(())
}
async fn migrate_remove_token_columns(pool: &SqlitePool) -> Result<(), MostroError> {
let buyer_token_exists = sqlx::query_scalar::<_, i32>(
r#"
SELECT COUNT(*)
FROM pragma_table_info('disputes')
WHERE name = 'buyer_token'
"#,
)
.fetch_one(pool)
.await
.map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?
> 0;
let seller_token_exists = sqlx::query_scalar::<_, i32>(
r#"
SELECT COUNT(*)
FROM pragma_table_info('disputes')
WHERE name = 'seller_token'
"#,
)
.fetch_one(pool)
.await
.map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?
> 0;
if !buyer_token_exists && !seller_token_exists {
tracing::debug!(
"No deprecated token columns found in disputes table - migration not needed"
);
return Ok(());
}
let sqlite_version = sqlx::query_scalar::<_, String>("SELECT sqlite_version()")
.fetch_one(pool)
.await
.map_err(|e| {
MostroInternalErr(ServiceError::DbAccessError(format!(
"Failed to get SQLite version: {}",
e
)))
})?;
tracing::info!("SQLite version: {}", sqlite_version);
let supports_drop_column = sqlite_version
.split('.')
.take(3)
.map(|v| v.parse::<u32>().unwrap_or(0))
.collect::<Vec<_>>()
.get(..3)
.map(|parts| {
let major = parts[0];
let minor = parts.get(1).copied().unwrap_or(0);
major > 3 || (major == 3 && minor >= 35)
})
.unwrap_or(false);
if supports_drop_column {
tracing::info!(
"Attempting to remove token columns using DROP COLUMN (SQLite {})...",
sqlite_version
);
let mut transaction = pool.begin().await.map_err(|e| {
MostroInternalErr(ServiceError::DbAccessError(format!(
"Failed to begin transaction: {}",
e
)))
})?;
let drop_result = async {
if buyer_token_exists {
sqlx::query("ALTER TABLE disputes DROP COLUMN buyer_token")
.execute(&mut *transaction)
.await?;
tracing::info!("Dropped buyer_token column");
}
if seller_token_exists {
sqlx::query("ALTER TABLE disputes DROP COLUMN seller_token")
.execute(&mut *transaction)
.await?;
tracing::info!("Dropped seller_token column");
}
Ok::<(), sqlx::Error>(())
}
.await;
match drop_result {
Ok(_) => {
transaction.commit().await.map_err(|e| {
MostroInternalErr(ServiceError::DbAccessError(format!(
"Failed to commit transaction: {}",
e
)))
})?;
tracing::info!("Successfully removed token columns using DROP COLUMN");
Ok(())
}
Err(e) => {
tracing::warn!("DROP COLUMN failed ({}), falling back to table rebuild", e);
transaction.rollback().await.map_err(|rollback_err| {
MostroInternalErr(ServiceError::DbAccessError(format!(
"Failed to rollback transaction: {}",
rollback_err
)))
})?;
rebuild_disputes_table_without_tokens(pool).await
}
}
} else {
tracing::info!(
"SQLite version {} doesn't support DROP COLUMN, using table rebuild method",
sqlite_version
);
rebuild_disputes_table_without_tokens(pool).await
}
}
/// Reports whether `table_name` has a column called `column_name`.
///
/// The check counts matching rows of `pragma_table_info` for the table.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the query fails.
async fn table_column_exists(
    pool: &SqlitePool,
    table_name: &str,
    column_name: &str,
) -> Result<bool, MostroError> {
    let matching_columns = sqlx::query_scalar::<_, i32>(
        r#"
SELECT COUNT(*)
FROM pragma_table_info(?1)
WHERE name = ?2
"#,
    )
    .bind(table_name)
    .bind(column_name)
    .fetch_one(pool)
    .await
    .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))?;

    Ok(matching_columns > 0)
}
/// Extracts the column name from a migration error whose message contains
/// `duplicate column name: <name>`.
///
/// Returns `None` when the marker text does not appear in the error message.
fn parse_duplicate_column_name(err: &sqlx::migrate::MigrateError) -> Option<String> {
    const MARKER: &str = "duplicate column name: ";
    let message = err.to_string();
    // The piece right after the first marker occurrence is the column name.
    message
        .split(MARKER)
        .nth(1)
        .map(|after_marker| after_marker.trim().to_string())
}
/// Strips surrounding whitespace, trailing commas and identifier quoting
/// (`"`, `` ` ``, `[`, `]`) from a single SQL token.
///
/// The quote characters are removed one kind at a time, in that order, from
/// both ends of the token.
fn normalize_sql_identifier(token: &str) -> String {
    let mut identifier = token.trim().trim_end_matches(',');
    for &quote in &['"', '`', '[', ']'] {
        identifier = identifier.trim_matches(quote);
    }
    identifier.to_string()
}
/// Removes every line that starts with `--` (after leading whitespace) from
/// `sql` and joins the remaining lines with `\n`.
///
/// Trailing `--` comments that follow SQL on the same line are kept.
fn strip_sql_comments(sql: &str) -> String {
    let mut kept_lines: Vec<&str> = Vec::new();
    for line in sql.lines() {
        if line.trim_start().starts_with("--") {
            continue;
        }
        kept_lines.push(line);
    }
    kept_lines.join("\n")
}
/// Parses a migration script that consists solely of
/// `ALTER TABLE <table> ADD COLUMN <column> ...` statements.
///
/// Returns the `(table, column)` pair of every statement, or `None` when the
/// script (after removing `--` comment lines) contains no statement, contains
/// any statement of another shape, or yields an empty table/column name.
fn parse_add_column_statements(sql: &str) -> Option<Vec<(String, String)>> {
    let cleaned = strip_sql_comments(sql);
    let mut operations = Vec::new();

    let statements = cleaned
        .split(';')
        .map(str::trim)
        .filter(|statement| !statement.is_empty());
    for statement in statements {
        let words: Vec<&str> = statement.split_whitespace().collect();
        // At least six tokens are required: ALTER TABLE <table> ADD COLUMN <column>.
        let [alter, table, table_token, add, column, column_token, ..] = words.as_slice() else {
            return None;
        };
        let is_add_column = alter.eq_ignore_ascii_case("ALTER")
            && table.eq_ignore_ascii_case("TABLE")
            && add.eq_ignore_ascii_case("ADD")
            && column.eq_ignore_ascii_case("COLUMN");
        if !is_add_column {
            return None;
        }

        let table_name = normalize_sql_identifier(table_token);
        let column_name = normalize_sql_identifier(column_token);
        if table_name.is_empty() || column_name.is_empty() {
            return None;
        }
        operations.push((table_name, column_name));
    }

    (!operations.is_empty()).then_some(operations)
}
/// Lists the versions recorded in `_sqlx_migrations`, in ascending order.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the query fails.
async fn applied_migration_versions(pool: &SqlitePool) -> Result<Vec<i64>, MostroError> {
    let lookup =
        sqlx::query_scalar::<_, i64>("SELECT version FROM _sqlx_migrations ORDER BY version")
            .fetch_all(pool)
            .await;

    match lookup {
        Ok(versions) => Ok(versions),
        Err(err) => Err(MostroInternalErr(ServiceError::DbAccessError(
            err.to_string(),
        ))),
    }
}
/// Marks a pending add-column migration as applied when the columns it would
/// add are already present in the database.
///
/// Walks the migrator's migrations that are not yet recorded in
/// `_sqlx_migrations`, looking for the first one that consists only of
/// `ALTER TABLE ... ADD COLUMN` statements, mentions `duplicate_column`, and
/// whose columns all exist already. That migration is inserted into
/// `_sqlx_migrations` as successful.
///
/// Returns `Ok(true)` when such a migration was recorded and `Ok(false)`
/// when none qualified.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if any database
/// query fails.
async fn reconcile_existing_add_column_migration(
    pool: &SqlitePool,
    migrator: &sqlx::migrate::Migrator,
    duplicate_column: &str,
) -> Result<bool, MostroError> {
    let already_applied = applied_migration_versions(pool).await?;

    for migration in migrator.iter() {
        if already_applied.contains(&migration.version) {
            continue;
        }

        // Only pure add-column migrations can be reconciled.
        let operations = match parse_add_column_statements(&migration.sql) {
            Some(parsed) => parsed,
            None => continue,
        };

        let mentions_duplicate = operations
            .iter()
            .any(|(_, column)| column == duplicate_column);
        if !mentions_duplicate {
            continue;
        }

        // Every column of the migration must already exist; stop at the first gap.
        let mut missing_column = false;
        for (table_name, column_name) in &operations {
            if !table_column_exists(pool, table_name, column_name).await? {
                missing_column = true;
                break;
            }
        }
        if missing_column {
            continue;
        }

        sqlx::query(
            r#"
INSERT OR IGNORE INTO _sqlx_migrations (
version,
description,
success,
checksum,
execution_time
) VALUES (?1, ?2, TRUE, ?3, 0)
"#,
        )
        .bind(migration.version)
        .bind(&*migration.description)
        .bind(&*migration.checksum)
        .execute(pool)
        .await
        .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))?;

        tracing::warn!(
            version = migration.version,
            description = %migration.description,
            duplicate_column,
            "Recorded existing add-column migration as already applied"
        );
        return Ok(true);
    }

    Ok(false)
}
/// Opens the SQLite connection pool for the configured database URL.
///
/// If the database file does not exist it is created (restricted to mode
/// 0o600 on Unix), the embedded migrations are run and the deprecated token
/// columns are removed; if the migrations or the token-column step fail, the
/// newly created file is deleted again.
///
/// If the file already exists the migrations are run against it. A migration
/// failure caused by a "duplicate column name" error is reconciled once via
/// `reconcile_existing_add_column_migration` and the migrations are retried.
/// The deprecated token columns are removed afterwards.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` when file
/// creation, the connection, the migrations or the token-column removal fail.
pub async fn connect() -> Result<Arc<Pool<Sqlite>>, MostroError> {
let db_settings = Settings::get_db();
let db_url = &db_settings.url;
// Filesystem path of the database: the URL with every "sqlite://" occurrence removed.
let tmp = db_url.replace("sqlite://", "");
let db_path = Path::new(&tmp);
let conn = if !db_path.exists() {
// No database file yet: create it (create_new fails if it appears in the meantime).
let _file = std::fs::File::create_new(db_path)
.map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
#[cfg(unix)]
{
// Restrict the new file to owner read/write before connecting.
set_permissions(db_path, Permissions::from_mode(0o600))
.map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
}
match SqlitePool::connect(db_url).await {
Ok(pool) => {
match sqlx::migrate!().run(&pool).await {
Ok(_) => {
tracing::info!(
"Successfully created database file at {}",
db_path.display(),
);
if let Err(e) = migrate_remove_token_columns(&pool).await {
tracing::error!("Failed to migrate token columns: {}", e);
// Remove the freshly created file; a cleanup failure is only logged.
if let Err(cleanup_err) = std::fs::remove_file(db_path) {
tracing::error!(
error = %cleanup_err,
path = %db_path.display(),
"Failed to clean up database file"
);
}
return Err(e);
}
pool
}
Err(e) => {
// Migrations failed on the new file: delete it and report the migration error.
if let Err(cleanup_err) = std::fs::remove_file(db_path) {
tracing::error!(
error = %cleanup_err,
path = %db_path.display(),
"Failed to clean up database file"
);
}
return Err(MostroInternalErr(ServiceError::DbAccessError(
e.to_string(),
)));
}
}
}
Err(e) => {
// Note: the file created above is not removed on this path.
tracing::error!(
error = %e,
path = %db_path.display(),
"Failed to create database connection"
);
return Err(MostroInternalErr(ServiceError::DbAccessError(
e.to_string(),
)));
}
}
} else {
// Existing database file: connect and bring the schema up to date.
let conn = SqlitePool::connect(db_url)
.await
.map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;
let migrator = sqlx::migrate!();
if let Err(e) = migrator.run(&conn).await {
// A "duplicate column name" failure may mean the column was added outside the
// migration bookkeeping; try to record that migration as applied and retry once.
if let Some(duplicate_column) = parse_duplicate_column_name(&e) {
if reconcile_existing_add_column_migration(&conn, &migrator, &duplicate_column)
.await?
{
if let Err(e) = migrator.run(&conn).await {
tracing::error!("Failed to run migrations on existing database: {}", e);
return Err(MostroInternalErr(ServiceError::DbAccessError(
e.to_string(),
)));
}
} else {
// Nothing could be reconciled: report the original migration error.
tracing::error!("Failed to run migrations on existing database: {}", e);
return Err(MostroInternalErr(ServiceError::DbAccessError(
e.to_string(),
)));
}
} else {
// Any other migration error is reported unchanged.
tracing::error!("Failed to run migrations on existing database: {}", e);
return Err(MostroInternalErr(ServiceError::DbAccessError(
e.to_string(),
)));
}
}
if let Err(e) = migrate_remove_token_columns(&conn).await {
tracing::error!(
"Failed to migrate token columns on existing database: {}",
e
);
return Err(e);
}
conn
};
Ok(Arc::new(conn))
}
/// Returns the `admin_password` of the first user flagged `is_admin`.
///
/// Yields `Ok(None)` when there is no admin user or the admin has no
/// password stored.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` with a fixed
/// message if the query fails.
pub async fn get_admin_password(pool: &SqlitePool) -> Result<Option<String>, MostroError> {
    let admin = sqlx::query_as::<_, User>(
        r#"
SELECT *
FROM users
WHERE is_admin == 1
LIMIT 1
"#,
    )
    .fetch_optional(pool)
    .await
    .map_err(|_| {
        MostroInternalErr(ServiceError::DbAccessError(
            "Failed to get admin password".to_string(),
        ))
    })?;

    Ok(admin.and_then(|user| user.admin_password))
}
/// Clears the taker-side pubkeys of `order` and returns the refreshed row.
///
/// For a buy order the `seller_pubkey` / `master_seller_pubkey` columns are
/// set to NULL; for a sell order the `buyer_pubkey` / `master_buyer_pubkey`
/// columns are.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` when the order
/// kind cannot be determined, when no row was updated, or when a query fails.
pub async fn edit_pubkeys_order(pool: &SqlitePool, order: &Order) -> Result<Order, MostroError> {
    // Pick the trade-key column and its matching master-key column in one step.
    let (trade_key_column, master_key_column) = match order.get_order_kind() {
        Ok(OrderKind::Buy) => ("seller_pubkey", "master_seller_pubkey"),
        Ok(OrderKind::Sell) => ("buyer_pubkey", "master_buyer_pubkey"),
        Err(_) => {
            return Err(MostroInternalErr(ServiceError::DbAccessError(
                "Order kind not found".to_string(),
            )))
        }
    };

    let update_sql = format!(
        "UPDATE orders SET {} = ?1, {} = ?2 WHERE id = ?3",
        trade_key_column, master_key_column
    );
    let update = sqlx::query(&update_sql)
        .bind(None::<String>)
        .bind(None::<String>)
        .bind(order.id)
        .execute(pool)
        .await
        .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))?;
    if update.rows_affected() == 0 {
        return Err(MostroInternalErr(ServiceError::DbAccessError(
            "No order updated".to_string(),
        )));
    }

    // Re-read the row so the caller gets the stored state.
    sqlx::query_as::<_, Order>(
        r#"
SELECT *
FROM orders
WHERE id = ?1
"#,
    )
    .bind(order.id)
    .fetch_one(pool)
    .await
    .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))
}
/// Fetches the order whose `hash` column equals `hash`.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the query
/// fails, including when `fetch_one` finds no row.
pub async fn find_order_by_hash(pool: &SqlitePool, hash: &str) -> Result<Order, MostroError> {
    sqlx::query_as::<_, Order>(
        r#"
SELECT *
FROM orders
WHERE hash = ?1
"#,
    )
    .bind(hash)
    .fetch_one(pool)
    .await
    .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))
}
/// Lists orders in status `pending` or `waiting-taker-bond` whose
/// `expires_at` is earlier than the current time.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the query fails.
pub async fn find_order_by_date(pool: &SqlitePool) -> Result<Vec<Order>, MostroError> {
    let now_secs = Timestamp::now().as_secs() as i64;

    sqlx::query_as::<_, Order>(
        r#"
SELECT *
FROM orders
WHERE expires_at < ?1
AND status IN ('pending', 'waiting-taker-bond')
"#,
    )
    .bind(now_secs)
    .fetch_all(pool)
    .await
    .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))
}
/// Lists orders in status `waiting-buyer-invoice` or `waiting-payment` whose
/// `taken_at` is older than the configured `expiration_seconds`.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the query fails.
pub async fn find_order_by_seconds(pool: &SqlitePool) -> Result<Vec<Order>, MostroError> {
    let expiration_window = Settings::get_mostro().expiration_seconds as u64;
    // Orders taken before this instant have exceeded the window.
    let cutoff = Timestamp::now() - expiration_window;

    sqlx::query_as::<_, Order>(
        r#"
SELECT *
FROM orders
WHERE taken_at < ?1 AND ( status == 'waiting-buyer-invoice' OR status == 'waiting-payment' )
"#,
    )
    .bind(cutoff.as_secs() as i64)
    .fetch_all(pool)
    .await
    .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))
}
/// Fetches the dispute attached to the order `order_id`.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the query
/// fails, including when `fetch_one` finds no row.
pub async fn find_dispute_by_order_id(
    pool: &SqlitePool,
    order_id: Uuid,
) -> Result<Dispute, MostroError> {
    let lookup = sqlx::query_as::<_, Dispute>(
        r#"
SELECT *
FROM disputes
WHERE order_id == ?1
"#,
    )
    .bind(order_id)
    .fetch_one(pool)
    .await;

    match lookup {
        Ok(dispute) => Ok(dispute),
        Err(err) => Err(MostroInternalErr(ServiceError::DbAccessError(
            err.to_string(),
        ))),
    }
}
/// Puts an order back into its initial `Pending` state.
///
/// Sets `status` to pending, stores the supplied `amount`, `fee` and
/// `dev_fee`, clears `hash`, `preimage` and `buyer_invoice`, and resets
/// `taken_at` and `invoice_held_at` to 0.
///
/// Returns `Ok(true)` when at least one row was updated.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the update fails.
pub async fn update_order_to_initial_state(
    pool: &SqlitePool,
    order_id: Uuid,
    amount: i64,
    fee: i64,
    dev_fee: i64,
) -> Result<bool, MostroError> {
    let pending_status = Status::Pending.to_string();
    // NULL placeholders for the columns that are cleared.
    let cleared_hash = None::<String>;
    let cleared_preimage = None::<String>;
    let cleared_invoice = None::<String>;

    let update = sqlx::query!(
        r#"
UPDATE orders
SET
status = ?1,
amount = ?2,
fee = ?3,
dev_fee = ?4,
hash = ?5,
preimage = ?6,
buyer_invoice = ?7,
taken_at = ?8,
invoice_held_at = ?9
WHERE id = ?10
"#,
        pending_status,
        amount,
        fee,
        dev_fee,
        cleared_hash,
        cleared_preimage,
        cleared_invoice,
        0,
        0,
        order_id,
    )
    .execute(pool)
    .await
    .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))?;

    Ok(update.rows_affected() > 0)
}
/// Resets the `taken_at` column of the order `order_id` to 0.
///
/// Returns `Ok(true)` when at least one row was updated.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the update fails.
pub async fn reset_order_taken_at_time(
    pool: &SqlitePool,
    order_id: Uuid,
) -> Result<bool, MostroError> {
    let cleared_taken_at = 0;
    let outcome = sqlx::query!(
        r#"
UPDATE orders
SET
taken_at = ?1
WHERE id = ?2
"#,
        cleared_taken_at,
        order_id,
    )
    .execute(pool)
    .await;

    match outcome {
        Ok(done) => Ok(done.rows_affected() > 0),
        Err(err) => Err(MostroInternalErr(ServiceError::DbAccessError(
            err.to_string(),
        ))),
    }
}
/// Stores `invoice_held_at` on the order `order_id`.
///
/// Returns `Ok(true)` when at least one row was updated.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the update fails.
pub async fn update_order_invoice_held_at_time(
    pool: &SqlitePool,
    order_id: Uuid,
    invoice_held_at: i64,
) -> Result<bool, MostroError> {
    let outcome = sqlx::query!(
        r#"
UPDATE orders
SET
invoice_held_at = ?1
WHERE id = ?2
"#,
        invoice_held_at,
        order_id,
    )
    .execute(pool)
    .await;

    match outcome {
        Ok(done) => Ok(done.rows_affected() > 0),
        Err(err) => Err(MostroInternalErr(ServiceError::DbAccessError(
            err.to_string(),
        ))),
    }
}
/// Lists `active` orders whose `invoice_held_at` is non-zero.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the query fails.
pub async fn find_held_invoices(pool: &SqlitePool) -> Result<Vec<Order>, MostroError> {
    sqlx::query_as::<_, Order>(
        r#"
SELECT *
FROM orders
WHERE invoice_held_at !=0 AND status == 'active'
"#,
    )
    .fetch_all(pool)
    .await
    .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))
}
/// Lists orders in status `settled-hold-invoice` that are flagged
/// `failed_payment`.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the query fails.
pub async fn find_failed_payment(pool: &SqlitePool) -> Result<Vec<Order>, MostroError> {
    let lookup = sqlx::query_as::<_, Order>(
        r#"
SELECT *
FROM orders
WHERE failed_payment == true AND status == 'settled-hold-invoice'
"#,
    )
    .fetch_all(pool)
    .await;

    match lookup {
        Ok(orders) => Ok(orders),
        Err(err) => Err(MostroInternalErr(ServiceError::DbAccessError(
            err.to_string(),
        ))),
    }
}
/// Lists orders that still owe a development fee.
///
/// An order qualifies when its status is `settled-hold-invoice` or `success`,
/// `dev_fee` is positive, `dev_fee_paid` is 0 and `dev_fee_payment_hash` is
/// NULL or empty.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the query fails.
pub async fn find_unpaid_dev_fees(pool: &SqlitePool) -> Result<Vec<Order>, MostroError> {
    sqlx::query_as::<_, Order>(
        r#"
SELECT *
FROM orders
WHERE (status = 'settled-hold-invoice' OR status = 'success')
AND dev_fee > 0
AND dev_fee_paid = 0
AND (dev_fee_payment_hash IS NULL OR dev_fee_payment_hash = '')
"#,
    )
    .fetch_all(pool)
    .await
    .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))
}
/// Fetches the user with pubkey `solver_npub` that is flagged `is_solver`.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the query
/// fails, including when `fetch_one` finds no row.
pub async fn find_solver_pubkey(
    pool: &SqlitePool,
    solver_npub: String,
) -> Result<User, MostroError> {
    sqlx::query_as::<_, User>(
        r#"
SELECT *
FROM users
WHERE pubkey == ?1 AND is_solver == true
LIMIT 1
"#,
    )
    .bind(solver_npub)
    .fetch_one(pool)
    .await
    .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))
}
/// Fetches the user whose pubkey equals `public_key`.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the query
/// fails, including when `fetch_one` finds no row.
pub async fn is_user_present(pool: &SqlitePool, public_key: String) -> Result<User, MostroError> {
    let lookup = sqlx::query_as::<_, User>(
        r#"
SELECT *
FROM users
WHERE pubkey == ?1
LIMIT 1
"#,
    )
    .bind(public_key)
    .fetch_one(pool)
    .await;

    match lookup {
        Ok(user) => Ok(user),
        Err(err) => Err(MostroInternalErr(ServiceError::DbAccessError(
            err.to_string(),
        ))),
    }
}
/// Inserts `new_user` into the `users` table, stamping `created_at` with the
/// current time, and returns the user's pubkey.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the insert fails.
pub async fn add_new_user(pool: &SqlitePool, new_user: User) -> Result<String, MostroError> {
    let created_at_secs = Timestamp::now().as_secs() as i64;
    // The pubkey is returned at the end, so the statement gets its own copy.
    let pubkey_for_insert = new_user.pubkey.clone();

    sqlx::query(
        "
INSERT INTO users (pubkey, is_admin,admin_password, is_solver, is_banned, category, last_trade_index, total_reviews, total_rating, last_rating, max_rating, min_rating, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
",
    )
    .bind(pubkey_for_insert)
    .bind(new_user.is_admin)
    .bind(new_user.admin_password)
    .bind(new_user.is_solver)
    .bind(new_user.is_banned)
    .bind(new_user.category)
    .bind(new_user.last_trade_index)
    .bind(new_user.total_reviews)
    .bind(new_user.total_rating)
    .bind(new_user.last_rating)
    .bind(new_user.max_rating)
    .bind(new_user.min_rating)
    .bind(created_at_secs)
    .execute(pool)
    .await
    .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))?;

    Ok(new_user.pubkey)
}
/// Stores `trade_index` as the `last_trade_index` of the user `public_key`.
///
/// Returns `Ok(true)` when at least one row was updated.
///
/// # Errors
/// * `MostroCantDo(CantDoReason::InvalidPubkey)` if `public_key` is not
///   exactly 64 ASCII hex digits.
/// * `MostroCantDo(CantDoReason::InvalidTradeIndex)` if `trade_index` is negative.
/// * `MostroInternalErr(ServiceError::DbAccessError)` if the update fails.
pub async fn update_user_trade_index(
    pool: &SqlitePool,
    public_key: String,
    trade_index: i64,
) -> Result<bool, MostroError> {
    let pubkey_is_valid =
        public_key.len() == 64 && public_key.chars().all(|c| c.is_ascii_hexdigit());
    if !pubkey_is_valid {
        return Err(MostroCantDo(CantDoReason::InvalidPubkey));
    }
    if trade_index < 0 {
        return Err(MostroCantDo(CantDoReason::InvalidTradeIndex));
    }

    let outcome = sqlx::query!(
        r#"
UPDATE users SET last_trade_index = ?1 WHERE pubkey = ?2
"#,
        trade_index,
        public_key,
    )
    .execute(pool)
    .await
    .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))?;

    Ok(outcome.rows_affected() > 0)
}
/// Reports whether an order in status `waiting-buyer-invoice` exists whose
/// `master_buyer_pubkey` equals `pubkey`.
///
/// # Errors
/// Propagates the errors of `has_pending_order_with_status`.
pub async fn buyer_has_pending_order(
    pool: &SqlitePool,
    pubkey: String,
) -> Result<bool, MostroError> {
    let key_column = "master_buyer_pubkey";
    let blocking_status = "waiting-buyer-invoice";
    has_pending_order_with_status(pool, pubkey, key_column, blocking_status).await
}
/// Reports whether an order in status `waiting-payment` exists whose
/// `master_seller_pubkey` equals `pubkey`.
///
/// # Errors
/// Propagates the errors of `has_pending_order_with_status`.
pub async fn seller_has_pending_order(
    pool: &SqlitePool,
    pubkey: String,
) -> Result<bool, MostroError> {
    let key_column = "master_seller_pubkey";
    let blocking_status = "waiting-payment";
    has_pending_order_with_status(pool, pubkey, key_column, blocking_status).await
}
/// Reports whether an order exists with the given `status` whose
/// `master_key_field` column equals `pubkey`.
///
/// `master_key_field` is spliced into the SQL text as a column name, while
/// `pubkey` and `status` are bound as parameters.
///
/// # Errors
/// * `MostroCantDo(CantDoReason::InvalidPubkey)` if `pubkey` is not exactly
///   64 ASCII hex digits.
/// * `MostroInternalErr(ServiceError::DbAccessError)` if the query fails.
async fn has_pending_order_with_status(
    pool: &SqlitePool,
    pubkey: String,
    master_key_field: &str,
    status: &str,
) -> Result<bool, MostroError> {
    let pubkey_is_valid = pubkey.len() == 64 && pubkey.chars().all(|c| c.is_ascii_hexdigit());
    if !pubkey_is_valid {
        return Err(MostroCantDo(CantDoReason::InvalidPubkey));
    }

    let exists_sql = format!(
        "SELECT EXISTS (SELECT 1 FROM orders WHERE {} = ? AND status = ?)",
        master_key_field
    );
    sqlx::query_scalar::<_, bool>(&exists_sql)
        .bind(pubkey)
        .bind(status)
        .fetch_one(pool)
        .await
        .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))
}
/// Stores a user's rating statistics after validating them.
///
/// Validation rules:
/// * `public_key` must be exactly 64 ASCII hex digits;
/// * `last_rating`, `min_rating` and `max_rating` must each lie in `0..=5`,
///   and `last_rating` additionally within `MIN_RATING..=MAX_RATING`;
/// * `min_rating <= last_rating <= max_rating`;
/// * `total_reviews` must not be negative;
/// * `total_rating` must lie in `0.0..=total_reviews * 5` (NaN is rejected).
///
/// Returns `Ok(true)` when at least one row was updated.
///
/// # Errors
/// * `MostroCantDo(CantDoReason::InvalidPubkey)` for a malformed pubkey.
/// * `MostroCantDo(CantDoReason::InvalidRating)` for any other rule violation.
/// * `MostroInternalErr(ServiceError::DbAccessError)` if the update fails.
pub async fn update_user_rating(
    pool: &SqlitePool,
    public_key: String,
    last_rating: i64,
    min_rating: i64,
    max_rating: i64,
    total_reviews: i64,
    total_rating: f64,
) -> Result<bool, MostroError> {
    if !public_key.chars().all(|c| c.is_ascii_hexdigit()) || public_key.len() != 64 {
        return Err(MostroCantDo(CantDoReason::InvalidPubkey));
    }
    if !(0..=5).contains(&last_rating) {
        return Err(MostroCantDo(CantDoReason::InvalidRating));
    }
    if !(0..=5).contains(&min_rating) || !(0..=5).contains(&max_rating) {
        return Err(MostroCantDo(CantDoReason::InvalidRating));
    }
    if MIN_RATING as i64 > last_rating || last_rating > MAX_RATING as i64 {
        return Err(MostroCantDo(CantDoReason::InvalidRating));
    }
    if total_reviews < 0 {
        return Err(MostroCantDo(CantDoReason::InvalidRating));
    }
    // Compute the upper bound in floating point: `total_reviews * 5` as an i64
    // would overflow for very large review counts. The range check also rejects
    // NaN, which the two separate comparisons used to let through.
    let max_total_rating = total_reviews as f64 * 5.0;
    if !(0.0..=max_total_rating).contains(&total_rating) {
        return Err(MostroCantDo(CantDoReason::InvalidRating));
    }
    if !(min_rating <= last_rating && last_rating <= max_rating) {
        return Err(MostroCantDo(CantDoReason::InvalidRating));
    }

    let result = sqlx::query!(
        r#"
UPDATE users SET last_rating = ?1, min_rating = ?2, max_rating = ?3, total_reviews = ?4, total_rating = ?5 WHERE pubkey = ?6
"#,
        last_rating,
        min_rating,
        max_rating,
        total_reviews,
        total_rating,
        public_key,
    )
    .execute(pool)
    .await
    .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;

    Ok(result.rows_affected() > 0)
}
/// Reports whether `solver_pubkey` is the solver of the dispute on
/// `order_id` and is a user with `is_solver = true` and `category = 2`.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the query fails.
pub async fn solver_has_write_permission(
    pool: &SqlitePool,
    solver_pubkey: &str,
    order_id: Uuid,
) -> Result<bool, MostroError> {
    sqlx::query_scalar::<_, bool>(
        r#"
SELECT EXISTS(
SELECT 1
FROM disputes d
INNER JOIN users u ON u.pubkey = d.solver_pubkey
WHERE d.solver_pubkey = ?1
AND d.order_id = ?2
AND u.is_solver = true
AND u.category = 2
)
"#,
    )
    .bind(solver_pubkey)
    .bind(order_id)
    .fetch_one(pool)
    .await
    .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))
}
/// Reports whether a user with pubkey `pubkey` exists that has
/// `is_solver = true` and `category = 2`.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the query fails.
pub async fn user_has_solver_write_permission(
    pool: &SqlitePool,
    pubkey: &str,
) -> Result<bool, MostroError> {
    let lookup = sqlx::query_scalar::<_, bool>(
        r#"
SELECT EXISTS(
SELECT 1
FROM users
WHERE pubkey = ?1
AND is_solver = true
AND category = 2
)
"#,
    )
    .bind(pubkey)
    .fetch_one(pool)
    .await;

    match lookup {
        Ok(has_permission) => Ok(has_permission),
        Err(err) => Err(MostroInternalErr(ServiceError::DbAccessError(
            err.to_string(),
        ))),
    }
}
/// Reports whether a dispute exists for `order_id` whose `solver_pubkey`
/// equals `solver_pubkey`.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the query
/// fails or its result cannot be decoded.
pub async fn is_assigned_solver(
    pool: &SqlitePool,
    solver_pubkey: &str,
    order_id: Uuid,
) -> Result<bool, MostroError> {
    // Emitted before the lookup: it records the check, not a confirmed assignment.
    tracing::info!(
        "Solver_pubkey: {} assigned to order {}",
        solver_pubkey,
        order_id
    );

    // `query_scalar` reports a decode failure as an error, whereas the previous
    // `row.get(0)` would panic; this also matches the sibling permission checks.
    let assigned = sqlx::query_scalar::<_, bool>(
        "SELECT EXISTS(SELECT 1 FROM disputes WHERE solver_pubkey = ? AND order_id = ?)",
    )
    .bind(solver_pubkey)
    .bind(order_id)
    .fetch_one(pool)
    .await
    .map_err(|e| MostroInternalErr(ServiceError::DbAccessError(e.to_string())))?;

    Ok(assigned)
}
/// Reports whether the `in-progress` dispute on `order_id` has
/// `admin_pubkey` as its solver.
///
/// Returns `Ok(false)` when no such dispute exists or when its
/// `solver_pubkey` is NULL.
///
/// # Errors
/// Returns `MostroInternalErr(ServiceError::DbAccessError)` if the query or
/// the column decoding fails.
pub async fn is_dispute_taken_by_admin(
    pool: &SqlitePool,
    order_id: Uuid,
    admin_pubkey: &str,
) -> Result<bool, MostroError> {
    let in_progress_dispute = sqlx::query(
        "SELECT solver_pubkey FROM disputes WHERE order_id = ? AND status = 'in-progress'",
    )
    .bind(order_id)
    .fetch_optional(pool)
    .await
    .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))?;

    let row = match in_progress_dispute {
        Some(row) => row,
        None => return Ok(false),
    };

    let assigned_solver = row
        .try_get::<Option<String>, _>("solver_pubkey")
        .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))?;

    match assigned_solver {
        Some(solver) => Ok(solver == admin_pubkey),
        None => Ok(false),
    }
}
/// Lists the orders in which `master_key` takes part as buyer or seller,
/// skipping every status listed in `EXCLUDED_ORDER_STATUSES`.
///
/// Buyer-side rows carry `trade_index_buyer` and seller-side rows carry
/// `trade_index_seller` as `trade_index`; the two sets are combined with
/// `UNION ALL`.
///
/// # Errors
/// * `MostroCantDo(CantDoReason::InvalidPubkey)` if `master_key` is not
///   exactly 64 ASCII hex digits.
/// * `MostroInternalErr(ServiceError::DbAccessError)` if the query fails.
pub async fn find_user_orders_by_master_key(
    pool: &SqlitePool,
    master_key: &str,
) -> Result<Vec<RestoredOrdersInfo>, MostroError> {
    let key_is_valid = master_key.len() == 64 && master_key.chars().all(|c| c.is_ascii_hexdigit());
    if !key_is_valid {
        return Err(MostroCantDo(CantDoReason::InvalidPubkey));
    }

    let restore_sql = format!(
        r#"
SELECT id as order_id, trade_index_buyer as trade_index, status FROM orders
WHERE (master_buyer_pubkey = ?)
AND status NOT IN ({})
UNION ALL
SELECT id as order_id, trade_index_seller as trade_index, status FROM orders
WHERE (master_seller_pubkey = ?)
AND status NOT IN ({})
"#,
        EXCLUDED_ORDER_STATUSES, EXCLUDED_ORDER_STATUSES
    );

    // One bind for the buyer-side placeholder, one for the seller-side placeholder.
    sqlx::query_as::<_, RestoredOrdersInfo>(&restore_sql)
        .bind(master_key)
        .bind(master_key)
        .fetch_all(pool)
        .await
        .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))
}
/// Lists the disputes, in a status from `ACTIVE_DISPUTE_STATUSES`, on orders
/// in which `master_key` takes part as buyer or seller.
///
/// Each row carries the matching side's trade index (0 as fallback), the
/// dispute status, the initiator (`buyer`, `seller` or NULL, derived from the
/// order's dispute flags) and the solver pubkey.
///
/// # Errors
/// * `MostroCantDo(CantDoReason::InvalidPubkey)` if `master_key` is not
///   exactly 64 ASCII hex digits.
/// * `MostroInternalErr(ServiceError::DbAccessError)` if the query fails.
pub async fn find_user_disputes_by_master_key(
    pool: &SqlitePool,
    master_key: &str,
) -> Result<Vec<RestoredDisputesInfo>, MostroError> {
    let key_is_valid = master_key.len() == 64 && master_key.chars().all(|c| c.is_ascii_hexdigit());
    if !key_is_valid {
        return Err(MostroCantDo(CantDoReason::InvalidPubkey));
    }

    let restore_sql = format!(
        r#"
SELECT
d.id AS dispute_id,
d.order_id AS order_id,
COALESCE(
CASE
WHEN o.master_buyer_pubkey = ? THEN o.trade_index_buyer
WHEN o.master_seller_pubkey = ? THEN o.trade_index_seller
ELSE 0
END, 0
) AS trade_index,
d.status AS status,
CASE
WHEN o.buyer_dispute = 1 AND o.seller_dispute = 0 THEN 'buyer'
WHEN o.seller_dispute = 1 AND o.buyer_dispute = 0 THEN 'seller'
ELSE NULL
END AS initiator,
d.solver_pubkey AS solver_pubkey
FROM disputes d
JOIN orders o ON d.order_id = o.id
WHERE (o.master_buyer_pubkey = ? OR o.master_seller_pubkey = ?)
AND d.status IN ({})
"#,
        ACTIVE_DISPUTE_STATUSES
    );

    // The statement has four positional placeholders, all taking the master key.
    sqlx::query_as::<_, RestoredDisputesInfo>(&restore_sql)
        .bind(master_key)
        .bind(master_key)
        .bind(master_key)
        .bind(master_key)
        .fetch_all(pool)
        .await
        .map_err(|err| MostroInternalErr(ServiceError::DbAccessError(err.to_string())))
}
/// Collects the orders and disputes of `master_key` into a
/// `RestoreSessionInfo`.
///
/// # Errors
/// Propagates the errors of `find_user_orders_by_master_key` and
/// `find_user_disputes_by_master_key`.
async fn process_restore_session_work(
    pool: SqlitePool,
    master_key: String,
) -> Result<RestoreSessionInfo, MostroError> {
    let orders = find_user_orders_by_master_key(&pool, &master_key).await?;
    let disputes = find_user_disputes_by_master_key(&pool, &master_key).await?;

    tracing::info!(
        "Background restore session completed with {} orders, {} disputes",
        orders.len(),
        disputes.len()
    );

    Ok(RestoreSessionInfo {
        restore_orders: orders,
        restore_disputes: disputes,
    })
}
/// Runs restore-session lookups in the background and hands their results
/// back through a bounded channel.
pub struct RestoreSessionManager {
    /// Cloned into each background job to deliver its result.
    sender: tokio::sync::mpsc::Sender<RestoreSessionInfo>,
    /// Receiving end polled by `check_results` / `wait_for_result`.
    receiver: tokio::sync::mpsc::Receiver<RestoreSessionInfo>,
}

impl Default for RestoreSessionManager {
    /// Equivalent to [`RestoreSessionManager::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl RestoreSessionManager {
    /// Creates a manager whose result channel holds up to 10 pending results.
    pub fn new() -> Self {
        let (result_tx, result_rx) = tokio::sync::mpsc::channel(10);
        Self {
            sender: result_tx,
            receiver: result_rx,
        }
    }

    /// Starts a restore lookup for `master_key` on a blocking worker thread.
    ///
    /// The call returns `Ok(())` as soon as the job has been spawned. The
    /// job's result is sent over the channel on success; a failed lookup is
    /// only logged.
    pub async fn start_restore_session(
        &self,
        pool: SqlitePool,
        master_key: String,
    ) -> Result<(), MostroError> {
        let result_tx = self.sender.clone();
        let runtime = tokio::runtime::Handle::current();

        tokio::task::spawn_blocking(move || {
            let outcome = runtime.block_on(process_restore_session_work(pool, master_key));
            let restore_data = match outcome {
                Ok(data) => data,
                Err(e) => {
                    tracing::error!("Failed to process restore session work: {}", e);
                    return;
                }
            };
            if let Err(e) = result_tx.blocking_send(restore_data) {
                tracing::warn!(
                    "RestoreSessionManager: receiver dropped before sending result: {}",
                    e
                );
            }
        });

        Ok(())
    }

    /// Returns a finished result if one is already queued, without waiting.
    pub async fn check_results(&mut self) -> Option<RestoreSessionInfo> {
        match self.receiver.try_recv() {
            Ok(info) => Some(info),
            Err(_) => None,
        }
    }

    /// Waits for the next result; yields `None` once the channel is closed.
    pub async fn wait_for_result(&mut self) -> Option<RestoreSessionInfo> {
        self.receiver.recv().await
    }
}
#[cfg(test)]
mod tests {
use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
use sqlx::Error;
use std::collections::HashSet;
const TEST_DB_URL: &str = "sqlite::memory:";
async fn setup_db() -> Result<SqlitePool, Error> {
let pool = SqlitePoolOptions::new()
.max_connections(1) .connect(TEST_DB_URL)
.await?;
sqlx::query(
r#"
CREATE TABLE items (
id INTEGER PRIMARY KEY,
value TEXT NOT NULL
)
"#,
)
.execute(&pool)
.await?;
Ok(pool)
}
async fn setup_orders_db() -> Result<SqlitePool, Error> {
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect(TEST_DB_URL)
.await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS orders (
id char(36) primary key not null,
kind varchar(4) not null,
event_id char(64) not null,
hash char(64),
preimage char(64),
creator_pubkey char(64),
cancel_initiator_pubkey char(64),
dispute_initiator_pubkey char(64),
buyer_pubkey char(64),
master_buyer_pubkey char(64),
seller_pubkey char(64),
master_seller_pubkey char(64),
status varchar(50) not null,
price_from_api integer not null default 0,
premium integer not null,
payment_method varchar(500) not null,
amount integer not null,
min_amount integer default 0,
max_amount integer default 0,
buyer_dispute integer not null default 0,
seller_dispute integer not null default 0,
buyer_cooperativecancel integer not null default 0,
seller_cooperativecancel integer not null default 0,
fee integer not null default 0,
routing_fee integer not null default 0,
fiat_code varchar(5) not null,
fiat_amount integer not null,
buyer_invoice text,
range_parent_id char(36),
invoice_held_at integer default 0,
taken_at integer default 0,
created_at integer not null,
buyer_sent_rate integer default 0,
seller_sent_rate integer default 0,
payment_attempts integer default 0,
failed_payment integer default 0,
expires_at integer not null,
trade_index_seller integer default 0,
trade_index_buyer integer default 0,
next_trade_pubkey char(64),
next_trade_index integer default 0,
dev_fee integer default 0,
dev_fee_paid integer not null default 0,
dev_fee_payment_hash char(64)
)
"#,
)
.execute(&pool)
.await?;
Ok(pool)
}
async fn insert_test_order(
pool: &SqlitePool,
id: uuid::Uuid,
status: &str,
dev_fee: i64,
dev_fee_paid: bool,
dev_fee_payment_hash: Option<&str>,
) {
sqlx::query(
r#"
INSERT INTO orders (id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at,
failed_payment, payment_attempts, dev_fee, dev_fee_paid,
dev_fee_payment_hash)
VALUES (?1, 'buy', 'event123', ?2, 0, 'lightning',
100000, 'USD', 100, 1700000000, 1700086400,
0, 0, ?3, ?4, ?5)
"#,
)
.bind(id)
.bind(status)
.bind(dev_fee)
.bind(dev_fee_paid)
.bind(dev_fee_payment_hash)
.execute(pool)
.await
.unwrap();
}
#[tokio::test]
async fn test_fetch_string_column_scalar() {
    // Inserts 20 rows whose values cycle through "Entry 0".."Entry 4" and checks
    // that reading the column back as scalars yields exactly those 5 distinct strings.
    let pool = setup_db().await.unwrap();
    let total_entries = 20;

    // Build one multi-row INSERT with a bound parameter per value.
    let mut query_builder = String::from("INSERT INTO items (id, value) VALUES ");
    let mut params: Vec<String> = Vec::new();
    for i in 0..total_entries {
        let value_string = format!("Entry {}", i % 5);
        if i > 0 {
            query_builder.push_str(", ");
        }
        query_builder.push_str(&format!("({}, ?)", i));
        params.push(value_string);
    }

    let mut query = sqlx::query(&query_builder);
    // Fixed: the loop source was the mis-encoded `¶ms`; it must borrow `params`.
    for param in &params {
        query = query.bind(param);
    }
    query.execute(&pool).await.unwrap();

    let sql = "SELECT value FROM items ORDER BY id";
    let fetched_values: Vec<String> = sqlx::query_scalar(sql).fetch_all(&pool).await.unwrap();
    let hash_set_values: HashSet<String> = fetched_values.into_iter().collect();

    for i in 0..5 {
        let expected = format!("Entry {}", i);
        assert!(
            hash_set_values.contains(&expected),
            "Should contain {}",
            expected
        );
    }
    assert_eq!(
        hash_set_values.len(),
        5,
        "Should have exactly 5 unique entries"
    );
}
#[tokio::test]
async fn find_unpaid_dev_fees_returns_eligible_orders() {
let pool = setup_orders_db().await.unwrap();
let id1 = uuid::Uuid::new_v4();
let id2 = uuid::Uuid::new_v4();
insert_test_order(&pool, id1, "success", 100, false, None).await;
insert_test_order(&pool, id2, "settled-hold-invoice", 50, false, None).await;
let result = super::find_unpaid_dev_fees(&pool).await.unwrap();
assert_eq!(result.len(), 2, "Should find both eligible orders");
}
#[tokio::test]
async fn find_unpaid_dev_fees_excludes_already_paid() {
let pool = setup_orders_db().await.unwrap();
insert_test_order(&pool, uuid::Uuid::new_v4(), "success", 100, true, None).await;
let result = super::find_unpaid_dev_fees(&pool).await.unwrap();
assert!(result.is_empty(), "Should not return already-paid orders");
}
#[tokio::test]
async fn find_unpaid_dev_fees_excludes_orders_with_existing_hash() {
let pool = setup_orders_db().await.unwrap();
insert_test_order(
&pool,
uuid::Uuid::new_v4(),
"success",
100,
false,
Some("abc123hash"),
)
.await;
insert_test_order(
&pool,
uuid::Uuid::new_v4(),
"success",
100,
false,
Some("PENDING-uuid-123"),
)
.await;
let result = super::find_unpaid_dev_fees(&pool).await.unwrap();
assert!(
result.is_empty(),
"Should not return orders with existing payment hash"
);
}
/// Orders in `active`, `pending` or `expired` status are not eligible for dev-fee
/// collection, even with an unpaid non-zero fee.
#[tokio::test]
async fn find_unpaid_dev_fees_excludes_wrong_status() {
    let pool = setup_orders_db().await.unwrap();

    for ineligible_status in ["active", "pending", "expired"] {
        insert_test_order(
            &pool,
            uuid::Uuid::new_v4(),
            ineligible_status,
            100,
            false,
            None,
        )
        .await;
    }

    let unpaid = super::find_unpaid_dev_fees(&pool).await.unwrap();
    assert!(
        unpaid.is_empty(),
        "Should not return orders with non-eligible statuses"
    );
}
/// A `success` order inserted with a fee argument of zero has nothing to collect
/// and must be left out.
#[tokio::test]
async fn find_unpaid_dev_fees_excludes_zero_dev_fee() {
    let pool = setup_orders_db().await.unwrap();
    let zero_fee_id = uuid::Uuid::new_v4();
    insert_test_order(&pool, zero_fee_id, "success", 0, false, None).await;

    let unpaid = super::find_unpaid_dev_fees(&pool).await.unwrap();
    assert!(
        unpaid.is_empty(),
        "Should not return orders with zero dev_fee"
    );
}
/// An empty-string hash is equivalent to "no hash": the order is still reported.
#[tokio::test]
async fn find_unpaid_dev_fees_with_empty_hash_string() {
    let pool = setup_orders_db().await.unwrap();
    let blank_hash = Some("");
    insert_test_order(&pool, uuid::Uuid::new_v4(), "success", 100, false, blank_hash).await;

    let unpaid = super::find_unpaid_dev_fees(&pool).await.unwrap();
    assert_eq!(
        unpaid.len(),
        1,
        "Empty string hash should be treated as no hash"
    );
}
/// An `active` order with a non-zero `invoice_held_at` must be returned by
/// `find_held_invoices`.
#[tokio::test]
async fn test_find_held_invoices_returns_active_with_held_at() {
    // SQL text is kept verbatim; only the order id is bound.
    let insert_sql = r#"INSERT INTO orders (id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at,
failed_payment, payment_attempts, dev_fee, dev_fee_paid,
invoice_held_at)
VALUES (?1, 'buy', 'ev1', 'active', 0, 'lightning',
100000, 'USD', 100, 1700000000, 1700086400,
0, 0, 0, 0, 1700001000)"#;

    let pool = setup_orders_db().await.unwrap();
    let order_id = uuid::Uuid::new_v4();
    sqlx::query(insert_sql)
        .bind(order_id)
        .execute(&pool)
        .await
        .unwrap();

    let held = super::find_held_invoices(&pool).await.unwrap();
    assert_eq!(
        held.len(),
        1,
        "Should find active order with held invoice"
    );
}
/// A `pending` order is ignored by `find_held_invoices` even though its
/// `invoice_held_at` is non-zero.
#[tokio::test]
async fn test_find_held_invoices_ignores_non_active() {
    let insert_sql = r#"INSERT INTO orders (id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at,
failed_payment, payment_attempts, dev_fee, dev_fee_paid,
invoice_held_at)
VALUES (?1, 'buy', 'ev1', 'pending', 0, 'lightning',
100000, 'USD', 100, 1700000000, 1700086400,
0, 0, 0, 0, 1700001000)"#;

    let pool = setup_orders_db().await.unwrap();
    sqlx::query(insert_sql)
        .bind(uuid::Uuid::new_v4())
        .execute(&pool)
        .await
        .unwrap();

    let held = super::find_held_invoices(&pool).await.unwrap();
    assert!(held.is_empty(), "Should not find non-active orders");
}
/// An `active` order whose `invoice_held_at` is 0 is not treated as holding an invoice.
#[tokio::test]
async fn test_find_held_invoices_ignores_zero_held_at() {
    let insert_sql = r#"INSERT INTO orders (id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at,
failed_payment, payment_attempts, dev_fee, dev_fee_paid,
invoice_held_at)
VALUES (?1, 'buy', 'ev1', 'active', 0, 'lightning',
100000, 'USD', 100, 1700000000, 1700086400,
0, 0, 0, 0, 0)"#;

    let pool = setup_orders_db().await.unwrap();
    sqlx::query(insert_sql)
        .bind(uuid::Uuid::new_v4())
        .execute(&pool)
        .await
        .unwrap();

    let held = super::find_held_invoices(&pool).await.unwrap();
    assert!(held.is_empty(), "Should not find orders with held_at = 0");
}
/// A `settled-hold-invoice` order with `failed_payment = 1` must be returned by
/// `find_failed_payment`.
#[tokio::test]
async fn test_find_failed_payment_returns_matching() {
    let insert_sql = r#"INSERT INTO orders (id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at,
failed_payment, payment_attempts, dev_fee, dev_fee_paid)
VALUES (?1, 'buy', 'ev1', 'settled-hold-invoice', 0, 'lightning',
100000, 'USD', 100, 1700000000, 1700086400,
1, 3, 0, 0)"#;

    let pool = setup_orders_db().await.unwrap();
    sqlx::query(insert_sql)
        .bind(uuid::Uuid::new_v4())
        .execute(&pool)
        .await
        .unwrap();

    let failed = super::find_failed_payment(&pool).await.unwrap();
    assert_eq!(failed.len(), 1, "Should find failed payment order");
}
/// A `settled-hold-invoice` order created through `insert_test_order` (which this
/// test expects not to mark the payment as failed) must not be returned.
#[tokio::test]
async fn test_find_failed_payment_ignores_non_failed() {
    let pool = setup_orders_db().await.unwrap();
    let order_id = uuid::Uuid::new_v4();
    insert_test_order(&pool, order_id, "settled-hold-invoice", 0, false, None).await;

    let failed = super::find_failed_payment(&pool).await.unwrap();
    assert!(failed.is_empty(), "Should not find non-failed orders");
}
/// `failed_payment = 1` alone is not enough: an `active` order must not be returned.
#[tokio::test]
async fn test_find_failed_payment_ignores_wrong_status() {
    let insert_sql = r#"INSERT INTO orders (id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at,
failed_payment, payment_attempts, dev_fee, dev_fee_paid)
VALUES (?1, 'buy', 'ev1', 'active', 0, 'lightning',
100000, 'USD', 100, 1700000000, 1700086400,
1, 3, 0, 0)"#;

    let pool = setup_orders_db().await.unwrap();
    sqlx::query(insert_sql)
        .bind(uuid::Uuid::new_v4())
        .execute(&pool)
        .await
        .unwrap();

    let failed = super::find_failed_payment(&pool).await.unwrap();
    assert!(
        failed.is_empty(),
        "Should not find orders with wrong status"
    );
}
/// Looking up an order by the hash it was stored with must succeed.
#[tokio::test]
async fn test_find_order_by_hash_found() {
    let insert_sql = r#"INSERT INTO orders (id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at,
failed_payment, payment_attempts, dev_fee, dev_fee_paid, hash)
VALUES (?1, 'buy', 'ev1', 'active', 0, 'lightning',
100000, 'USD', 100, 1700000000, 1700086400,
0, 0, 0, 0, ?2)"#;
    let stored_hash = "abc123def456";

    let pool = setup_orders_db().await.unwrap();
    let order_id = uuid::Uuid::new_v4();
    sqlx::query(insert_sql)
        .bind(order_id)
        .bind(stored_hash)
        .execute(&pool)
        .await
        .unwrap();

    let lookup = super::find_order_by_hash(&pool, stored_hash).await;
    assert!(lookup.is_ok(), "Should find order by hash");
}
/// Looking up a hash on an empty orders table must yield an error.
#[tokio::test]
async fn test_find_order_by_hash_not_found() {
    let pool = setup_orders_db().await.unwrap();
    let missing_hash = "nonexistent_hash";

    let lookup = super::find_order_by_hash(&pool, missing_hash).await;
    assert!(lookup.is_err(), "Should error when hash not found");
}
/// `find_order_by_date` must return expired orders in `pending` and
/// `waiting-taker-bond` status, and nothing else: not their non-expired
/// counterparts and not an expired `active` order.
#[tokio::test]
async fn test_find_order_by_date_includes_waiting_taker_bond() {
    /// Inserts a minimal order with the given status and expiry; `created_at` is
    /// set one hour before `expires_at`.
    async fn insert(pool: &SqlitePool, id: uuid::Uuid, status: &str, expires_at: i64) {
        let insert_sql = r#"INSERT INTO orders (id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at,
failed_payment, payment_attempts, dev_fee, dev_fee_paid)
VALUES (?1, 'buy', 'ev', ?2, 0, 'lightning',
1000, 'USD', 10, ?3, ?4,
0, 0, 0, 0)"#;
        let created_at = expires_at - 3600;
        sqlx::query(insert_sql)
            .bind(id)
            .bind(status)
            .bind(created_at)
            .bind(expires_at)
            .execute(pool)
            .await
            .unwrap();
    }

    let pool = setup_orders_db().await.unwrap();

    // Expiry timestamps one hour on either side of "now".
    let now = nostr_sdk::Timestamp::now().as_secs() as i64;
    let past = now - 3600;
    let future = now + 3600;

    let pending_expired = uuid::Uuid::new_v4();
    let waiting_taker_bond_expired = uuid::Uuid::new_v4();
    let pending_fresh = uuid::Uuid::new_v4();
    let waiting_taker_bond_fresh = uuid::Uuid::new_v4();
    let active_expired = uuid::Uuid::new_v4();

    insert(&pool, pending_expired, "pending", past).await;
    insert(&pool, waiting_taker_bond_expired, "waiting-taker-bond", past).await;
    insert(&pool, pending_fresh, "pending", future).await;
    insert(&pool, waiting_taker_bond_fresh, "waiting-taker-bond", future).await;
    insert(&pool, active_expired, "active", past).await;

    let expired = super::find_order_by_date(&pool).await.unwrap();
    let returned_ids: std::collections::HashSet<uuid::Uuid> =
        expired.iter().map(|order| order.id).collect();

    // Must be present.
    assert!(
        returned_ids.contains(&pending_expired),
        "expired Pending must be returned"
    );
    assert!(
        returned_ids.contains(&waiting_taker_bond_expired),
        "expired WaitingTakerBond must be returned (Phase 1.5)"
    );

    // Must be absent.
    assert!(
        !returned_ids.contains(&pending_fresh),
        "non-expired Pending must NOT be returned"
    );
    assert!(
        !returned_ids.contains(&waiting_taker_bond_fresh),
        "non-expired WaitingTakerBond must NOT be returned"
    );
    assert!(
        !returned_ids.contains(&active_expired),
        "expired but non-pre-trade orders (e.g. active) must NOT be returned"
    );
}
/// `update_order_to_initial_state` on an existing `active` order must return `true`,
/// set the status to `pending`, store the new amount/fee/dev_fee, null out
/// hash/preimage/buyer_invoice and zero `taken_at` and `invoice_held_at`.
#[tokio::test]
async fn test_update_order_to_initial_state_resets_fields() {
    let insert_sql = r#"INSERT INTO orders (id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at,
failed_payment, payment_attempts, dev_fee, dev_fee_paid,
hash, preimage, buyer_invoice, taken_at, invoice_held_at)
VALUES (?1, 'buy', 'ev1', 'active', 0, 'lightning',
100000, 'USD', 100, 1700000000, 1700086400,
0, 0, 500, 0,
'somehash', 'somepreimage', 'someinvoice', 1700001000, 1700002000)"#;

    let pool = setup_orders_db().await.unwrap();
    let order_id = uuid::Uuid::new_v4();
    sqlx::query(insert_sql)
        .bind(order_id)
        .execute(&pool)
        .await
        .unwrap();

    let outcome = super::update_order_to_initial_state(&pool, order_id, 50000, 250, 100).await;
    assert!(outcome.is_ok());
    assert!(outcome.unwrap(), "Should return true for existing order");

    // Status is back to the initial value.
    let (status,): (String,) = sqlx::query_as("SELECT status FROM orders WHERE id = ?1")
        .bind(order_id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(status, "pending", "Status should be reset to pending");

    // Monetary columns carry the values passed to the call.
    let (amount, fee, dev_fee): (i64, i64, i64) =
        sqlx::query_as("SELECT amount, fee, dev_fee FROM orders WHERE id = ?1")
            .bind(order_id)
            .fetch_one(&pool)
            .await
            .unwrap();
    assert_eq!(amount, 50000, "Amount should be updated");
    assert_eq!(fee, 250, "Fee should be updated");
    assert_eq!(dev_fee, 100, "Dev fee should be updated");

    // Payment artefacts are cleared.
    let (hash, preimage, buyer_invoice): (Option<String>, Option<String>, Option<String>) =
        sqlx::query_as("SELECT hash, preimage, buyer_invoice FROM orders WHERE id = ?1")
            .bind(order_id)
            .fetch_one(&pool)
            .await
            .unwrap();
    assert!(hash.is_none(), "Hash should be cleared");
    assert!(preimage.is_none(), "Preimage should be cleared");
    assert!(buyer_invoice.is_none(), "Buyer invoice should be cleared");

    // Timestamps are zeroed.
    let (taken_at, invoice_held_at): (i64, i64) =
        sqlx::query_as("SELECT taken_at, invoice_held_at FROM orders WHERE id = ?1")
            .bind(order_id)
            .fetch_one(&pool)
            .await
            .unwrap();
    assert_eq!(taken_at, 0, "taken_at should be reset to 0");
    assert_eq!(invoice_held_at, 0, "invoice_held_at should be reset to 0");
}
/// Resetting an order id that is not in the table succeeds but reports `false`.
#[tokio::test]
async fn test_update_order_to_initial_state_nonexistent() {
    let pool = setup_orders_db().await.unwrap();
    let unknown_id = uuid::Uuid::new_v4();

    let outcome = super::update_order_to_initial_state(&pool, unknown_id, 50000, 250, 100).await;
    assert!(outcome.is_ok());
    assert!(
        !outcome.unwrap(),
        "Should return false for nonexistent order"
    );
}
/// Opens a single-connection pool on `TEST_DB_URL` and creates the `users` table
/// if it does not exist yet.
///
/// # Errors
/// Propagates any error from connecting or from executing the `CREATE TABLE` statement.
async fn setup_users_db() -> Result<SqlitePool, Error> {
    let users_ddl = r#"CREATE TABLE IF NOT EXISTS users (
pubkey char(64) primary key not null,
is_admin integer not null default 0,
admin_password char(64),
is_solver integer not null default 0,
is_banned integer not null default 0,
category integer not null default 0,
last_trade_index integer not null default 0,
total_reviews integer not null default 0,
total_rating real not null default 0.0,
last_rating integer not null default 0,
max_rating integer not null default 0,
min_rating integer not null default 0,
created_at integer not null
)"#;

    let options = SqlitePoolOptions::new().max_connections(1);
    let pool = options.connect(TEST_DB_URL).await?;
    sqlx::query(users_ddl).execute(&pool).await?;
    Ok(pool)
}
/// Inserts a user row with the given pubkey and a fixed `created_at`; every other
/// column takes its table default. Panics if the insert fails.
async fn insert_test_user(pool: &SqlitePool, pubkey: &str) {
    let insert_sql = "INSERT INTO users (pubkey, created_at) VALUES (?1, 1700000000)";
    let statement = sqlx::query(insert_sql).bind(pubkey);
    statement.execute(pool).await.unwrap();
}
/// Well-formed pubkey fixture: 64 lowercase hex characters. Shared by the user and
/// pending-order tests in this module.
const VALID_PUBKEY: &str = "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2";
/// Updating the trade index of an existing user returns `true` and persists the value.
#[tokio::test]
async fn test_update_user_trade_index_valid() {
    let pool = setup_users_db().await.unwrap();
    insert_test_user(&pool, VALID_PUBKEY).await;

    let outcome = super::update_user_trade_index(&pool, VALID_PUBKEY.to_string(), 5).await;
    assert!(outcome.is_ok());
    assert!(outcome.unwrap(), "Should return true for existing user");

    let (stored_index,): (i64,) =
        sqlx::query_as("SELECT last_trade_index FROM users WHERE pubkey = ?1")
            .bind(VALID_PUBKEY)
            .fetch_one(&pool)
            .await
            .unwrap();
    assert_eq!(stored_index, 5);
}
/// A pubkey shorter than the expected length must be rejected with an error.
#[tokio::test]
async fn test_update_user_trade_index_invalid_pubkey_short() {
    let pool = setup_users_db().await.unwrap();
    let short_pubkey = "abc123".to_string();

    let outcome = super::update_user_trade_index(&pool, short_pubkey, 5).await;
    assert!(outcome.is_err(), "Should reject short pubkey");
}
/// A 64-character pubkey containing a non-hex character (`g`) must be rejected.
#[tokio::test]
async fn test_update_user_trade_index_invalid_pubkey_non_hex() {
    let pool = setup_users_db().await.unwrap();
    let non_hex_pubkey =
        "g1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2".to_string();

    let outcome = super::update_user_trade_index(&pool, non_hex_pubkey, 5).await;
    assert!(outcome.is_err(), "Should reject non-hex pubkey");
}
/// A negative trade index must be rejected even for an existing user.
#[tokio::test]
async fn test_update_user_trade_index_negative() {
    let pool = setup_users_db().await.unwrap();
    insert_test_user(&pool, VALID_PUBKEY).await;

    let pubkey = VALID_PUBKEY.to_string();
    let outcome = super::update_user_trade_index(&pool, pubkey, -1).await;
    assert!(outcome.is_err(), "Should reject negative trade index");
}
/// With valid input but no matching user row, the call succeeds and reports `false`.
#[tokio::test]
async fn test_update_user_trade_index_nonexistent_user() {
    let pool = setup_users_db().await.unwrap();

    let pubkey = VALID_PUBKEY.to_string();
    let outcome = super::update_user_trade_index(&pool, pubkey, 5).await;
    assert!(outcome.is_ok());
    assert!(!outcome.unwrap(), "Should return false for nonexistent user");
}
/// A `waiting-buyer-invoice` order whose `master_buyer_pubkey` matches counts as a
/// pending order for that buyer.
#[tokio::test]
async fn test_buyer_has_pending_order_true() {
    let insert_sql = r#"INSERT INTO orders (id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at,
failed_payment, payment_attempts, dev_fee, dev_fee_paid,
master_buyer_pubkey)
VALUES (?1, 'buy', 'ev1', 'waiting-buyer-invoice', 0, 'lightning',
100000, 'USD', 100, 1700000000, 1700086400,
0, 0, 0, 0, ?2)"#;

    let pool = setup_orders_db().await.unwrap();
    sqlx::query(insert_sql)
        .bind(uuid::Uuid::new_v4())
        .bind(VALID_PUBKEY)
        .execute(&pool)
        .await
        .unwrap();

    let has_pending = super::buyer_has_pending_order(&pool, VALID_PUBKEY.to_string()).await;
    assert!(has_pending.is_ok());
    assert!(has_pending.unwrap(), "Buyer should have pending order");
}
/// An `active` order for the same buyer pubkey does not count as a pending order.
#[tokio::test]
async fn test_buyer_has_pending_order_false() {
    let insert_sql = r#"INSERT INTO orders (id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at,
failed_payment, payment_attempts, dev_fee, dev_fee_paid,
master_buyer_pubkey)
VALUES (?1, 'buy', 'ev1', 'active', 0, 'lightning',
100000, 'USD', 100, 1700000000, 1700086400,
0, 0, 0, 0, ?2)"#;

    let pool = setup_orders_db().await.unwrap();
    sqlx::query(insert_sql)
        .bind(uuid::Uuid::new_v4())
        .bind(VALID_PUBKEY)
        .execute(&pool)
        .await
        .unwrap();

    let has_pending = super::buyer_has_pending_order(&pool, VALID_PUBKEY.to_string()).await;
    assert!(has_pending.is_ok());
    assert!(
        !has_pending.unwrap(),
        "Buyer should NOT have pending order with wrong status"
    );
}
/// A `waiting-payment` sell order whose `master_seller_pubkey` matches counts as a
/// pending order for that seller.
#[tokio::test]
async fn test_seller_has_pending_order_true() {
    let insert_sql = r#"INSERT INTO orders (id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at,
failed_payment, payment_attempts, dev_fee, dev_fee_paid,
master_seller_pubkey)
VALUES (?1, 'sell', 'ev1', 'waiting-payment', 0, 'lightning',
100000, 'USD', 100, 1700000000, 1700086400,
0, 0, 0, 0, ?2)"#;

    let pool = setup_orders_db().await.unwrap();
    sqlx::query(insert_sql)
        .bind(uuid::Uuid::new_v4())
        .bind(VALID_PUBKEY)
        .execute(&pool)
        .await
        .unwrap();

    let has_pending = super::seller_has_pending_order(&pool, VALID_PUBKEY.to_string()).await;
    assert!(has_pending.is_ok());
    assert!(has_pending.unwrap(), "Seller should have pending order");
}
/// `buyer_has_pending_order` must return an error for a malformed pubkey.
#[tokio::test]
async fn test_has_pending_order_invalid_pubkey() {
    let pool = setup_orders_db().await.unwrap();
    let malformed_pubkey = "not_hex".to_string();

    let outcome = super::buyer_has_pending_order(&pool, malformed_pubkey).await;
    assert!(outcome.is_err(), "Should reject invalid pubkey");
}
/// A consistent set of rating values is accepted for an existing user and each
/// value lands in its matching column.
#[tokio::test]
async fn test_update_user_rating_valid() {
    let pool = setup_users_db().await.unwrap();
    insert_test_user(&pool, VALID_PUBKEY).await;

    let outcome =
        super::update_user_rating(&pool, VALID_PUBKEY.to_string(), 4, 3, 5, 10, 40.0).await;
    assert!(outcome.is_ok());
    assert!(outcome.unwrap(), "Should update existing user rating");

    let select_sql = "SELECT last_rating, min_rating, max_rating, total_reviews, total_rating FROM users WHERE pubkey = ?1";
    let (last_rating, min_rating, max_rating, total_reviews, total_rating): (
        i64,
        i64,
        i64,
        i64,
        f64,
    ) = sqlx::query_as(select_sql)
        .bind(VALID_PUBKEY)
        .fetch_one(&pool)
        .await
        .unwrap();

    assert_eq!(last_rating, 4);
    assert_eq!(min_rating, 3);
    assert_eq!(max_rating, 5);
    assert_eq!(total_reviews, 10);
    // Floating-point column: compare with a tolerance rather than `==`.
    assert!((total_rating - 40.0).abs() < f64::EPSILON);
}
/// A malformed pubkey must be rejected before any rating is stored.
#[tokio::test]
async fn test_update_user_rating_invalid_pubkey() {
    let pool = setup_users_db().await.unwrap();
    let malformed_pubkey = "short".to_string();

    let outcome = super::update_user_rating(&pool, malformed_pubkey, 4, 3, 5, 10, 40.0).await;
    assert!(outcome.is_err(), "Should reject invalid pubkey");
}
/// A last rating above 5 or below 0 must be rejected.
#[tokio::test]
async fn test_update_user_rating_out_of_range() {
    let pool = setup_users_db().await.unwrap();
    insert_test_user(&pool, VALID_PUBKEY).await;

    // (out-of-range last rating, failure message)
    let cases = [
        (6, "Should reject rating > 5"),
        (-1, "Should reject negative rating"),
    ];
    for (last_rating, why) in cases {
        let outcome = super::update_user_rating(
            &pool,
            VALID_PUBKEY.to_string(),
            last_rating,
            3,
            5,
            10,
            40.0,
        )
        .await;
        assert!(outcome.is_err(), "{}", why);
    }
}
/// A negative review count must be rejected.
#[tokio::test]
async fn test_update_user_rating_negative_reviews() {
    let pool = setup_users_db().await.unwrap();
    insert_test_user(&pool, VALID_PUBKEY).await;

    let pubkey = VALID_PUBKEY.to_string();
    let outcome = super::update_user_rating(&pool, pubkey, 4, 3, 5, -1, 40.0).await;
    assert!(outcome.is_err(), "Should reject negative total_reviews");
}
/// With 2 reviews the total rating cannot exceed 10, so 11.0 must be rejected.
#[tokio::test]
async fn test_update_user_rating_total_exceeds_max() {
    let pool = setup_users_db().await.unwrap();
    insert_test_user(&pool, VALID_PUBKEY).await;

    let pubkey = VALID_PUBKEY.to_string();
    let outcome = super::update_user_rating(&pool, pubkey, 4, 3, 5, 2, 11.0).await;
    assert!(outcome.is_err(), "Should reject total_rating > reviews * 5");
}
/// A minimum rating (3) above the last rating (2) is inconsistent and must be rejected.
#[tokio::test]
async fn test_update_user_rating_min_gt_last() {
    let pool = setup_users_db().await.unwrap();
    insert_test_user(&pool, VALID_PUBKEY).await;

    let pubkey = VALID_PUBKEY.to_string();
    let outcome = super::update_user_rating(&pool, pubkey, 2, 3, 5, 10, 40.0).await;
    assert!(outcome.is_err(), "Should reject min_rating > last_rating");
}
/// A last rating (5) above the maximum rating (4) is inconsistent and must be rejected.
#[tokio::test]
async fn test_update_user_rating_last_gt_max() {
    let pool = setup_users_db().await.unwrap();
    insert_test_user(&pool, VALID_PUBKEY).await;

    let pubkey = VALID_PUBKEY.to_string();
    let outcome = super::update_user_rating(&pool, pubkey, 5, 3, 4, 10, 40.0).await;
    assert!(outcome.is_err(), "Should reject last_rating > max_rating");
}
/// `reset_order_taken_at_time` must succeed and leave `taken_at` at 0 for an order
/// that previously had a non-zero value.
#[tokio::test]
async fn test_reset_order_taken_at_time() {
    let insert_sql = r#"INSERT INTO orders (id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at,
failed_payment, payment_attempts, dev_fee, dev_fee_paid,
taken_at)
VALUES (?1, 'buy', 'ev1', 'active', 0, 'lightning',
100000, 'USD', 100, 1700000000, 1700086400,
0, 0, 0, 0, 1700005000)"#;

    let pool = setup_orders_db().await.unwrap();
    let order_id = uuid::Uuid::new_v4();
    sqlx::query(insert_sql)
        .bind(order_id)
        .execute(&pool)
        .await
        .unwrap();

    let outcome = super::reset_order_taken_at_time(&pool, order_id).await;
    assert!(outcome.is_ok());

    let (taken_at,): (i64,) = sqlx::query_as("SELECT taken_at FROM orders WHERE id = ?1")
        .bind(order_id)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(taken_at, 0, "taken_at should be reset to 0");
}
/// `update_order_invoice_held_at_time` must succeed and store the supplied
/// timestamp in `invoice_held_at`.
#[tokio::test]
async fn test_update_order_invoice_held_at_time() {
    let insert_sql = r#"INSERT INTO orders (id, kind, event_id, status, premium, payment_method,
amount, fiat_code, fiat_amount, created_at, expires_at,
failed_payment, payment_attempts, dev_fee, dev_fee_paid,
invoice_held_at)
VALUES (?1, 'buy', 'ev1', 'active', 0, 'lightning',
100000, 'USD', 100, 1700000000, 1700086400,
0, 0, 0, 0, 0)"#;

    let pool = setup_orders_db().await.unwrap();
    let order_id = uuid::Uuid::new_v4();
    sqlx::query(insert_sql)
        .bind(order_id)
        .execute(&pool)
        .await
        .unwrap();

    let outcome = super::update_order_invoice_held_at_time(&pool, order_id, 1700005000).await;
    assert!(outcome.is_ok());

    let (invoice_held_at,): (i64,) =
        sqlx::query_as("SELECT invoice_held_at FROM orders WHERE id = ?1")
            .bind(order_id)
            .fetch_one(&pool)
            .await
            .unwrap();
    assert_eq!(
        invoice_held_at, 1700005000,
        "invoice_held_at should be set to provided value"
    );
}
}