use std::collections::HashMap;
use std::ops::Deref;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use bitcoin::hashes::{sha256, Hash};
use bitcoin::secp256k1::{PublicKey, Secp256k1};
use bitcoin::Transaction;
use chrono::Utc;
use lightning::events::HTLCHandlingFailureType;
use lightning::ln::channelmanager::{InterceptId, MIN_FINAL_CLTV_EXPIRY_DELTA};
use lightning::ln::msgs::SocketAddress;
use lightning::ln::types::ChannelId;
use lightning::routing::router::{RouteHint, RouteHintHop};
use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, InvoiceBuilder, RoutingFees};
use lightning_liquidity::events::LiquidityEvent;
use lightning_liquidity::lsps0::ser::{LSPSDateTime, LSPSRequestId};
use lightning_liquidity::lsps1::client::LSPS1ClientConfig as LdkLSPS1ClientConfig;
use lightning_liquidity::lsps1::event::LSPS1ClientEvent;
use lightning_liquidity::lsps1::msgs::{
LSPS1ChannelInfo, LSPS1Options, LSPS1OrderId, LSPS1OrderParams,
};
use lightning_liquidity::lsps2::client::LSPS2ClientConfig as LdkLSPS2ClientConfig;
use lightning_liquidity::lsps2::event::{LSPS2ClientEvent, LSPS2ServiceEvent};
use lightning_liquidity::lsps2::msgs::{LSPS2OpeningFeeParams, LSPS2RawOpeningFeeParams};
use lightning_liquidity::lsps2::service::LSPS2ServiceConfig as LdkLSPS2ServiceConfig;
use lightning_liquidity::lsps2::utils::compute_opening_fee;
use lightning_liquidity::{LiquidityClientConfig, LiquidityServiceConfig};
use lightning_types::payment::PaymentHash;
use rand::Rng;
use tokio::sync::oneshot;
use crate::builder::BuildError;
use crate::chain::ChainSource;
use crate::connection::ConnectionManager;
use crate::logger::{log_debug, log_error, log_info, LdkLogger, Logger};
use crate::runtime::Runtime;
use crate::types::{
Broadcaster, ChannelManager, DynStore, KeysManager, LiquidityManager, PeerManager, Wallet,
};
use crate::{total_anchor_channels_reserve_sats, Config, Error};
const LIQUIDITY_REQUEST_TIMEOUT_SECS: u64 = 5;
const LSPS2_GETINFO_REQUEST_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24);
const LSPS2_CHANNEL_CLTV_EXPIRY_DELTA: u32 = 72;
struct LSPS1Client {
lsp_node_id: PublicKey,
lsp_address: SocketAddress,
token: Option<String>,
ldk_client_config: LdkLSPS1ClientConfig,
pending_opening_params_requests:
Mutex<HashMap<LSPSRequestId, oneshot::Sender<LSPS1OpeningParamsResponse>>>,
pending_create_order_requests: Mutex<HashMap<LSPSRequestId, oneshot::Sender<LSPS1OrderStatus>>>,
pending_check_order_status_requests:
Mutex<HashMap<LSPSRequestId, oneshot::Sender<LSPS1OrderStatus>>>,
}
#[derive(Debug, Clone)]
pub(crate) struct LSPS1ClientConfig {
pub node_id: PublicKey,
pub address: SocketAddress,
pub token: Option<String>,
}
struct LSPS2Client {
lsp_node_id: PublicKey,
lsp_address: SocketAddress,
token: Option<String>,
ldk_client_config: LdkLSPS2ClientConfig,
pending_fee_requests: Mutex<HashMap<LSPSRequestId, oneshot::Sender<LSPS2FeeResponse>>>,
pending_buy_requests: Mutex<HashMap<LSPSRequestId, oneshot::Sender<LSPS2BuyResponse>>>,
}
#[derive(Debug, Clone)]
pub(crate) struct LSPS2ClientConfig {
pub node_id: PublicKey,
pub address: SocketAddress,
pub token: Option<String>,
}
struct LSPS2Service {
service_config: LSPS2ServiceConfig,
ldk_service_config: LdkLSPS2ServiceConfig,
}
#[derive(Debug, Clone)]
pub struct LSPS2ServiceConfig {
pub require_token: Option<String>,
pub advertise_service: bool,
pub channel_opening_fee_ppm: u32,
pub channel_over_provisioning_ppm: u32,
pub min_channel_opening_fee_msat: u64,
pub min_channel_lifetime: u32,
pub max_client_to_self_delay: u32,
pub min_payment_size_msat: u64,
pub max_payment_size_msat: u64,
pub client_trusts_lsp: bool,
}
pub(crate) struct LiquiditySourceBuilder<L: Deref>
where
L::Target: LdkLogger,
{
lsps1_client: Option<LSPS1Client>,
lsps2_client: Option<LSPS2Client>,
lsps2_service: Option<LSPS2Service>,
wallet: Arc<Wallet>,
channel_manager: Arc<ChannelManager>,
keys_manager: Arc<KeysManager>,
chain_source: Arc<ChainSource>,
tx_broadcaster: Arc<Broadcaster>,
kv_store: Arc<DynStore>,
config: Arc<Config>,
logger: L,
}
impl<L: Deref> LiquiditySourceBuilder<L>
where
L::Target: LdkLogger,
{
pub(crate) fn new(
wallet: Arc<Wallet>, channel_manager: Arc<ChannelManager>, keys_manager: Arc<KeysManager>,
chain_source: Arc<ChainSource>, tx_broadcaster: Arc<Broadcaster>, kv_store: Arc<DynStore>,
config: Arc<Config>, logger: L,
) -> Self {
let lsps1_client = None;
let lsps2_client = None;
let lsps2_service = None;
Self {
lsps1_client,
lsps2_client,
lsps2_service,
wallet,
channel_manager,
keys_manager,
chain_source,
tx_broadcaster,
kv_store,
config,
logger,
}
}
pub(crate) fn lsps1_client(
&mut self, lsp_node_id: PublicKey, lsp_address: SocketAddress, token: Option<String>,
) -> &mut Self {
let ldk_client_config = LdkLSPS1ClientConfig { max_channel_fees_msat: None };
let pending_opening_params_requests = Mutex::new(HashMap::new());
let pending_create_order_requests = Mutex::new(HashMap::new());
let pending_check_order_status_requests = Mutex::new(HashMap::new());
self.lsps1_client = Some(LSPS1Client {
lsp_node_id,
lsp_address,
token,
ldk_client_config,
pending_opening_params_requests,
pending_create_order_requests,
pending_check_order_status_requests,
});
self
}
pub(crate) fn lsps2_client(
&mut self, lsp_node_id: PublicKey, lsp_address: SocketAddress, token: Option<String>,
) -> &mut Self {
let ldk_client_config = LdkLSPS2ClientConfig {};
let pending_fee_requests = Mutex::new(HashMap::new());
let pending_buy_requests = Mutex::new(HashMap::new());
self.lsps2_client = Some(LSPS2Client {
lsp_node_id,
lsp_address,
token,
ldk_client_config,
pending_fee_requests,
pending_buy_requests,
});
self
}
pub(crate) fn lsps2_service(
&mut self, promise_secret: [u8; 32], service_config: LSPS2ServiceConfig,
) -> &mut Self {
let ldk_service_config = LdkLSPS2ServiceConfig { promise_secret };
self.lsps2_service = Some(LSPS2Service { service_config, ldk_service_config });
self
}
pub(crate) async fn build(self) -> Result<LiquiditySource<L>, BuildError> {
let liquidity_service_config = self.lsps2_service.as_ref().map(|s| {
let lsps2_service_config = Some(s.ldk_service_config.clone());
let lsps5_service_config = None;
let advertise_service = s.service_config.advertise_service;
LiquidityServiceConfig { lsps2_service_config, lsps5_service_config, advertise_service }
});
let lsps1_client_config = self.lsps1_client.as_ref().map(|s| s.ldk_client_config.clone());
let lsps2_client_config = self.lsps2_client.as_ref().map(|s| s.ldk_client_config.clone());
let lsps5_client_config = None;
let liquidity_client_config = Some(LiquidityClientConfig {
lsps1_client_config,
lsps2_client_config,
lsps5_client_config,
});
let liquidity_manager = Arc::new(
LiquidityManager::new(
Arc::clone(&self.keys_manager),
Arc::clone(&self.keys_manager),
Arc::clone(&self.channel_manager),
Some(Arc::clone(&self.chain_source)),
None,
Arc::clone(&self.kv_store),
Arc::clone(&self.tx_broadcaster),
liquidity_service_config,
liquidity_client_config,
)
.await
.map_err(|_| BuildError::ReadFailed)?,
);
Ok(LiquiditySource {
lsps1_client: self.lsps1_client,
lsps2_client: self.lsps2_client,
lsps2_service: self.lsps2_service,
wallet: self.wallet,
channel_manager: self.channel_manager,
peer_manager: RwLock::new(None),
keys_manager: self.keys_manager,
liquidity_manager,
config: self.config,
logger: self.logger,
})
}
}
pub(crate) struct LiquiditySource<L: Deref>
where
L::Target: LdkLogger,
{
lsps1_client: Option<LSPS1Client>,
lsps2_client: Option<LSPS2Client>,
lsps2_service: Option<LSPS2Service>,
wallet: Arc<Wallet>,
channel_manager: Arc<ChannelManager>,
peer_manager: RwLock<Option<Arc<PeerManager>>>,
keys_manager: Arc<KeysManager>,
liquidity_manager: Arc<LiquidityManager>,
config: Arc<Config>,
logger: L,
}
impl<L: Deref> LiquiditySource<L>
where
L::Target: LdkLogger,
{
pub(crate) fn set_peer_manager(&self, peer_manager: Arc<PeerManager>) {
*self.peer_manager.write().unwrap() = Some(peer_manager);
}
pub(crate) fn liquidity_manager(&self) -> Arc<LiquidityManager> {
Arc::clone(&self.liquidity_manager)
}
pub(crate) fn get_lsps1_lsp_details(&self) -> Option<(PublicKey, SocketAddress)> {
self.lsps1_client.as_ref().map(|s| (s.lsp_node_id, s.lsp_address.clone()))
}
pub(crate) fn get_lsps2_lsp_details(&self) -> Option<(PublicKey, SocketAddress)> {
self.lsps2_client.as_ref().map(|s| (s.lsp_node_id, s.lsp_address.clone()))
}
pub(crate) fn lsps2_channel_needs_manual_broadcast(
&self, counterparty_node_id: PublicKey, user_channel_id: u128,
) -> bool {
self.lsps2_service.as_ref().map_or(false, |lsps2_service| {
lsps2_service.service_config.client_trusts_lsp
&& self
.liquidity_manager()
.lsps2_service_handler()
.and_then(|handler| {
handler
.channel_needs_manual_broadcast(user_channel_id, &counterparty_node_id)
.ok()
})
.unwrap_or(false)
})
}
pub(crate) fn lsps2_store_funding_transaction(
&self, user_channel_id: u128, counterparty_node_id: PublicKey, funding_tx: Transaction,
) {
if self.lsps2_service.as_ref().map_or(false, |svc| !svc.service_config.client_trusts_lsp) {
return;
}
let lsps2_service_handler = self.liquidity_manager.lsps2_service_handler();
if let Some(handler) = lsps2_service_handler {
handler
.store_funding_transaction(user_channel_id, &counterparty_node_id, funding_tx)
.unwrap_or_else(|e| {
debug_assert!(false, "Failed to store funding transaction: {:?}", e);
log_error!(self.logger, "Failed to store funding transaction: {:?}", e);
});
} else {
log_error!(self.logger, "LSPS2 service handler is not available.");
}
}
pub(crate) fn lsps2_funding_tx_broadcast_safe(
&self, user_channel_id: u128, counterparty_node_id: PublicKey,
) {
if self.lsps2_service.as_ref().map_or(false, |svc| !svc.service_config.client_trusts_lsp) {
return;
}
let lsps2_service_handler = self.liquidity_manager.lsps2_service_handler();
if let Some(handler) = lsps2_service_handler {
handler
.set_funding_tx_broadcast_safe(user_channel_id, &counterparty_node_id)
.unwrap_or_else(|e| {
debug_assert!(
false,
"Failed to mark funding transaction safe to broadcast: {:?}",
e
);
log_error!(
self.logger,
"Failed to mark funding transaction safe to broadcast: {:?}",
e
);
});
} else {
log_error!(self.logger, "LSPS2 service handler is not available.");
}
}
pub(crate) async fn handle_next_event(&self) {
match self.liquidity_manager.next_event_async().await {
LiquidityEvent::LSPS1Client(LSPS1ClientEvent::SupportedOptionsReady {
request_id,
counterparty_node_id,
supported_options,
}) => {
if let Some(lsps1_client) = self.lsps1_client.as_ref() {
if counterparty_node_id != lsps1_client.lsp_node_id {
debug_assert!(
false,
"Received response from unexpected LSP counterparty. This should never happen."
);
log_error!(
self.logger,
"Received response from unexpected LSP counterparty. This should never happen."
);
return;
}
if let Some(sender) = lsps1_client
.pending_opening_params_requests
.lock()
.unwrap()
.remove(&request_id)
{
let response = LSPS1OpeningParamsResponse { supported_options };
match sender.send(response) {
Ok(()) => (),
Err(_) => {
log_error!(
self.logger,
"Failed to handle response for request {:?} from liquidity service",
request_id
);
},
}
} else {
debug_assert!(
false,
"Received response from liquidity service for unknown request."
);
log_error!(
self.logger,
"Received response from liquidity service for unknown request."
);
}
} else {
log_error!(
self.logger,
"Received unexpected LSPS1Client::SupportedOptionsReady event!"
);
}
},
LiquidityEvent::LSPS1Client(LSPS1ClientEvent::OrderCreated {
request_id,
counterparty_node_id,
order_id,
order,
payment,
channel,
}) => {
if let Some(lsps1_client) = self.lsps1_client.as_ref() {
if counterparty_node_id != lsps1_client.lsp_node_id {
debug_assert!(
false,
"Received response from unexpected LSP counterparty. This should never happen."
);
log_error!(
self.logger,
"Received response from unexpected LSP counterparty. This should never happen."
);
return;
}
if let Some(sender) = lsps1_client
.pending_create_order_requests
.lock()
.unwrap()
.remove(&request_id)
{
let response = LSPS1OrderStatus {
order_id,
order_params: order,
payment_options: payment.into(),
channel_state: channel,
};
match sender.send(response) {
Ok(()) => (),
Err(_) => {
log_error!(
self.logger,
"Failed to handle response for request {:?} from liquidity service",
request_id
);
},
}
} else {
debug_assert!(
false,
"Received response from liquidity service for unknown request."
);
log_error!(
self.logger,
"Received response from liquidity service for unknown request."
);
}
} else {
log_error!(self.logger, "Received unexpected LSPS1Client::OrderCreated event!");
}
},
LiquidityEvent::LSPS1Client(LSPS1ClientEvent::OrderStatus {
request_id,
counterparty_node_id,
order_id,
order,
payment,
channel,
}) => {
if let Some(lsps1_client) = self.lsps1_client.as_ref() {
if counterparty_node_id != lsps1_client.lsp_node_id {
debug_assert!(
false,
"Received response from unexpected LSP counterparty. This should never happen."
);
log_error!(
self.logger,
"Received response from unexpected LSP counterparty. This should never happen."
);
return;
}
if let Some(sender) = lsps1_client
.pending_check_order_status_requests
.lock()
.unwrap()
.remove(&request_id)
{
let response = LSPS1OrderStatus {
order_id,
order_params: order,
payment_options: payment.into(),
channel_state: channel,
};
match sender.send(response) {
Ok(()) => (),
Err(_) => {
log_error!(
self.logger,
"Failed to handle response for request {:?} from liquidity service",
request_id
);
},
}
} else {
debug_assert!(
false,
"Received response from liquidity service for unknown request."
);
log_error!(
self.logger,
"Received response from liquidity service for unknown request."
);
}
} else {
log_error!(self.logger, "Received unexpected LSPS1Client::OrderStatus event!");
}
},
LiquidityEvent::LSPS2Service(LSPS2ServiceEvent::GetInfo {
request_id,
counterparty_node_id,
token,
}) => {
if let Some(lsps2_service_handler) =
self.liquidity_manager.lsps2_service_handler().as_ref()
{
let service_config = if let Some(service_config) =
self.lsps2_service.as_ref().map(|s| s.service_config.clone())
{
service_config
} else {
log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",);
return;
};
if let Some(required) = service_config.require_token {
if token != Some(required) {
log_error!(
self.logger,
"Rejecting LSPS2 request {:?} from counterparty {} as the client provided an invalid token.",
request_id,
counterparty_node_id
);
lsps2_service_handler.invalid_token_provided(&counterparty_node_id, request_id.clone()).unwrap_or_else(|e| {
debug_assert!(false, "Failed to reject LSPS2 request. This should never happen.");
log_error!(
self.logger,
"Failed to reject LSPS2 request {:?} from counterparty {} due to: {:?}. This should never happen.",
request_id,
counterparty_node_id,
e
);
});
return;
}
}
let valid_until = LSPSDateTime(Utc::now() + LSPS2_GETINFO_REQUEST_EXPIRY);
let opening_fee_params = LSPS2RawOpeningFeeParams {
min_fee_msat: service_config.min_channel_opening_fee_msat,
proportional: service_config.channel_opening_fee_ppm,
valid_until,
min_lifetime: service_config.min_channel_lifetime,
max_client_to_self_delay: service_config.max_client_to_self_delay,
min_payment_size_msat: service_config.min_payment_size_msat,
max_payment_size_msat: service_config.max_payment_size_msat,
};
let opening_fee_params_menu = vec![opening_fee_params];
if let Err(e) = lsps2_service_handler.opening_fee_params_generated(
&counterparty_node_id,
request_id,
opening_fee_params_menu,
) {
log_error!(
self.logger,
"Failed to handle generated opening fee params: {:?}",
e
);
}
} else {
log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",);
return;
}
},
LiquidityEvent::LSPS2Service(LSPS2ServiceEvent::BuyRequest {
request_id,
counterparty_node_id,
opening_fee_params: _,
payment_size_msat,
}) => {
if let Some(lsps2_service_handler) =
self.liquidity_manager.lsps2_service_handler().as_ref()
{
let service_config = if let Some(service_config) =
self.lsps2_service.as_ref().map(|s| s.service_config.clone())
{
service_config
} else {
log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",);
return;
};
let user_channel_id: u128 = rand::rng().random();
let intercept_scid = self.channel_manager.get_intercept_scid();
if let Some(payment_size_msat) = payment_size_msat {
if payment_size_msat > service_config.max_payment_size_msat
|| payment_size_msat < service_config.min_payment_size_msat
{
log_error!(
self.logger,
"Rejecting to handle LSPS2 buy request {:?} from counterparty {} as the client requested an invalid payment size.",
request_id,
counterparty_node_id
);
return;
}
}
match lsps2_service_handler
.invoice_parameters_generated(
&counterparty_node_id,
request_id,
intercept_scid,
LSPS2_CHANNEL_CLTV_EXPIRY_DELTA,
service_config.client_trusts_lsp,
user_channel_id,
)
.await
{
Ok(()) => {},
Err(e) => {
log_error!(
self.logger,
"Failed to provide invoice parameters: {:?}",
e
);
return;
},
}
} else {
log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",);
return;
}
},
LiquidityEvent::LSPS2Service(LSPS2ServiceEvent::OpenChannel {
their_network_key,
amt_to_forward_msat,
opening_fee_msat: _,
user_channel_id,
intercept_scid: _,
}) => {
if self.liquidity_manager.lsps2_service_handler().is_none() {
log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",);
return;
};
let service_config = if let Some(service_config) =
self.lsps2_service.as_ref().map(|s| s.service_config.clone())
{
service_config
} else {
log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",);
return;
};
let init_features = if let Some(peer_manager) =
self.peer_manager.read().unwrap().as_ref()
{
if let Some(peer) = peer_manager.peer_by_node_id(&their_network_key) {
peer.init_features
} else {
log_error!(
self.logger,
"Failed to open LSPS2 channel to {} due to peer not being not connected.",
their_network_key,
);
return;
}
} else {
debug_assert!(false, "Failed to handle LSPS2ServiceEvent as peer manager isn't available. This should never happen.",);
log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as peer manager isn't available. This should never happen.",);
return;
};
let over_provisioning_msat = (amt_to_forward_msat
* service_config.channel_over_provisioning_ppm as u64)
/ 1_000_000;
let channel_amount_sats = (amt_to_forward_msat + over_provisioning_msat) / 1000;
let cur_anchor_reserve_sats =
total_anchor_channels_reserve_sats(&self.channel_manager, &self.config);
let spendable_amount_sats =
self.wallet.get_spendable_amount_sats(cur_anchor_reserve_sats).unwrap_or(0);
let required_funds_sats = channel_amount_sats
+ self.config.anchor_channels_config.as_ref().map_or(0, |c| {
if init_features.requires_anchors_zero_fee_htlc_tx()
&& !c.trusted_peers_no_reserve.contains(&their_network_key)
{
c.per_channel_reserve_sats
} else {
0
}
});
if spendable_amount_sats < required_funds_sats {
log_error!(self.logger,
"Unable to create channel due to insufficient funds. Available: {}sats, Required: {}sats",
spendable_amount_sats, channel_amount_sats
);
return;
}
let mut config = self.channel_manager.get_current_config().clone();
debug_assert_eq!(
config
.channel_handshake_config
.max_inbound_htlc_value_in_flight_percent_of_channel,
100
);
debug_assert!(config.accept_forwards_to_priv_channels);
config.channel_config.forwarding_fee_base_msat = 0;
config.channel_config.forwarding_fee_proportional_millionths = 0;
match self.channel_manager.create_channel(
their_network_key,
channel_amount_sats,
0,
user_channel_id,
None,
Some(config),
) {
Ok(_) => {},
Err(e) => {
log_error!(
self.logger,
"Failed to open LSPS2 channel to {}: {:?}",
their_network_key,
e
);
return;
},
}
},
LiquidityEvent::LSPS2Client(LSPS2ClientEvent::OpeningParametersReady {
request_id,
counterparty_node_id,
opening_fee_params_menu,
}) => {
if let Some(lsps2_client) = self.lsps2_client.as_ref() {
if counterparty_node_id != lsps2_client.lsp_node_id {
debug_assert!(
false,
"Received response from unexpected LSP counterparty. This should never happen."
);
log_error!(
self.logger,
"Received response from unexpected LSP counterparty. This should never happen."
);
return;
}
if let Some(sender) =
lsps2_client.pending_fee_requests.lock().unwrap().remove(&request_id)
{
let response = LSPS2FeeResponse { opening_fee_params_menu };
match sender.send(response) {
Ok(()) => (),
Err(_) => {
log_error!(
self.logger,
"Failed to handle response for request {:?} from liquidity service",
request_id
);
},
}
} else {
debug_assert!(
false,
"Received response from liquidity service for unknown request."
);
log_error!(
self.logger,
"Received response from liquidity service for unknown request."
);
}
} else {
log_error!(
self.logger,
"Received unexpected LSPS2Client::OpeningParametersReady event!"
);
}
},
LiquidityEvent::LSPS2Client(LSPS2ClientEvent::InvoiceParametersReady {
request_id,
counterparty_node_id,
intercept_scid,
cltv_expiry_delta,
..
}) => {
if let Some(lsps2_client) = self.lsps2_client.as_ref() {
if counterparty_node_id != lsps2_client.lsp_node_id {
debug_assert!(
false,
"Received response from unexpected LSP counterparty. This should never happen."
);
log_error!(
self.logger,
"Received response from unexpected LSP counterparty. This should never happen."
);
return;
}
if let Some(sender) =
lsps2_client.pending_buy_requests.lock().unwrap().remove(&request_id)
{
let response = LSPS2BuyResponse { intercept_scid, cltv_expiry_delta };
match sender.send(response) {
Ok(()) => (),
Err(_) => {
log_error!(
self.logger,
"Failed to handle response for request {:?} from liquidity service",
request_id
);
},
}
} else {
debug_assert!(
false,
"Received response from liquidity service for unknown request."
);
log_error!(
self.logger,
"Received response from liquidity service for unknown request."
);
}
} else {
log_error!(
self.logger,
"Received unexpected LSPS2Client::InvoiceParametersReady event!"
);
}
},
e => {
log_error!(self.logger, "Received unexpected liquidity event: {:?}", e);
},
}
}
pub(crate) async fn lsps1_request_opening_params(
&self,
) -> Result<LSPS1OpeningParamsResponse, Error> {
let lsps1_client = self.lsps1_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?;
let client_handler = self.liquidity_manager.lsps1_client_handler().ok_or_else(|| {
log_error!(self.logger, "LSPS1 liquidity client was not configured.",);
Error::LiquiditySourceUnavailable
})?;
let (request_sender, request_receiver) = oneshot::channel();
{
let mut pending_opening_params_requests_lock =
lsps1_client.pending_opening_params_requests.lock().unwrap();
let request_id = client_handler.request_supported_options(lsps1_client.lsp_node_id);
pending_opening_params_requests_lock.insert(request_id, request_sender);
}
tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), request_receiver)
.await
.map_err(|e| {
log_error!(self.logger, "Liquidity request timed out: {}", e);
Error::LiquidityRequestFailed
})?
.map_err(|e| {
log_error!(self.logger, "Failed to handle response from liquidity service: {}", e);
Error::LiquidityRequestFailed
})
}
pub(crate) async fn lsps1_request_channel(
&self, lsp_balance_sat: u64, client_balance_sat: u64, channel_expiry_blocks: u32,
announce_channel: bool, refund_address: bitcoin::Address,
) -> Result<LSPS1OrderStatus, Error> {
let lsps1_client = self.lsps1_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?;
let client_handler = self.liquidity_manager.lsps1_client_handler().ok_or_else(|| {
log_error!(self.logger, "LSPS1 liquidity client was not configured.",);
Error::LiquiditySourceUnavailable
})?;
let lsp_limits = self.lsps1_request_opening_params().await?.supported_options;
let channel_size_sat = lsp_balance_sat + client_balance_sat;
if channel_size_sat < lsp_limits.min_channel_balance_sat
|| channel_size_sat > lsp_limits.max_channel_balance_sat
{
log_error!(
self.logger,
"Requested channel size of {}sat doesn't meet the LSP-provided limits (min: {}sat, max: {}sat).",
channel_size_sat,
lsp_limits.min_channel_balance_sat,
lsp_limits.max_channel_balance_sat
);
return Err(Error::LiquidityRequestFailed);
}
if lsp_balance_sat < lsp_limits.min_initial_lsp_balance_sat
|| lsp_balance_sat > lsp_limits.max_initial_lsp_balance_sat
{
log_error!(
self.logger,
"Requested LSP-side balance of {}sat doesn't meet the LSP-provided limits (min: {}sat, max: {}sat).",
lsp_balance_sat,
lsp_limits.min_initial_lsp_balance_sat,
lsp_limits.max_initial_lsp_balance_sat
);
return Err(Error::LiquidityRequestFailed);
}
if client_balance_sat < lsp_limits.min_initial_client_balance_sat
|| client_balance_sat > lsp_limits.max_initial_client_balance_sat
{
log_error!(
self.logger,
"Requested client-side balance of {}sat doesn't meet the LSP-provided limits (min: {}sat, max: {}sat).",
client_balance_sat,
lsp_limits.min_initial_client_balance_sat,
lsp_limits.max_initial_client_balance_sat
);
return Err(Error::LiquidityRequestFailed);
}
let order_params = LSPS1OrderParams {
lsp_balance_sat,
client_balance_sat,
required_channel_confirmations: lsp_limits.min_required_channel_confirmations,
funding_confirms_within_blocks: lsp_limits.min_funding_confirms_within_blocks,
channel_expiry_blocks,
token: lsps1_client.token.clone(),
announce_channel,
};
let (request_sender, request_receiver) = oneshot::channel();
let request_id;
{
let mut pending_create_order_requests_lock =
lsps1_client.pending_create_order_requests.lock().unwrap();
request_id = client_handler.create_order(
&lsps1_client.lsp_node_id,
order_params.clone(),
Some(refund_address),
);
pending_create_order_requests_lock.insert(request_id.clone(), request_sender);
}
let response = tokio::time::timeout(
Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS),
request_receiver,
)
.await
.map_err(|e| {
log_error!(self.logger, "Liquidity request with ID {:?} timed out: {}", request_id, e);
Error::LiquidityRequestFailed
})?
.map_err(|e| {
log_error!(self.logger, "Failed to handle response from liquidity service: {}", e);
Error::LiquidityRequestFailed
})?;
if response.order_params != order_params {
log_error!(
self.logger,
"Aborting LSPS1 request as LSP-provided parameters don't match our order. Expected: {:?}, Received: {:?}", order_params, response.order_params
);
return Err(Error::LiquidityRequestFailed);
}
Ok(response)
}
pub(crate) async fn lsps1_check_order_status(
&self, order_id: LSPS1OrderId,
) -> Result<LSPS1OrderStatus, Error> {
let lsps1_client = self.lsps1_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?;
let client_handler = self.liquidity_manager.lsps1_client_handler().ok_or_else(|| {
log_error!(self.logger, "LSPS1 liquidity client was not configured.",);
Error::LiquiditySourceUnavailable
})?;
let (request_sender, request_receiver) = oneshot::channel();
{
let mut pending_check_order_status_requests_lock =
lsps1_client.pending_check_order_status_requests.lock().unwrap();
let request_id = client_handler.check_order_status(&lsps1_client.lsp_node_id, order_id);
pending_check_order_status_requests_lock.insert(request_id, request_sender);
}
let response = tokio::time::timeout(
Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS),
request_receiver,
)
.await
.map_err(|e| {
log_error!(self.logger, "Liquidity request timed out: {}", e);
Error::LiquidityRequestFailed
})?
.map_err(|e| {
log_error!(self.logger, "Failed to handle response from liquidity service: {}", e);
Error::LiquidityRequestFailed
})?;
Ok(response)
}
pub(crate) async fn lsps2_receive_to_jit_channel(
&self, amount_msat: u64, description: &Bolt11InvoiceDescription, expiry_secs: u32,
max_total_lsp_fee_limit_msat: Option<u64>, payment_hash: Option<PaymentHash>,
) -> Result<(Bolt11Invoice, u64), Error> {
let fee_response = self.lsps2_request_opening_fee_params().await?;
let (min_total_fee_msat, min_opening_params) = fee_response
.opening_fee_params_menu
.into_iter()
.filter_map(|params| {
if amount_msat < params.min_payment_size_msat
|| amount_msat > params.max_payment_size_msat
{
log_debug!(self.logger,
"Skipping LSP-offered JIT parameters as the payment of {}msat doesn't meet LSP limits (min: {}msat, max: {}msat)",
amount_msat,
params.min_payment_size_msat,
params.max_payment_size_msat
);
None
} else {
compute_opening_fee(amount_msat, params.min_fee_msat, params.proportional as u64)
.map(|fee| (fee, params))
}
})
.min_by_key(|p| p.0)
.ok_or_else(|| {
log_error!(self.logger, "Failed to handle response from liquidity service",);
Error::LiquidityRequestFailed
})?;
if let Some(max_total_lsp_fee_limit_msat) = max_total_lsp_fee_limit_msat {
if min_total_fee_msat > max_total_lsp_fee_limit_msat {
log_error!(self.logger,
"Failed to request inbound JIT channel as LSP's requested total opening fee of {}msat exceeds our fee limit of {}msat",
min_total_fee_msat, max_total_lsp_fee_limit_msat
);
return Err(Error::LiquidityFeeTooHigh);
}
}
log_debug!(
self.logger,
"Choosing cheapest liquidity offer, will pay {}msat in total LSP fees",
min_total_fee_msat
);
let buy_response =
self.lsps2_send_buy_request(Some(amount_msat), min_opening_params).await?;
let invoice = self.lsps2_create_jit_invoice(
buy_response,
Some(amount_msat),
description,
expiry_secs,
payment_hash,
)?;
log_info!(self.logger, "JIT-channel invoice created: {}", invoice);
Ok((invoice, min_total_fee_msat))
}
pub(crate) async fn lsps2_receive_variable_amount_to_jit_channel(
&self, description: &Bolt11InvoiceDescription, expiry_secs: u32,
max_proportional_lsp_fee_limit_ppm_msat: Option<u64>, payment_hash: Option<PaymentHash>,
) -> Result<(Bolt11Invoice, u64), Error> {
let fee_response = self.lsps2_request_opening_fee_params().await?;
let (min_prop_fee_ppm_msat, min_opening_params) = fee_response
.opening_fee_params_menu
.into_iter()
.map(|params| (params.proportional as u64, params))
.min_by_key(|p| p.0)
.ok_or_else(|| {
log_error!(self.logger, "Failed to handle response from liquidity service",);
Error::LiquidityRequestFailed
})?;
if let Some(max_proportional_lsp_fee_limit_ppm_msat) =
max_proportional_lsp_fee_limit_ppm_msat
{
if min_prop_fee_ppm_msat > max_proportional_lsp_fee_limit_ppm_msat {
log_error!(self.logger,
"Failed to request inbound JIT channel as LSP's requested proportional opening fee of {} ppm msat exceeds our fee limit of {} ppm msat",
min_prop_fee_ppm_msat,
max_proportional_lsp_fee_limit_ppm_msat
);
return Err(Error::LiquidityFeeTooHigh);
}
}
log_debug!(
self.logger,
"Choosing cheapest liquidity offer, will pay {}ppm msat in proportional LSP fees",
min_prop_fee_ppm_msat
);
let buy_response = self.lsps2_send_buy_request(None, min_opening_params).await?;
let invoice = self.lsps2_create_jit_invoice(
buy_response,
None,
description,
expiry_secs,
payment_hash,
)?;
log_info!(self.logger, "JIT-channel invoice created: {}", invoice);
Ok((invoice, min_prop_fee_ppm_msat))
}
async fn lsps2_request_opening_fee_params(&self) -> Result<LSPS2FeeResponse, Error> {
let lsps2_client = self.lsps2_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?;
let client_handler = self.liquidity_manager.lsps2_client_handler().ok_or_else(|| {
log_error!(self.logger, "Liquidity client was not configured.",);
Error::LiquiditySourceUnavailable
})?;
let (fee_request_sender, fee_request_receiver) = oneshot::channel();
{
let mut pending_fee_requests_lock = lsps2_client.pending_fee_requests.lock().unwrap();
let request_id = client_handler
.request_opening_params(lsps2_client.lsp_node_id, lsps2_client.token.clone());
pending_fee_requests_lock.insert(request_id, fee_request_sender);
}
tokio::time::timeout(
Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS),
fee_request_receiver,
)
.await
.map_err(|e| {
log_error!(self.logger, "Liquidity request timed out: {}", e);
Error::LiquidityRequestFailed
})?
.map_err(|e| {
log_error!(self.logger, "Failed to handle response from liquidity service: {}", e);
Error::LiquidityRequestFailed
})
}
async fn lsps2_send_buy_request(
&self, amount_msat: Option<u64>, opening_fee_params: LSPS2OpeningFeeParams,
) -> Result<LSPS2BuyResponse, Error> {
let lsps2_client = self.lsps2_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?;
let client_handler = self.liquidity_manager.lsps2_client_handler().ok_or_else(|| {
log_error!(self.logger, "Liquidity client was not configured.",);
Error::LiquiditySourceUnavailable
})?;
let (buy_request_sender, buy_request_receiver) = oneshot::channel();
{
let mut pending_buy_requests_lock = lsps2_client.pending_buy_requests.lock().unwrap();
let request_id = client_handler
.select_opening_params(lsps2_client.lsp_node_id, amount_msat, opening_fee_params)
.map_err(|e| {
log_error!(
self.logger,
"Failed to send buy request to liquidity service: {:?}",
e
);
Error::LiquidityRequestFailed
})?;
pending_buy_requests_lock.insert(request_id, buy_request_sender);
}
let buy_response = tokio::time::timeout(
Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS),
buy_request_receiver,
)
.await
.map_err(|e| {
log_error!(self.logger, "Liquidity request timed out: {}", e);
Error::LiquidityRequestFailed
})?
.map_err(|e| {
log_error!(self.logger, "Failed to handle response from liquidity service: {:?}", e);
Error::LiquidityRequestFailed
})?;
Ok(buy_response)
}
fn lsps2_create_jit_invoice(
&self, buy_response: LSPS2BuyResponse, amount_msat: Option<u64>,
description: &Bolt11InvoiceDescription, expiry_secs: u32,
payment_hash: Option<PaymentHash>,
) -> Result<Bolt11Invoice, Error> {
let lsps2_client = self.lsps2_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?;
let min_final_cltv_expiry_delta = MIN_FINAL_CLTV_EXPIRY_DELTA + 2;
let (payment_hash, payment_secret) = match payment_hash {
Some(payment_hash) => {
let payment_secret = self
.channel_manager
.create_inbound_payment_for_hash(
payment_hash,
None,
expiry_secs,
Some(min_final_cltv_expiry_delta),
)
.map_err(|e| {
log_error!(self.logger, "Failed to register inbound payment: {:?}", e);
Error::InvoiceCreationFailed
})?;
(payment_hash, payment_secret)
},
None => self
.channel_manager
.create_inbound_payment(None, expiry_secs, Some(min_final_cltv_expiry_delta))
.map_err(|e| {
log_error!(self.logger, "Failed to register inbound payment: {:?}", e);
Error::InvoiceCreationFailed
})?,
};
let route_hint = RouteHint(vec![RouteHintHop {
src_node_id: lsps2_client.lsp_node_id,
short_channel_id: buy_response.intercept_scid,
fees: RoutingFees { base_msat: 0, proportional_millionths: 0 },
cltv_expiry_delta: buy_response.cltv_expiry_delta as u16,
htlc_minimum_msat: None,
htlc_maximum_msat: None,
}]);
let payment_hash = sha256::Hash::from_slice(&payment_hash.0).map_err(|e| {
log_error!(self.logger, "Invalid payment hash: {:?}", e);
Error::InvoiceCreationFailed
})?;
let currency = self.config.network.into();
let mut invoice_builder = InvoiceBuilder::new(currency)
.invoice_description(description.clone())
.payment_hash(payment_hash)
.payment_secret(payment_secret)
.current_timestamp()
.min_final_cltv_expiry_delta(min_final_cltv_expiry_delta.into())
.expiry_time(Duration::from_secs(expiry_secs.into()))
.private_route(route_hint);
if let Some(amount_msat) = amount_msat {
invoice_builder = invoice_builder.amount_milli_satoshis(amount_msat).basic_mpp();
}
invoice_builder
.build_signed(|hash| {
Secp256k1::new()
.sign_ecdsa_recoverable(hash, &self.keys_manager.get_node_secret_key())
})
.map_err(|e| {
log_error!(self.logger, "Failed to build and sign invoice: {}", e);
Error::InvoiceCreationFailed
})
}
pub(crate) async fn handle_channel_ready(
&self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey,
) {
if let Some(lsps2_service_handler) = self.liquidity_manager.lsps2_service_handler() {
if let Err(e) = lsps2_service_handler
.channel_ready(user_channel_id, channel_id, counterparty_node_id)
.await
{
log_error!(
self.logger,
"LSPS2 service failed to handle ChannelReady event: {:?}",
e
);
}
}
}
pub(crate) async fn handle_htlc_intercepted(
&self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64,
payment_hash: PaymentHash,
) {
if let Some(lsps2_service_handler) = self.liquidity_manager.lsps2_service_handler() {
if let Err(e) = lsps2_service_handler
.htlc_intercepted(
intercept_scid,
intercept_id,
expected_outbound_amount_msat,
payment_hash,
)
.await
{
log_error!(
self.logger,
"LSPS2 service failed to handle HTLCIntercepted event: {:?}",
e
);
}
}
}
pub(crate) async fn handle_htlc_handling_failed(&self, failure_type: HTLCHandlingFailureType) {
if let Some(lsps2_service_handler) = self.liquidity_manager.lsps2_service_handler() {
if let Err(e) = lsps2_service_handler.htlc_handling_failed(failure_type).await {
log_error!(
self.logger,
"LSPS2 service failed to handle HTLCHandlingFailed event: {:?}",
e
);
}
}
}
pub(crate) async fn handle_payment_forwarded(
&self, next_channel_id: Option<ChannelId>, skimmed_fee_msat: u64,
) {
if let Some(next_channel_id) = next_channel_id {
if let Some(lsps2_service_handler) = self.liquidity_manager.lsps2_service_handler() {
if let Err(e) =
lsps2_service_handler.payment_forwarded(next_channel_id, skimmed_fee_msat).await
{
log_error!(
self.logger,
"LSPS2 service failed to handle PaymentForwarded: {:?}",
e
);
}
}
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct LSPS1OpeningParamsResponse {
supported_options: LSPS1Options,
}
#[derive(Debug, Clone)]
pub struct LSPS1OrderStatus {
pub order_id: LSPS1OrderId,
pub order_params: LSPS1OrderParams,
pub payment_options: LSPS1PaymentInfo,
pub channel_state: Option<LSPS1ChannelInfo>,
}
#[cfg(not(feature = "uniffi"))]
type LSPS1PaymentInfo = lightning_liquidity::lsps1::msgs::LSPS1PaymentInfo;
#[cfg(feature = "uniffi")]
type LSPS1PaymentInfo = crate::ffi::LSPS1PaymentInfo;
#[derive(Debug, Clone)]
pub(crate) struct LSPS2FeeResponse {
opening_fee_params_menu: Vec<LSPS2OpeningFeeParams>,
}
#[derive(Debug, Clone)]
pub(crate) struct LSPS2BuyResponse {
intercept_scid: u64,
cltv_expiry_delta: u32,
}
#[derive(Clone)]
pub struct LSPS1Liquidity {
runtime: Arc<Runtime>,
wallet: Arc<Wallet>,
connection_manager: Arc<ConnectionManager<Arc<Logger>>>,
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
logger: Arc<Logger>,
}
impl LSPS1Liquidity {
pub(crate) fn new(
runtime: Arc<Runtime>, wallet: Arc<Wallet>,
connection_manager: Arc<ConnectionManager<Arc<Logger>>>,
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>, logger: Arc<Logger>,
) -> Self {
Self { runtime, wallet, connection_manager, liquidity_source, logger }
}
pub fn request_channel(
&self, lsp_balance_sat: u64, client_balance_sat: u64, channel_expiry_blocks: u32,
announce_channel: bool,
) -> Result<LSPS1OrderStatus, Error> {
let liquidity_source =
self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?;
let (lsp_node_id, lsp_address) =
liquidity_source.get_lsps1_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?;
let con_node_id = lsp_node_id;
let con_addr = lsp_address.clone();
let con_cm = Arc::clone(&self.connection_manager);
self.runtime.block_on(async move {
con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
})?;
log_info!(self.logger, "Connected to LSP {}@{}. ", lsp_node_id, lsp_address);
let refund_address = self.wallet.get_new_address()?;
let liquidity_source = Arc::clone(&liquidity_source);
let response = self.runtime.block_on(async move {
liquidity_source
.lsps1_request_channel(
lsp_balance_sat,
client_balance_sat,
channel_expiry_blocks,
announce_channel,
refund_address,
)
.await
})?;
Ok(response)
}
pub fn check_order_status(&self, order_id: LSPS1OrderId) -> Result<LSPS1OrderStatus, Error> {
let liquidity_source =
self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?;
let (lsp_node_id, lsp_address) =
liquidity_source.get_lsps1_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?;
let con_node_id = lsp_node_id;
let con_addr = lsp_address.clone();
let con_cm = Arc::clone(&self.connection_manager);
self.runtime.block_on(async move {
con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
})?;
let liquidity_source = Arc::clone(&liquidity_source);
let response = self
.runtime
.block_on(async move { liquidity_source.lsps1_check_order_status(order_id).await })?;
Ok(response)
}
}