lightning_liquidity/lsps2/
service.rs

1// This file is Copyright its original authors, visible in version control
2// history.
3//
4// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7// You may not use this file except in accordance with one or both of these
8// licenses.
9
10//! Contains the main bLIP-52 / LSPS2 server-side object, [`LSPS2ServiceHandler`].
11
12use alloc::boxed::Box;
13use alloc::string::{String, ToString};
14use alloc::vec::Vec;
15use lightning::util::persist::KVStore;
16
17use core::cmp::Ordering as CmpOrdering;
18use core::future::Future as StdFuture;
19use core::ops::Deref;
20use core::sync::atomic::{AtomicUsize, Ordering};
21use core::task;
22
23use crate::events::EventQueue;
24use crate::lsps0::ser::{
25	LSPSMessage, LSPSProtocolMessageHandler, LSPSRequestId, LSPSResponseError,
26	JSONRPC_INTERNAL_ERROR_ERROR_CODE, JSONRPC_INTERNAL_ERROR_ERROR_MESSAGE,
27	LSPS0_CLIENT_REJECTED_ERROR_CODE,
28};
29use crate::lsps2::event::LSPS2ServiceEvent;
30use crate::lsps2::payment_queue::{InterceptedHTLC, PaymentQueue};
31use crate::lsps2::utils::{
32	compute_opening_fee, is_expired_opening_fee_params, is_valid_opening_fee_params,
33};
34use crate::message_queue::{MessageQueue, MessageQueueNotifierGuard};
35use crate::persist::{
36	LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
37};
38use crate::prelude::hash_map::Entry;
39use crate::prelude::{new_hash_map, HashMap};
40use crate::sync::{Arc, Mutex, MutexGuard, RwLock};
41use crate::utils::async_poll::dummy_waker;
42
43use lightning::chain::chaininterface::BroadcasterInterface;
44use lightning::events::HTLCHandlingFailureType;
45use lightning::ln::channelmanager::{AChannelManager, FailureCode, InterceptId};
46use lightning::ln::msgs::{ErrorAction, LightningError};
47use lightning::ln::types::ChannelId;
48use lightning::util::errors::APIError;
49use lightning::util::logger::Level;
50use lightning::util::ser::Writeable;
51use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum};
52
53use lightning_types::payment::PaymentHash;
54
55use bitcoin::secp256k1::PublicKey;
56use bitcoin::Transaction;
57
58use crate::lsps2::msgs::{
59	LSPS2BuyRequest, LSPS2BuyResponse, LSPS2GetInfoRequest, LSPS2GetInfoResponse, LSPS2Message,
60	LSPS2OpeningFeeParams, LSPS2RawOpeningFeeParams, LSPS2Request, LSPS2Response,
61	LSPS2_BUY_REQUEST_INVALID_OPENING_FEE_PARAMS_ERROR_CODE,
62	LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_LARGE_ERROR_CODE,
63	LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_SMALL_ERROR_CODE,
64	LSPS2_GET_INFO_REQUEST_UNRECOGNIZED_OR_STALE_TOKEN_ERROR_CODE,
65};
66
/// Upper bound on the number of pending requests tracked for a single peer.
///
/// NOTE(review): the enforcement site is outside this part of the file; confirm the exact
/// accounting there.
const MAX_PENDING_REQUESTS_PER_PEER: usize = 10;
/// Upper bound on the number of pending requests tracked across all peers.
///
/// NOTE(review): the enforcement site is outside this part of the file; confirm the exact
/// accounting there.
const MAX_TOTAL_PENDING_REQUESTS: usize = 1_000;
/// Upper bound on the number of peers we keep state for. Once reached, a request from a peer
/// without existing state is answered with an internal error instead of allocating new state.
const MAX_TOTAL_PEERS: usize = 100_000;
70
/// Configuration of the server (LSP) side of the JIT channel protocol.
#[derive(Debug, Clone)]
pub struct LSPS2ServiceConfig {
	/// Secret from which the promise attached to client-facing channel parameters is derived.
	///
	/// Note: Changing this value invalidates every promise that was handed out before the
	/// change.
	pub promise_secret: [u8; 32],
}
79
/// The amounts describing a JIT channel open: the opening fee and what is left of the initial
/// payment once that fee is subtracted.
///
/// This will be provided in the `OpenChannel` event.
#[derive(Debug, Clone, PartialEq)]
struct OpenChannelParams {
	/// Fee charged for opening the JIT channel, in millisatoshi.
	opening_fee_msat: u64,
	/// Expected payment size minus `opening_fee_msat`, in millisatoshi.
	amt_to_forward_msat: u64,
}
87
88/// A payment that will be forwarded while skimming the given JIT channel opening fee.
89#[derive(Clone, Debug, PartialEq)]
90struct FeePayment {
91	htlcs: Vec<InterceptedHTLC>,
92	opening_fee_msat: u64,
93}
94
/// Error returned by the JIT channel state machine when an event cannot be handled in the
/// current state or with the given amounts. Wraps a human-readable description.
#[derive(Debug)]
struct ChannelStateError(String);
97
98impl From<ChannelStateError> for LightningError {
99	fn from(value: ChannelStateError) -> Self {
100		LightningError { err: value.0, action: ErrorAction::IgnoreAndLog(Level::Info) }
101	}
102}
103
104/// Possible actions that need to be taken when an HTLC is intercepted.
105#[derive(Debug, PartialEq)]
106enum HTLCInterceptedAction {
107	/// The opening of the JIT channel.
108	OpenChannel(OpenChannelParams),
109	/// The forwarding of the intercepted HTLC.
110	ForwardHTLC(ChannelId),
111	ForwardPayment(ChannelId, FeePayment),
112}
113
114/// The forwarding of a payment while skimming the JIT channel opening fee.
115#[derive(Debug, PartialEq)]
116struct ForwardPaymentAction(ChannelId, FeePayment);
117
118/// The forwarding of previously intercepted HTLCs without skimming any further fees.
119#[derive(Debug, PartialEq)]
120struct ForwardHTLCsAction(ChannelId, Vec<InterceptedHTLC>);
121
122#[derive(Debug, Clone)]
123enum TrustModel {
124	ClientTrustsLsp { funding_tx_broadcast_safe: bool, funding_tx: Option<Transaction> },
125	LspTrustsClient,
126}
127
128impl TrustModel {
129	fn should_manually_broadcast(&self, state_is_payment_forward: bool) -> bool {
130		match self {
131			TrustModel::ClientTrustsLsp { funding_tx_broadcast_safe, funding_tx } => {
132				*funding_tx_broadcast_safe && state_is_payment_forward && funding_tx.is_some()
133			},
134			// in lsp-trusts-client, the broadcast is automatic, so we never need to manually broadcast.
135			TrustModel::LspTrustsClient => false,
136		}
137	}
138
139	fn new(client_trusts_lsp: bool) -> Self {
140		if client_trusts_lsp {
141			TrustModel::ClientTrustsLsp { funding_tx_broadcast_safe: false, funding_tx: None }
142		} else {
143			TrustModel::LspTrustsClient
144		}
145	}
146
147	fn set_funding_tx(&mut self, funding_tx: Transaction) {
148		match self {
149			TrustModel::ClientTrustsLsp { funding_tx: tx, .. } => {
150				*tx = Some(funding_tx);
151			},
152			TrustModel::LspTrustsClient => {
153				// No-op
154			},
155		}
156	}
157
158	fn set_funding_tx_broadcast_safe(&mut self, funding_tx_broadcast_safe: bool) {
159		match self {
160			TrustModel::ClientTrustsLsp { funding_tx_broadcast_safe: safe, .. } => {
161				*safe = funding_tx_broadcast_safe;
162			},
163			TrustModel::LspTrustsClient => {
164				// No-op
165			},
166		}
167	}
168
169	fn get_funding_tx(&self) -> Option<&Transaction> {
170		match self {
171			TrustModel::ClientTrustsLsp { funding_tx, .. } => funding_tx.as_ref(),
172			_ => None,
173		}
174	}
175
176	fn is_client_trusts_lsp(&self) -> bool {
177		match self {
178			TrustModel::ClientTrustsLsp { .. } => true,
179			TrustModel::LspTrustsClient => false,
180		}
181	}
182}
183
184impl_writeable_tlv_based_enum!(TrustModel,
185	(0, ClientTrustsLsp) => {
186		(0, funding_tx_broadcast_safe, required),
187		(2, funding_tx, option),
188	},
189	(2, LspTrustsClient) => {
190	},
191);
192
193/// The different states a requested JIT channel can be in.
194#[derive(Debug)]
195enum OutboundJITChannelState {
196	/// The JIT channel SCID was created after a buy request, and we are awaiting an initial payment
197	/// of sufficient size to open the channel.
198	PendingInitialPayment { payment_queue: PaymentQueue },
199	/// An initial payment of sufficient size was intercepted to the JIT channel SCID, triggering the
200	/// opening of the channel. We are awaiting the completion of the channel establishment.
201	PendingChannelOpen { payment_queue: PaymentQueue, opening_fee_msat: u64 },
202	/// The channel is open and a payment was forwarded while skimming the JIT channel fee.
203	/// No further payments can be forwarded until the pending payment succeeds or fails, as we need
204	/// to know whether the JIT channel fee needs to be skimmed from a next payment or not.
205	PendingPaymentForward {
206		payment_queue: PaymentQueue,
207		opening_fee_msat: u64,
208		channel_id: ChannelId,
209	},
210	/// The channel is open, no payment is currently being forwarded, and the JIT channel fee still
211	/// needs to be paid. This state can occur when the initial payment fails, e.g. due to a
212	/// prepayment probe. We are awaiting a next payment of sufficient size to forward and skim the
213	/// JIT channel fee.
214	PendingPayment { payment_queue: PaymentQueue, opening_fee_msat: u64, channel_id: ChannelId },
215	/// The channel is open and a payment was successfully forwarded while skimming the JIT channel
216	/// fee. Any subsequent HTLCs can be forwarded without additional logic.
217	PaymentForwarded { channel_id: ChannelId },
218}
219
220impl OutboundJITChannelState {
221	fn new() -> Self {
222		OutboundJITChannelState::PendingInitialPayment { payment_queue: PaymentQueue::new() }
223	}
224
225	fn htlc_intercepted(
226		&mut self, opening_fee_params: &LSPS2OpeningFeeParams, payment_size_msat: &Option<u64>,
227		htlc: InterceptedHTLC,
228	) -> Result<Option<HTLCInterceptedAction>, ChannelStateError> {
229		match self {
230			OutboundJITChannelState::PendingInitialPayment { payment_queue } => {
231				let (total_expected_outbound_amount_msat, num_htlcs) = payment_queue.add_htlc(htlc);
232
233				let (expected_payment_size_msat, mpp_mode) =
234					if let Some(payment_size_msat) = payment_size_msat {
235						(*payment_size_msat, true)
236					} else {
237						debug_assert_eq!(num_htlcs, 1);
238						if num_htlcs != 1 {
239							return Err(ChannelStateError(
240								"Paying via multiple HTLCs is disallowed in \"no-MPP+var-invoice\" mode.".to_string()
241							));
242						}
243						(total_expected_outbound_amount_msat, false)
244					};
245
246				if expected_payment_size_msat < opening_fee_params.min_payment_size_msat
247					|| expected_payment_size_msat > opening_fee_params.max_payment_size_msat
248				{
249					return Err(ChannelStateError(
250							format!("Payment size violates our limits: expected_payment_size_msat = {}, min_payment_size_msat = {}, max_payment_size_msat = {}",
251									expected_payment_size_msat,
252									opening_fee_params.min_payment_size_msat,
253									opening_fee_params.max_payment_size_msat
254							)));
255				}
256
257				let opening_fee_msat = compute_opening_fee(
258					expected_payment_size_msat,
259					opening_fee_params.min_fee_msat,
260					opening_fee_params.proportional.into(),
261				).ok_or(ChannelStateError(
262					format!("Could not compute valid opening fee with min_fee_msat = {}, proportional = {}, and expected_payment_size_msat = {}",
263						opening_fee_params.min_fee_msat,
264						opening_fee_params.proportional,
265						expected_payment_size_msat
266					))
267				)?;
268
269				let amt_to_forward_msat =
270					expected_payment_size_msat.saturating_sub(opening_fee_msat);
271
272				// Go ahead and open the channel if we intercepted sufficient HTLCs.
273				if total_expected_outbound_amount_msat >= expected_payment_size_msat
274					&& amt_to_forward_msat > 0
275				{
276					*self = OutboundJITChannelState::PendingChannelOpen {
277						payment_queue: core::mem::take(payment_queue),
278						opening_fee_msat,
279					};
280					let open_channel = HTLCInterceptedAction::OpenChannel(OpenChannelParams {
281						opening_fee_msat,
282						amt_to_forward_msat,
283					});
284					Ok(Some(open_channel))
285				} else {
286					if mpp_mode {
287						*self = OutboundJITChannelState::PendingInitialPayment {
288							payment_queue: core::mem::take(payment_queue),
289						};
290						Ok(None)
291					} else {
292						Err(ChannelStateError(
293							"Intercepted HTLC is too small to pay opening fee".to_string(),
294						))
295					}
296				}
297			},
298			OutboundJITChannelState::PendingChannelOpen { payment_queue, opening_fee_msat } => {
299				let mut payment_queue = core::mem::take(payment_queue);
300				payment_queue.add_htlc(htlc);
301				*self = OutboundJITChannelState::PendingChannelOpen {
302					payment_queue,
303					opening_fee_msat: *opening_fee_msat,
304				};
305				Ok(None)
306			},
307			OutboundJITChannelState::PendingPaymentForward {
308				payment_queue,
309				opening_fee_msat,
310				channel_id,
311			} => {
312				let mut payment_queue = core::mem::take(payment_queue);
313				payment_queue.add_htlc(htlc);
314				*self = OutboundJITChannelState::PendingPaymentForward {
315					payment_queue,
316					opening_fee_msat: *opening_fee_msat,
317					channel_id: *channel_id,
318				};
319				Ok(None)
320			},
321			OutboundJITChannelState::PendingPayment {
322				payment_queue,
323				opening_fee_msat,
324				channel_id,
325			} => {
326				let mut payment_queue = core::mem::take(payment_queue);
327				payment_queue.add_htlc(htlc);
328				if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) {
329					let forward_payment = HTLCInterceptedAction::ForwardPayment(
330						*channel_id,
331						FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat },
332					);
333					*self = OutboundJITChannelState::PendingPaymentForward {
334						payment_queue,
335						opening_fee_msat: *opening_fee_msat,
336						channel_id: *channel_id,
337					};
338					Ok(Some(forward_payment))
339				} else {
340					*self = OutboundJITChannelState::PendingPayment {
341						payment_queue,
342						opening_fee_msat: *opening_fee_msat,
343						channel_id: *channel_id,
344					};
345					Ok(None)
346				}
347			},
348			OutboundJITChannelState::PaymentForwarded { channel_id } => {
349				let forward = HTLCInterceptedAction::ForwardHTLC(*channel_id);
350				*self = OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
351				Ok(Some(forward))
352			},
353		}
354	}
355
356	fn channel_ready(
357		&mut self, channel_id: ChannelId,
358	) -> Result<ForwardPaymentAction, ChannelStateError> {
359		match self {
360			OutboundJITChannelState::PendingChannelOpen { payment_queue, opening_fee_msat } => {
361				if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) {
362					let forward_payment = ForwardPaymentAction(
363						channel_id,
364						FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat },
365					);
366					*self = OutboundJITChannelState::PendingPaymentForward {
367						payment_queue: core::mem::take(payment_queue),
368						opening_fee_msat: *opening_fee_msat,
369						channel_id,
370					};
371					Ok(forward_payment)
372				} else {
373					return Err(ChannelStateError(
374						"No forwardable payment available when moving to channel ready."
375							.to_string(),
376					));
377				}
378			},
379			state => Err(ChannelStateError(format!(
380				"Channel ready received when JIT Channel was in state: {:?}",
381				state
382			))),
383		}
384	}
385
386	fn htlc_handling_failed(&mut self) -> Result<Option<ForwardPaymentAction>, ChannelStateError> {
387		match self {
388			OutboundJITChannelState::PendingPaymentForward {
389				payment_queue,
390				opening_fee_msat,
391				channel_id,
392			} => {
393				if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) {
394					let forward_payment = ForwardPaymentAction(
395						*channel_id,
396						FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat },
397					);
398					*self = OutboundJITChannelState::PendingPaymentForward {
399						payment_queue: core::mem::take(payment_queue),
400						opening_fee_msat: *opening_fee_msat,
401						channel_id: *channel_id,
402					};
403					Ok(Some(forward_payment))
404				} else {
405					*self = OutboundJITChannelState::PendingPayment {
406						payment_queue: core::mem::take(payment_queue),
407						opening_fee_msat: *opening_fee_msat,
408						channel_id: *channel_id,
409					};
410					Ok(None)
411				}
412			},
413			OutboundJITChannelState::PendingPayment {
414				payment_queue,
415				opening_fee_msat,
416				channel_id,
417			} => {
418				*self = OutboundJITChannelState::PendingPayment {
419					payment_queue: core::mem::take(payment_queue),
420					opening_fee_msat: *opening_fee_msat,
421					channel_id: *channel_id,
422				};
423				Ok(None)
424			},
425			OutboundJITChannelState::PaymentForwarded { channel_id } => {
426				*self = OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
427				Ok(None)
428			},
429			state => Err(ChannelStateError(format!(
430				"HTLC handling failed when JIT Channel was in state: {:?}",
431				state
432			))),
433		}
434	}
435
436	fn payment_forwarded(
437		&mut self, skimmed_fee_msat: u64,
438	) -> Result<Option<ForwardHTLCsAction>, ChannelStateError> {
439		match self {
440			OutboundJITChannelState::PendingPaymentForward {
441				payment_queue,
442				channel_id,
443				opening_fee_msat,
444			} => {
445				if skimmed_fee_msat >= *opening_fee_msat {
446					let mut pq = core::mem::take(payment_queue);
447					let forward_htlcs = ForwardHTLCsAction(*channel_id, pq.clear());
448					*self = OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
449					Ok(Some(forward_htlcs))
450				} else {
451					*self = OutboundJITChannelState::PendingPaymentForward {
452						payment_queue: core::mem::take(payment_queue),
453						opening_fee_msat: *opening_fee_msat,
454						channel_id: *channel_id,
455					};
456					Ok(None)
457				}
458			},
459			OutboundJITChannelState::PaymentForwarded { channel_id } => {
460				*self = OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
461				Ok(None)
462			},
463			state => Err(ChannelStateError(format!(
464				"Payment forwarded when JIT Channel was in state: {:?}",
465				state
466			))),
467		}
468	}
469}
470
471impl_writeable_tlv_based_enum!(OutboundJITChannelState,
472	(0, PendingInitialPayment) => {
473		(0, payment_queue, required),
474	},
475	(2, PendingChannelOpen) => {
476		(0, payment_queue, required),
477		(2, opening_fee_msat, required),
478	},
479	(4, PendingPaymentForward) => {
480		(0, payment_queue, required),
481		(2, opening_fee_msat, required),
482		(4, channel_id, required),
483	},
484	(6, PendingPayment) => {
485		(0, payment_queue, required),
486		(2, opening_fee_msat, required),
487		(4, channel_id, required),
488	},
489	(8, PaymentForwarded) => {
490		(0, channel_id, required),
491	},
492);
493
494struct OutboundJITChannel {
495	state: OutboundJITChannelState,
496	user_channel_id: u128,
497	opening_fee_params: LSPS2OpeningFeeParams,
498	payment_size_msat: Option<u64>,
499	trust_model: TrustModel,
500}
501
502impl_writeable_tlv_based!(OutboundJITChannel, {
503	(0, state, required),
504	(2, user_channel_id, required),
505	(4, opening_fee_params, required),
506	(6, payment_size_msat, option),
507	(8, trust_model, required),
508});
509
510impl OutboundJITChannel {
511	fn new(
512		payment_size_msat: Option<u64>, opening_fee_params: LSPS2OpeningFeeParams,
513		user_channel_id: u128, client_trusts_lsp: bool,
514	) -> Self {
515		Self {
516			user_channel_id,
517			state: OutboundJITChannelState::new(),
518			opening_fee_params,
519			payment_size_msat,
520			trust_model: TrustModel::new(client_trusts_lsp),
521		}
522	}
523
524	fn htlc_intercepted(
525		&mut self, htlc: InterceptedHTLC,
526	) -> Result<Option<HTLCInterceptedAction>, LightningError> {
527		let action =
528			self.state.htlc_intercepted(&self.opening_fee_params, &self.payment_size_msat, htlc)?;
529		Ok(action)
530	}
531
532	fn htlc_handling_failed(&mut self) -> Result<Option<ForwardPaymentAction>, LightningError> {
533		let action = self.state.htlc_handling_failed()?;
534		Ok(action)
535	}
536
537	fn channel_ready(
538		&mut self, channel_id: ChannelId,
539	) -> Result<ForwardPaymentAction, LightningError> {
540		let action = self.state.channel_ready(channel_id)?;
541		Ok(action)
542	}
543
544	fn payment_forwarded(
545		&mut self, skimmed_fee_msat: u64,
546	) -> Result<Option<ForwardHTLCsAction>, LightningError> {
547		let action = self.state.payment_forwarded(skimmed_fee_msat)?;
548		Ok(action)
549	}
550
551	fn is_pending_initial_payment(&self) -> bool {
552		matches!(self.state, OutboundJITChannelState::PendingInitialPayment { .. })
553	}
554
555	fn is_prunable(&self) -> bool {
556		// We deem an OutboundJITChannel prunable if our offer expired and we haven't intercepted
557		// any HTLCs initiating the flow yet.
558		let is_expired = is_expired_opening_fee_params(&self.opening_fee_params);
559		self.is_pending_initial_payment() && is_expired
560	}
561
562	fn set_funding_tx(&mut self, funding_tx: Transaction) {
563		self.trust_model.set_funding_tx(funding_tx);
564	}
565
566	fn set_funding_tx_broadcast_safe(&mut self, funding_tx_broadcast_safe: bool) {
567		self.trust_model.set_funding_tx_broadcast_safe(funding_tx_broadcast_safe);
568	}
569
570	fn should_broadcast_funding_transaction(&self) -> bool {
571		self.trust_model.should_manually_broadcast(matches!(
572			self.state,
573			OutboundJITChannelState::PaymentForwarded { .. }
574		))
575	}
576
577	fn get_channel_id(&self) -> Option<ChannelId> {
578		match self.state {
579			OutboundJITChannelState::PaymentForwarded { channel_id } => Some(channel_id),
580			_ => None,
581		}
582	}
583
584	fn get_funding_tx(&self) -> Option<&Transaction> {
585		self.trust_model.get_funding_tx()
586	}
587
588	fn is_client_trusts_lsp(&self) -> bool {
589		self.trust_model.is_client_trusts_lsp()
590	}
591}
592
593pub(crate) struct PeerState {
594	outbound_channels_by_intercept_scid: HashMap<u64, OutboundJITChannel>,
595	intercept_scid_by_user_channel_id: HashMap<u128, u64>,
596	intercept_scid_by_channel_id: HashMap<ChannelId, u64>,
597	pending_requests: HashMap<LSPSRequestId, LSPS2Request>,
598	needs_persist: bool,
599}
600
601impl PeerState {
602	fn new() -> Self {
603		let outbound_channels_by_intercept_scid = new_hash_map();
604		let pending_requests = new_hash_map();
605		let intercept_scid_by_user_channel_id = new_hash_map();
606		let intercept_scid_by_channel_id = new_hash_map();
607		let needs_persist = false;
608		Self {
609			outbound_channels_by_intercept_scid,
610			pending_requests,
611			intercept_scid_by_user_channel_id,
612			intercept_scid_by_channel_id,
613			needs_persist,
614		}
615	}
616
617	fn insert_outbound_channel(&mut self, intercept_scid: u64, channel: OutboundJITChannel) {
618		self.outbound_channels_by_intercept_scid.insert(intercept_scid, channel);
619		self.needs_persist |= true;
620	}
621
622	fn prune_expired_request_state(&mut self) {
623		self.pending_requests.retain(|_, entry| {
624			match entry {
625				LSPS2Request::GetInfo(_) => false,
626				LSPS2Request::Buy(request) => {
627					// Prune any expired buy requests.
628					!is_expired_opening_fee_params(&request.opening_fee_params)
629				},
630			}
631		});
632
633		self.outbound_channels_by_intercept_scid.retain(|intercept_scid, entry| {
634			if entry.is_prunable() {
635				// We abort the flow, and prune any data kept.
636				self.intercept_scid_by_channel_id.retain(|_, iscid| intercept_scid != iscid);
637				self.intercept_scid_by_user_channel_id.retain(|_, iscid| intercept_scid != iscid);
638				self.needs_persist |= true;
639				return false;
640			}
641			true
642		});
643	}
644
645	fn pending_requests_and_channels(&self) -> usize {
646		let pending_requests = self.pending_requests.len();
647		let pending_outbound_channels = self
648			.outbound_channels_by_intercept_scid
649			.iter()
650			.filter(|(_, v)| v.is_pending_initial_payment())
651			.count();
652		pending_requests + pending_outbound_channels
653	}
654
655	fn is_prunable(&self) -> bool {
656		// Return whether the entire state is empty.
657		self.pending_requests.is_empty() && self.outbound_channels_by_intercept_scid.is_empty()
658	}
659}
660
661impl_writeable_tlv_based!(PeerState, {
662	(0, outbound_channels_by_intercept_scid, required),
663	(2, intercept_scid_by_user_channel_id, required),
664	(4, intercept_scid_by_channel_id, required),
665	(_unused, pending_requests, (static_value, new_hash_map())),
666	(_unused, needs_persist, (static_value, false)),
667});
668
/// Evaluates to the `Mutex<PeerState>` entry of `$counterparty_node_id` inside
/// `$outer_state_lock`, inserting a fresh [`PeerState`] when the peer is not known yet.
///
/// If the peer is unknown and the map already holds `MAX_TOTAL_PEERS` entries, nothing is
/// inserted. Instead an `LSPSMessage::Invalid` carrying a JSON-RPC internal error is enqueued
/// for the peer via `$message_queue_notifier`, and the macro `return`s an
/// `Err(LightningError)` from the *enclosing function*.
///
/// `$self` is accepted but not used by the expansion.
macro_rules! get_or_insert_peer_state_entry {
	($self: ident, $outer_state_lock: expr, $message_queue_notifier: expr, $counterparty_node_id: expr) => {{
		// The peer count is read up front, as the map is mutably borrowed by `entry` below.
		let peer_limit_reached = $outer_state_lock.len() >= MAX_TOTAL_PEERS;
		match $outer_state_lock.entry(*$counterparty_node_id) {
			Entry::Occupied(occupied) => occupied.into_mut(),
			Entry::Vacant(vacant) => {
				if peer_limit_reached {
					// Return an internal error and abort, as we hit the maximum allowed number
					// of total peers.
					let error_response = LSPSResponseError {
						code: JSONRPC_INTERNAL_ERROR_ERROR_CODE,
						message: JSONRPC_INTERNAL_ERROR_ERROR_MESSAGE.to_string(),
						data: None,
					};
					let msg = LSPSMessage::Invalid(error_response);
					$message_queue_notifier.enqueue($counterparty_node_id, msg);

					let err = format!(
						"Dropping request from peer {} due to reaching maximally allowed number of total peers: {}",
						$counterparty_node_id, MAX_TOTAL_PEERS
					);
					return Err(LightningError { err, action: ErrorAction::IgnoreAndLog(Level::Error) });
				}
				vacant.insert(Mutex::new(PeerState::new()))
			},
		}
	}}
}
701
702/// The main object allowing to send and receive bLIP-52 / LSPS2 messages.
703pub struct LSPS2ServiceHandler<CM: Deref, K: Deref + Clone, T: Deref>
704where
705	CM::Target: AChannelManager,
706	K::Target: KVStore,
707	T::Target: BroadcasterInterface,
708{
709	channel_manager: CM,
710	kv_store: K,
711	tx_broadcaster: T,
712	pending_messages: Arc<MessageQueue>,
713	pending_events: Arc<EventQueue<K>>,
714	per_peer_state: RwLock<HashMap<PublicKey, Mutex<PeerState>>>,
715	peer_by_intercept_scid: RwLock<HashMap<u64, PublicKey>>,
716	peer_by_channel_id: RwLock<HashMap<ChannelId, PublicKey>>,
717	total_pending_requests: AtomicUsize,
718	config: LSPS2ServiceConfig,
719	persistence_in_flight: AtomicUsize,
720}
721
722impl<CM: Deref, K: Deref + Clone, T: Deref + Clone> LSPS2ServiceHandler<CM, K, T>
723where
724	CM::Target: AChannelManager,
725	K::Target: KVStore,
726	T::Target: BroadcasterInterface,
727{
728	/// Constructs a `LSPS2ServiceHandler`.
729	pub(crate) fn new(
730		per_peer_state: HashMap<PublicKey, Mutex<PeerState>>, pending_messages: Arc<MessageQueue>,
731		pending_events: Arc<EventQueue<K>>, channel_manager: CM, kv_store: K, tx_broadcaster: T,
732		config: LSPS2ServiceConfig,
733	) -> Result<Self, lightning::io::Error> {
734		let mut peer_by_intercept_scid = new_hash_map();
735		let mut peer_by_channel_id = new_hash_map();
736		for (node_id, peer_state) in per_peer_state.iter() {
737			let peer_state_lock = peer_state.lock().unwrap();
738			for (intercept_scid, _) in peer_state_lock.outbound_channels_by_intercept_scid.iter() {
739				let res = peer_by_intercept_scid.insert(*intercept_scid, *node_id);
740				debug_assert!(res.is_none(), "Intercept SCIDs should never collide");
741				if res.is_some() {
742					return Err(lightning::io::Error::new(
743						lightning::io::ErrorKind::InvalidData,
744						"Failed to read LSPS2 peer state due to data inconsistencies: Intercept SCIDs should never collide",
745					));
746				}
747			}
748
749			for (channel_id, _) in peer_state_lock.intercept_scid_by_channel_id.iter() {
750				let res = peer_by_channel_id.insert(*channel_id, *node_id);
751				debug_assert!(res.is_none(), "Channel IDs should never collide");
752				if res.is_some() {
753					return Err(lightning::io::Error::new(
754							lightning::io::ErrorKind::InvalidData,
755							"Failed to read LSPS2 peer state due to data inconsistencies: Channel IDs should never collide",
756					));
757				}
758			}
759		}
760
761		Ok(Self {
762			pending_messages,
763			pending_events,
764			per_peer_state: RwLock::new(per_peer_state),
765			peer_by_intercept_scid: RwLock::new(peer_by_intercept_scid),
766			peer_by_channel_id: RwLock::new(peer_by_channel_id),
767			total_pending_requests: AtomicUsize::new(0),
768			persistence_in_flight: AtomicUsize::new(0),
769			channel_manager,
770			kv_store,
771			tx_broadcaster,
772			config,
773		})
774	}
775
776	/// Returns a reference to the used config.
777	pub fn config(&self) -> &LSPS2ServiceConfig {
778		&self.config
779	}
780
781	/// Returns whether the peer has any active LSPS2 requests.
782	pub(crate) fn has_active_requests(&self, counterparty_node_id: &PublicKey) -> bool {
783		let outer_state_lock = self.per_peer_state.read().unwrap();
784		outer_state_lock.get(counterparty_node_id).map_or(false, |inner| {
785			let peer_state = inner.lock().unwrap();
786			!peer_state.outbound_channels_by_intercept_scid.is_empty()
787		})
788	}
789
790	/// Used by LSP to inform a client requesting a JIT Channel the token they used is invalid.
791	///
792	/// Should be called in response to receiving a [`LSPS2ServiceEvent::GetInfo`] event.
793	///
794	/// [`LSPS2ServiceEvent::GetInfo`]: crate::lsps2::event::LSPS2ServiceEvent::GetInfo
795	pub fn invalid_token_provided(
796		&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
797	) -> Result<(), APIError> {
798		let mut message_queue_notifier = self.pending_messages.notifier();
799
800		let outer_state_lock = self.per_peer_state.read().unwrap();
801
802		match outer_state_lock.get(counterparty_node_id) {
803			Some(inner_state_lock) => {
804				let mut peer_state_lock = inner_state_lock.lock().unwrap();
805
806				match self.remove_pending_request(&mut peer_state_lock, &request_id) {
807					Some(LSPS2Request::GetInfo(_)) => {
808						let response = LSPS2Response::GetInfoError(LSPSResponseError {
809							code: LSPS2_GET_INFO_REQUEST_UNRECOGNIZED_OR_STALE_TOKEN_ERROR_CODE,
810							message: "an unrecognized or stale token was provided".to_string(),
811							data: None,
812						});
813						let msg = LSPS2Message::Response(request_id, response).into();
814						message_queue_notifier.enqueue(counterparty_node_id, msg);
815						Ok(())
816					},
817					_ => Err(APIError::APIMisuseError {
818						err: format!(
819							"No pending get_info request for request_id: {:?}",
820							request_id
821						),
822					}),
823				}
824			},
825			None => Err(APIError::APIMisuseError {
826				err: format!("No state for the counterparty exists: {}", counterparty_node_id),
827			}),
828		}
829	}
830
831	/// Used by LSP to provide fee parameters to a client requesting a JIT Channel.
832	///
833	/// Should be called in response to receiving a [`LSPS2ServiceEvent::GetInfo`] event.
834	///
835	/// [`LSPS2ServiceEvent::GetInfo`]: crate::lsps2::event::LSPS2ServiceEvent::GetInfo
836	pub fn opening_fee_params_generated(
837		&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
838		opening_fee_params_menu: Vec<LSPS2RawOpeningFeeParams>,
839	) -> Result<(), APIError> {
840		let mut message_queue_notifier = self.pending_messages.notifier();
841
842		let outer_state_lock = self.per_peer_state.read().unwrap();
843
844		match outer_state_lock.get(counterparty_node_id) {
845			Some(inner_state_lock) => {
846				let mut peer_state_lock = inner_state_lock.lock().unwrap();
847
848				match self.remove_pending_request(&mut peer_state_lock, &request_id) {
849					Some(LSPS2Request::GetInfo(_)) => {
850						let mut opening_fee_params_menu: Vec<LSPS2OpeningFeeParams> =
851							opening_fee_params_menu
852								.into_iter()
853								.map(|param| {
854									param.into_opening_fee_params(
855										&self.config.promise_secret,
856										counterparty_node_id,
857									)
858								})
859								.collect();
860						opening_fee_params_menu.sort_by(|a, b| {
861							match a.min_fee_msat.cmp(&b.min_fee_msat) {
862								CmpOrdering::Equal => a.proportional.cmp(&b.proportional),
863								other => other,
864							}
865						});
866						let response = LSPS2Response::GetInfo(LSPS2GetInfoResponse {
867							opening_fee_params_menu,
868						});
869						let msg = LSPS2Message::Response(request_id, response).into();
870						message_queue_notifier.enqueue(counterparty_node_id, msg);
871						Ok(())
872					},
873					_ => Err(APIError::APIMisuseError {
874						err: format!(
875							"No pending get_info request for request_id: {:?}",
876							request_id
877						),
878					}),
879				}
880			},
881			None => Err(APIError::APIMisuseError {
882				err: format!("No state for the counterparty exists: {}", counterparty_node_id),
883			}),
884		}
885	}
886
887	/// Used by LSP to provide the client with the intercept scid, a unique `user_channel_id`, and
888	/// `cltv_expiry_delta` to include in their invoice.
889	///
890	/// The intercept scid must be retrieved from [`ChannelManager::get_intercept_scid`]. The given
891	/// `user_channel_id` must be locally unique and will eventually be returned via events to be
892	/// used when opening the channel via [`ChannelManager::create_channel`]. Note implementors
893	/// will need to ensure their calls to [`ChannelManager::create_channel`] are idempotent based
894	/// on this identifier.
895	///
896	/// Should be called in response to receiving a [`LSPS2ServiceEvent::BuyRequest`] event.
897	///
898	/// `client_trusts_lsp`:
899	/// * false (default) => "LSP trusts client": LSP broadcasts the funding
900	///   transaction as soon as it is safe and forwards the payment normally.
901	/// * true => "Client trusts LSP": LSP may defer broadcasting the funding
902	///   transaction until after the client claims the forwarded HTLC(s).
903	///
904	/// [`ChannelManager::create_channel`]: lightning::ln::channelmanager::ChannelManager::create_channel
905	/// [`ChannelManager::get_intercept_scid`]: lightning::ln::channelmanager::ChannelManager::get_intercept_scid
906	/// [`LSPS2ServiceEvent::BuyRequest`]: crate::lsps2::event::LSPS2ServiceEvent::BuyRequest
907	pub async fn invoice_parameters_generated(
908		&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId, intercept_scid: u64,
909		cltv_expiry_delta: u32, client_trusts_lsp: bool, user_channel_id: u128,
910	) -> Result<(), APIError> {
911		let mut message_queue_notifier = self.pending_messages.notifier();
912		let mut should_persist = false;
913
914		match self.per_peer_state.read().unwrap().get(counterparty_node_id) {
915			Some(inner_state_lock) => {
916				let mut peer_state_lock = inner_state_lock.lock().unwrap();
917
918				match self.remove_pending_request(&mut peer_state_lock, &request_id) {
919					Some(LSPS2Request::Buy(buy_request)) => {
920						{
921							let mut peer_by_intercept_scid =
922								self.peer_by_intercept_scid.write().unwrap();
923							peer_by_intercept_scid.insert(intercept_scid, *counterparty_node_id);
924						}
925
926						let outbound_jit_channel = OutboundJITChannel::new(
927							buy_request.payment_size_msat,
928							buy_request.opening_fee_params,
929							user_channel_id,
930							client_trusts_lsp,
931						);
932
933						peer_state_lock
934							.intercept_scid_by_user_channel_id
935							.insert(user_channel_id, intercept_scid);
936						peer_state_lock
937							.insert_outbound_channel(intercept_scid, outbound_jit_channel);
938						should_persist |= peer_state_lock.needs_persist;
939
940						let response = LSPS2Response::Buy(LSPS2BuyResponse {
941							jit_channel_scid: intercept_scid.into(),
942							lsp_cltv_expiry_delta: cltv_expiry_delta,
943							client_trusts_lsp,
944						});
945						let msg = LSPS2Message::Response(request_id, response).into();
946						message_queue_notifier.enqueue(counterparty_node_id, msg);
947					},
948					_ => {
949						return Err(APIError::APIMisuseError {
950							err: format!("No pending buy request for request_id: {:?}", request_id),
951						})
952					},
953				}
954			},
955			None => {
956				return Err(APIError::APIMisuseError {
957					err: format!("No state for the counterparty exists: {}", counterparty_node_id),
958				})
959			},
960		};
961
962		if should_persist {
963			self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
964				APIError::APIMisuseError {
965					err: format!(
966						"Failed to persist peer state for {}: {}",
967						counterparty_node_id, e
968					),
969				}
970			})?;
971		}
972
973		Ok(())
974	}
975
976	/// Forward [`Event::HTLCIntercepted`] event parameters into this function.
977	///
978	/// Will fail the intercepted HTLC if the intercept scid matches a payment we are expecting
979	/// but the payment amount is incorrect or the expiry has passed.
980	///
981	/// Will generate a [`LSPS2ServiceEvent::OpenChannel`] event if the intercept scid matches a payment we are expected
982	/// and the payment amount is correct and the offer has not expired.
983	///
984	/// Will do nothing if the intercept scid does not match any of the ones we gave out.
985	///
986	/// [`Event::HTLCIntercepted`]: lightning::events::Event::HTLCIntercepted
987	/// [`LSPS2ServiceEvent::OpenChannel`]: crate::lsps2::event::LSPS2ServiceEvent::OpenChannel
988	pub async fn htlc_intercepted(
989		&self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64,
990		payment_hash: PaymentHash,
991	) -> Result<(), APIError> {
992		let event_queue_notifier = self.pending_events.notifier();
993		let mut should_persist = None;
994
995		if let Some(counterparty_node_id) =
996			self.peer_by_intercept_scid.read().unwrap().get(&intercept_scid)
997		{
998			let outer_state_lock = self.per_peer_state.read().unwrap();
999			match outer_state_lock.get(counterparty_node_id) {
1000				Some(inner_state_lock) => {
1001					let mut peer_state = inner_state_lock.lock().unwrap();
1002					if let Some(jit_channel) =
1003						peer_state.outbound_channels_by_intercept_scid.get_mut(&intercept_scid)
1004					{
1005						should_persist = Some(*counterparty_node_id);
1006						let htlc = InterceptedHTLC {
1007							intercept_id,
1008							expected_outbound_amount_msat,
1009							payment_hash,
1010						};
1011						match jit_channel.htlc_intercepted(htlc) {
1012							Ok(Some(HTLCInterceptedAction::OpenChannel(open_channel_params))) => {
1013								let event = LSPS2ServiceEvent::OpenChannel {
1014									their_network_key: counterparty_node_id.clone(),
1015									amt_to_forward_msat: open_channel_params.amt_to_forward_msat,
1016									opening_fee_msat: open_channel_params.opening_fee_msat,
1017									user_channel_id: jit_channel.user_channel_id,
1018									intercept_scid,
1019								};
1020								event_queue_notifier.enqueue(event);
1021							},
1022							Ok(Some(HTLCInterceptedAction::ForwardHTLC(channel_id))) => {
1023								self.channel_manager.get_cm().forward_intercepted_htlc(
1024									intercept_id,
1025									&channel_id,
1026									*counterparty_node_id,
1027									expected_outbound_amount_msat,
1028								)?;
1029							},
1030							Ok(Some(HTLCInterceptedAction::ForwardPayment(
1031								channel_id,
1032								FeePayment { opening_fee_msat, htlcs },
1033							))) => {
1034								let amounts_to_forward_msat =
1035									calculate_amount_to_forward_per_htlc(&htlcs, opening_fee_msat);
1036
1037								for (intercept_id, amount_to_forward_msat) in
1038									amounts_to_forward_msat
1039								{
1040									self.channel_manager.get_cm().forward_intercepted_htlc(
1041										intercept_id,
1042										&channel_id,
1043										*counterparty_node_id,
1044										amount_to_forward_msat,
1045									)?;
1046								}
1047							},
1048							Ok(None) => {},
1049							Err(e) => {
1050								self.channel_manager
1051									.get_cm()
1052									.fail_intercepted_htlc(intercept_id)?;
1053								peer_state
1054									.outbound_channels_by_intercept_scid
1055									.remove(&intercept_scid);
1056								// TODO: cleanup peer_by_intercept_scid
1057								return Err(APIError::APIMisuseError { err: e.err });
1058							},
1059						}
1060					}
1061
1062					peer_state.needs_persist |= should_persist.is_some();
1063				},
1064				None => {
1065					return Err(APIError::APIMisuseError {
1066						err: format!("No counterparty found for scid: {}", intercept_scid),
1067					});
1068				},
1069			}
1070		}
1071
1072		if let Some(counterparty_node_id) = should_persist {
1073			self.persist_peer_state(counterparty_node_id).await.map_err(|e| {
1074				APIError::APIMisuseError {
1075					err: format!(
1076						"Failed to persist peer state for {}: {}",
1077						counterparty_node_id, e
1078					),
1079				}
1080			})?;
1081		}
1082
1083		Ok(())
1084	}
1085
1086	/// Forward [`Event::HTLCHandlingFailed`] event parameter into this function.
1087	///
1088	/// Will attempt to forward the next payment in the queue if one is present.
1089	/// Will do nothing if the intercept scid does not match any of the ones we gave out
1090	/// or if the payment queue is empty
1091	///
1092	/// [`Event::HTLCHandlingFailed`]: lightning::events::Event::HTLCHandlingFailed
1093	pub async fn htlc_handling_failed(
1094		&self, failure_type: HTLCHandlingFailureType,
1095	) -> Result<(), APIError> {
1096		let mut should_persist = None;
1097		if let HTLCHandlingFailureType::Forward { channel_id, .. } = failure_type {
1098			let peer_by_channel_id = self.peer_by_channel_id.read().unwrap();
1099			if let Some(counterparty_node_id) = peer_by_channel_id.get(&channel_id) {
1100				let outer_state_lock = self.per_peer_state.read().unwrap();
1101				match outer_state_lock.get(counterparty_node_id) {
1102					Some(inner_state_lock) => {
1103						let mut peer_state = inner_state_lock.lock().unwrap();
1104						if let Some(intercept_scid) =
1105							peer_state.intercept_scid_by_channel_id.get(&channel_id).copied()
1106						{
1107							should_persist = Some(*counterparty_node_id);
1108
1109							if let Some(jit_channel) = peer_state
1110								.outbound_channels_by_intercept_scid
1111								.get_mut(&intercept_scid)
1112							{
1113								match jit_channel.htlc_handling_failed() {
1114									Ok(Some(ForwardPaymentAction(
1115										channel_id,
1116										FeePayment { opening_fee_msat, htlcs },
1117									))) => {
1118										let amounts_to_forward_msat =
1119											calculate_amount_to_forward_per_htlc(
1120												&htlcs,
1121												opening_fee_msat,
1122											);
1123
1124										for (intercept_id, amount_to_forward_msat) in
1125											amounts_to_forward_msat
1126										{
1127											self.channel_manager
1128												.get_cm()
1129												.forward_intercepted_htlc(
1130													intercept_id,
1131													&channel_id,
1132													*counterparty_node_id,
1133													amount_to_forward_msat,
1134												)?;
1135										}
1136									},
1137									Ok(None) => {},
1138									Err(e) => {
1139										return Err(APIError::APIMisuseError {
1140											err: format!("Unable to fail HTLC: {}.", e.err),
1141										});
1142									},
1143								}
1144							}
1145						}
1146						peer_state.needs_persist |= should_persist.is_some();
1147					},
1148					None => {},
1149				}
1150			}
1151		}
1152
1153		if let Some(counterparty_node_id) = should_persist {
1154			self.persist_peer_state(counterparty_node_id).await.map_err(|e| {
1155				APIError::APIMisuseError {
1156					err: format!(
1157						"Failed to persist peer state for {}: {}",
1158						counterparty_node_id, e
1159					),
1160				}
1161			})?;
1162		}
1163
1164		Ok(())
1165	}
1166
1167	/// Forward [`Event::PaymentForwarded`] event parameter into this function.
1168	///
1169	/// Will register the forwarded payment as having paid the JIT channel fee, and forward any held
1170	/// and future HTLCs for the SCID of the initial invoice.
1171	///
1172	/// When the reported skimmed fee equals or exceeds the promised opening fee, any HTLCs that
1173	/// were being held for that JIT channel are forwarded. In a `client_trusts_lsp` flow, once
1174	/// the fee has been fully paid, the channel's funding transaction will be broadcasted.
1175	///
1176	/// Note that `next_channel_id` and `skimmed_fee_msat` are required to be provided.
1177	/// Therefore, the corresponding [`Event::PaymentForwarded`] events need to be generated and
1178	/// serialized by LDK versions greater or equal to 0.0.122.
1179	///
1180	/// [`Event::PaymentForwarded`]: lightning::events::Event::PaymentForwarded
1181	pub async fn payment_forwarded(
1182		&self, next_channel_id: ChannelId, skimmed_fee_msat: u64,
1183	) -> Result<(), APIError> {
1184		let mut should_persist = None;
1185		if let Some(counterparty_node_id) =
1186			self.peer_by_channel_id.read().unwrap().get(&next_channel_id)
1187		{
1188			let outer_state_lock = self.per_peer_state.read().unwrap();
1189			match outer_state_lock.get(counterparty_node_id) {
1190				Some(inner_state_lock) => {
1191					let mut peer_state = inner_state_lock.lock().unwrap();
1192					if let Some(intercept_scid) =
1193						peer_state.intercept_scid_by_channel_id.get(&next_channel_id).copied()
1194					{
1195						should_persist = Some(*counterparty_node_id);
1196
1197						if let Some(jit_channel) =
1198							peer_state.outbound_channels_by_intercept_scid.get_mut(&intercept_scid)
1199						{
1200							match jit_channel.payment_forwarded(skimmed_fee_msat) {
1201								Ok(Some(ForwardHTLCsAction(channel_id, htlcs))) => {
1202									for htlc in htlcs {
1203										self.channel_manager.get_cm().forward_intercepted_htlc(
1204											htlc.intercept_id,
1205											&channel_id,
1206											*counterparty_node_id,
1207											htlc.expected_outbound_amount_msat,
1208										)?;
1209									}
1210								},
1211								Ok(None) => {},
1212								Err(e) => {
1213									return Err(APIError::APIMisuseError {
1214										err: format!(
1215											"Forwarded payment was not applicable for JIT channel: {}",
1216											e.err
1217										),
1218									})
1219								},
1220							}
1221
1222							self.broadcast_funding_transaction_if_applies(jit_channel);
1223						}
1224					} else {
1225						return Err(APIError::APIMisuseError {
1226							err: format!("No state for for channel id: {}", next_channel_id),
1227						});
1228					}
1229					peer_state.needs_persist |= should_persist.is_some();
1230				},
1231				None => {
1232					return Err(APIError::APIMisuseError {
1233						err: format!("No counterparty state for: {}", counterparty_node_id),
1234					});
1235				},
1236			}
1237		}
1238
1239		if let Some(counterparty_node_id) = should_persist {
1240			self.persist_peer_state(counterparty_node_id).await.map_err(|e| {
1241				APIError::APIMisuseError {
1242					err: format!(
1243						"Failed to persist peer state for {}: {}",
1244						counterparty_node_id, e
1245					),
1246				}
1247			})?;
1248		}
1249
1250		Ok(())
1251	}
1252
1253	/// Abandons a pending JIT‐open flow for `user_channel_id`, removing all local state.
1254	///
1255	/// This removes the intercept SCID, any outbound channel state, and associated
1256	/// channel‐ID mappings for the specified `user_channel_id`, but only while no payment
1257	/// has been forwarded yet and no channel has been opened on-chain.
1258	///
1259	/// Returns an error if:
1260	///  - there is no channel matching `user_channel_id`, or
1261	///  - a payment has already been forwarded or a channel has already been opened
1262	///
1263	/// Note: this does *not* close or roll back any on‐chain channel which may already
1264	/// have been opened. The caller must call this before or instead of initiating the channel
1265	/// open, as it only affects the local LSPS2 state and doesn't affect any channels that
1266	/// might already exist on-chain. Any pending channel open attempts must be managed
1267	/// separately.
1268	pub async fn channel_open_abandoned(
1269		&self, counterparty_node_id: &PublicKey, user_channel_id: u128,
1270	) -> Result<(), APIError> {
1271		{
1272			let outer_state_lock = self.per_peer_state.read().unwrap();
1273			let inner_state_lock = outer_state_lock.get(counterparty_node_id).ok_or_else(|| {
1274				APIError::APIMisuseError {
1275					err: format!("No counterparty state for: {}", counterparty_node_id),
1276				}
1277			})?;
1278			let mut peer_state = inner_state_lock.lock().unwrap();
1279
1280			let intercept_scid = peer_state
1281				.intercept_scid_by_user_channel_id
1282				.get(&user_channel_id)
1283				.copied()
1284				.ok_or_else(|| APIError::APIMisuseError {
1285					err: format!(
1286						"Could not find a channel with user_channel_id {}",
1287						user_channel_id
1288					),
1289				})?;
1290
1291			let jit_channel = peer_state
1292				.outbound_channels_by_intercept_scid
1293				.get(&intercept_scid)
1294				.ok_or_else(|| APIError::APIMisuseError {
1295				err: format!(
1296					"Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1297					intercept_scid, user_channel_id,
1298				),
1299			})?;
1300
1301			let is_pending = matches!(
1302				jit_channel.state,
1303				OutboundJITChannelState::PendingInitialPayment { .. }
1304					| OutboundJITChannelState::PendingChannelOpen { .. }
1305			);
1306
1307			if !is_pending {
1308				return Err(APIError::APIMisuseError {
1309					err: "Cannot abandon channel open after channel creation or payment forwarding"
1310						.to_string(),
1311				});
1312			}
1313
1314			peer_state.intercept_scid_by_user_channel_id.remove(&user_channel_id);
1315			peer_state.outbound_channels_by_intercept_scid.remove(&intercept_scid);
1316			peer_state.intercept_scid_by_channel_id.retain(|_, &mut scid| scid != intercept_scid);
1317			peer_state.needs_persist |= true;
1318		}
1319
1320		self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
1321			APIError::APIMisuseError {
1322				err: format!("Failed to persist peer state for {}: {}", counterparty_node_id, e),
1323			}
1324		})?;
1325
1326		Ok(())
1327	}
1328
1329	/// Used to fail intercepted HTLCs backwards when a channel open attempt ultimately fails.
1330	///
1331	/// This function should be called after receiving an [`LSPS2ServiceEvent::OpenChannel`] event
1332	/// but only if the channel could not be successfully established. It resets the JIT channel
1333	/// state so that the payer may try the payment again.
1334	///
1335	/// [`LSPS2ServiceEvent::OpenChannel`]: crate::lsps2::event::LSPS2ServiceEvent::OpenChannel
1336	pub async fn channel_open_failed(
1337		&self, counterparty_node_id: &PublicKey, user_channel_id: u128,
1338	) -> Result<(), APIError> {
1339		{
1340			let outer_state_lock = self.per_peer_state.read().unwrap();
1341
1342			let inner_state_lock = outer_state_lock.get(counterparty_node_id).ok_or_else(|| {
1343				APIError::APIMisuseError {
1344					err: format!("No counterparty state for: {}", counterparty_node_id),
1345				}
1346			})?;
1347
1348			let mut peer_state = inner_state_lock.lock().unwrap();
1349
1350			let intercept_scid = peer_state
1351				.intercept_scid_by_user_channel_id
1352				.get(&user_channel_id)
1353				.copied()
1354				.ok_or_else(|| APIError::APIMisuseError {
1355					err: format!(
1356						"Could not find a channel with user_channel_id {}",
1357						user_channel_id
1358					),
1359				})?;
1360
1361			let jit_channel = peer_state
1362				.outbound_channels_by_intercept_scid
1363				.get_mut(&intercept_scid)
1364				.ok_or_else(|| APIError::APIMisuseError {
1365					err: format!(
1366						"Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1367						intercept_scid, user_channel_id,
1368					),
1369				})?;
1370
1371			if let OutboundJITChannelState::PendingChannelOpen { payment_queue, .. } =
1372				&mut jit_channel.state
1373			{
1374				let intercepted_htlcs = payment_queue.clear();
1375				for htlc in intercepted_htlcs {
1376					self.channel_manager.get_cm().fail_htlc_backwards_with_reason(
1377						&htlc.payment_hash,
1378						FailureCode::TemporaryNodeFailure,
1379					);
1380				}
1381
1382				jit_channel.state = OutboundJITChannelState::PendingInitialPayment {
1383					payment_queue: PaymentQueue::new(),
1384				};
1385			} else {
1386				return Err(APIError::APIMisuseError {
1387					err: "Channel is not in the PendingChannelOpen state.".to_string(),
1388				});
1389			}
1390
1391			peer_state.needs_persist |= true;
1392		}
1393		self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
1394			APIError::APIMisuseError {
1395				err: format!("Failed to persist peer state for {}: {}", counterparty_node_id, e),
1396			}
1397		})?;
1398
1399		Ok(())
1400	}
1401
1402	/// Forward [`Event::ChannelReady`] event parameters into this function.
1403	///
1404	/// Will forward the intercepted HTLC if it matches a channel
1405	/// we need to forward a payment over otherwise it will be ignored.
1406	///
1407	/// [`Event::ChannelReady`]: lightning::events::Event::ChannelReady
1408	pub async fn channel_ready(
1409		&self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey,
1410	) -> Result<(), APIError> {
1411		let mut should_persist = false;
1412		{
1413			let mut peer_by_channel_id = self.peer_by_channel_id.write().unwrap();
1414			peer_by_channel_id.insert(*channel_id, *counterparty_node_id);
1415		}
1416		match self.per_peer_state.read().unwrap().get(counterparty_node_id) {
1417			Some(inner_state_lock) => {
1418				let mut peer_state = inner_state_lock.lock().unwrap();
1419				if let Some(intercept_scid) =
1420					peer_state.intercept_scid_by_user_channel_id.get(&user_channel_id).copied()
1421				{
1422					should_persist |= true;
1423					peer_state.intercept_scid_by_channel_id.insert(*channel_id, intercept_scid);
1424					if let Some(jit_channel) =
1425						peer_state.outbound_channels_by_intercept_scid.get_mut(&intercept_scid)
1426					{
1427						match jit_channel.channel_ready(*channel_id) {
1428							Ok(ForwardPaymentAction(
1429								channel_id,
1430								FeePayment { opening_fee_msat, htlcs },
1431							)) => {
1432								let amounts_to_forward_msat =
1433									calculate_amount_to_forward_per_htlc(&htlcs, opening_fee_msat);
1434
1435								for (intercept_id, amount_to_forward_msat) in
1436									amounts_to_forward_msat
1437								{
1438									self.channel_manager.get_cm().forward_intercepted_htlc(
1439										intercept_id,
1440										&channel_id,
1441										*counterparty_node_id,
1442										amount_to_forward_msat,
1443									)?;
1444								}
1445							},
1446							Err(e) => {
1447								return Err(APIError::APIMisuseError {
1448									err: format!(
1449										"Failed to transition to channel ready: {}",
1450										e.err
1451									),
1452								})
1453							},
1454						}
1455					} else {
1456						return Err(APIError::APIMisuseError {
1457							err: format!(
1458								"Could not find a channel with user_channel_id {}",
1459								user_channel_id
1460							),
1461						});
1462					}
1463				} else {
1464					return Err(APIError::APIMisuseError {
1465						err: format!(
1466							"Could not find a channel with that user_channel_id {}",
1467							user_channel_id
1468						),
1469					});
1470				}
1471				peer_state.needs_persist |= should_persist;
1472			},
1473			None => {
1474				return Err(APIError::APIMisuseError {
1475					err: format!("No counterparty state for: {}", counterparty_node_id),
1476				});
1477			},
1478		}
1479
1480		if should_persist {
1481			self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
1482				APIError::APIMisuseError {
1483					err: format!(
1484						"Failed to persist peer state for {}: {}",
1485						counterparty_node_id, e
1486					),
1487				}
1488			})?;
1489		}
1490
1491		Ok(())
1492	}
1493
1494	fn handle_get_info_request(
1495		&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
1496		params: LSPS2GetInfoRequest,
1497	) -> Result<(), LightningError> {
1498		let mut message_queue_notifier = self.pending_messages.notifier();
1499		let event_queue_notifier = self.pending_events.notifier();
1500
1501		let mut outer_state_lock = self.per_peer_state.write().unwrap();
1502		let inner_state_lock = get_or_insert_peer_state_entry!(
1503			self,
1504			outer_state_lock,
1505			message_queue_notifier,
1506			counterparty_node_id
1507		);
1508		let mut peer_state_lock = inner_state_lock.lock().unwrap();
1509		let request = LSPS2Request::GetInfo(params.clone());
1510		self.insert_pending_request(
1511			&mut peer_state_lock,
1512			&mut message_queue_notifier,
1513			request_id.clone(),
1514			*counterparty_node_id,
1515			request,
1516		)?;
1517
1518		let event = LSPS2ServiceEvent::GetInfo {
1519			request_id,
1520			counterparty_node_id: *counterparty_node_id,
1521			token: params.token,
1522		};
1523		event_queue_notifier.enqueue(event);
1524
1525		Ok(())
1526	}
1527
1528	fn handle_buy_request(
1529		&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey, params: LSPS2BuyRequest,
1530	) -> Result<(), LightningError> {
1531		let mut message_queue_notifier = self.pending_messages.notifier();
1532		let event_queue_notifier = self.pending_events.notifier();
1533		if let Some(payment_size_msat) = params.payment_size_msat {
1534			if payment_size_msat < params.opening_fee_params.min_payment_size_msat {
1535				let response = LSPS2Response::BuyError(LSPSResponseError {
1536					code: LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_SMALL_ERROR_CODE,
1537					message: "payment size is below our minimum supported payment size".to_string(),
1538					data: None,
1539				});
1540				let msg = LSPS2Message::Response(request_id, response).into();
1541				message_queue_notifier.enqueue(counterparty_node_id, msg);
1542
1543				return Err(LightningError {
1544					err: "payment size is below our minimum supported payment size".to_string(),
1545					action: ErrorAction::IgnoreAndLog(Level::Info),
1546				});
1547			}
1548
1549			if payment_size_msat > params.opening_fee_params.max_payment_size_msat {
1550				let response = LSPS2Response::BuyError(LSPSResponseError {
1551					code: LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_LARGE_ERROR_CODE,
1552					message: "payment size is above our maximum supported payment size".to_string(),
1553					data: None,
1554				});
1555				let msg = LSPS2Message::Response(request_id, response).into();
1556				message_queue_notifier.enqueue(counterparty_node_id, msg);
1557				return Err(LightningError {
1558					err: "payment size is above our maximum supported payment size".to_string(),
1559					action: ErrorAction::IgnoreAndLog(Level::Info),
1560				});
1561			}
1562
1563			match compute_opening_fee(
1564				payment_size_msat,
1565				params.opening_fee_params.min_fee_msat,
1566				params.opening_fee_params.proportional.into(),
1567			) {
1568				Some(opening_fee) => {
1569					if opening_fee >= payment_size_msat {
1570						let response = LSPS2Response::BuyError(LSPSResponseError {
1571							code: LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_SMALL_ERROR_CODE,
1572							message: "payment size is too small to cover the opening fee"
1573								.to_string(),
1574							data: None,
1575						});
1576						let msg = LSPS2Message::Response(request_id, response).into();
1577						message_queue_notifier.enqueue(counterparty_node_id, msg);
1578						return Err(LightningError {
1579							err: "payment size is too small to cover the opening fee".to_string(),
1580							action: ErrorAction::IgnoreAndLog(Level::Info),
1581						});
1582					}
1583				},
1584				None => {
1585					let response = LSPS2Response::BuyError(LSPSResponseError {
1586						code: LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_LARGE_ERROR_CODE,
1587						message: "overflow error when calculating opening_fee".to_string(),
1588						data: None,
1589					});
1590					let msg = LSPS2Message::Response(request_id, response).into();
1591					message_queue_notifier.enqueue(counterparty_node_id, msg);
1592					return Err(LightningError {
1593						err: "overflow error when calculating opening_fee".to_string(),
1594						action: ErrorAction::IgnoreAndLog(Level::Info),
1595					});
1596				},
1597			}
1598		}
1599
1600		// TODO: if payment_size_msat is specified, make sure our node has sufficient incoming liquidity from public network to receive it.
1601		if !is_valid_opening_fee_params(
1602			&params.opening_fee_params,
1603			&self.config.promise_secret,
1604			counterparty_node_id,
1605		) {
1606			let response = LSPS2Response::BuyError(LSPSResponseError {
1607				code: LSPS2_BUY_REQUEST_INVALID_OPENING_FEE_PARAMS_ERROR_CODE,
1608				message: "valid_until is already past OR the promise did not match the provided parameters".to_string(),
1609				data: None,
1610			});
1611			let msg = LSPS2Message::Response(request_id, response).into();
1612			message_queue_notifier.enqueue(counterparty_node_id, msg);
1613			return Err(LightningError {
1614				err: "invalid opening fee parameters were supplied by client".to_string(),
1615				action: ErrorAction::IgnoreAndLog(Level::Info),
1616			});
1617		}
1618
1619		let mut outer_state_lock = self.per_peer_state.write().unwrap();
1620		let inner_state_lock = get_or_insert_peer_state_entry!(
1621			self,
1622			outer_state_lock,
1623			message_queue_notifier,
1624			counterparty_node_id
1625		);
1626		let mut peer_state_lock = inner_state_lock.lock().unwrap();
1627
1628		let request = LSPS2Request::Buy(params.clone());
1629
1630		self.insert_pending_request(
1631			&mut peer_state_lock,
1632			&mut message_queue_notifier,
1633			request_id.clone(),
1634			*counterparty_node_id,
1635			request,
1636		)?;
1637
1638		let event = LSPS2ServiceEvent::BuyRequest {
1639			request_id,
1640			counterparty_node_id: *counterparty_node_id,
1641			opening_fee_params: params.opening_fee_params,
1642			payment_size_msat: params.payment_size_msat,
1643		};
1644		event_queue_notifier.enqueue(event);
1645
1646		Ok(())
1647	}
1648
1649	fn insert_pending_request<'a>(
1650		&self, peer_state_lock: &mut MutexGuard<'a, PeerState>,
1651		message_queue_notifier: &mut MessageQueueNotifierGuard, request_id: LSPSRequestId,
1652		counterparty_node_id: PublicKey, request: LSPS2Request,
1653	) -> Result<(), LightningError> {
1654		let create_pending_request_limit_exceeded_response =
1655			|message_queue_notifier: &mut MessageQueueNotifierGuard, error_message: String| {
1656				let error_details = LSPSResponseError {
1657					code: LSPS0_CLIENT_REJECTED_ERROR_CODE,
1658					message: "Reached maximum number of pending requests. Please try again later."
1659						.to_string(),
1660					data: None,
1661				};
1662				let response = match &request {
1663					LSPS2Request::GetInfo(_) => LSPS2Response::GetInfoError(error_details),
1664					LSPS2Request::Buy(_) => LSPS2Response::BuyError(error_details),
1665				};
1666				let msg = LSPS2Message::Response(request_id.clone(), response).into();
1667				message_queue_notifier.enqueue(&counterparty_node_id, msg);
1668
1669				Err(LightningError {
1670					err: error_message,
1671					action: ErrorAction::IgnoreAndLog(Level::Debug),
1672				})
1673			};
1674
1675		if self.total_pending_requests.load(Ordering::Relaxed) >= MAX_TOTAL_PENDING_REQUESTS {
1676			let error_message = format!(
1677				"Reached maximum number of total pending requests: {}",
1678				MAX_TOTAL_PENDING_REQUESTS
1679			);
1680			return create_pending_request_limit_exceeded_response(
1681				message_queue_notifier,
1682				error_message,
1683			);
1684		}
1685
1686		if peer_state_lock.pending_requests_and_channels() < MAX_PENDING_REQUESTS_PER_PEER {
1687			peer_state_lock.pending_requests.insert(request_id, request);
1688			self.total_pending_requests.fetch_add(1, Ordering::Relaxed);
1689			Ok(())
1690		} else {
1691			let error_message = format!(
1692				"Peer {} reached maximum number of pending requests: {}",
1693				counterparty_node_id, MAX_PENDING_REQUESTS_PER_PEER
1694			);
1695			create_pending_request_limit_exceeded_response(message_queue_notifier, error_message)
1696		}
1697	}
1698
1699	fn remove_pending_request<'a>(
1700		&self, peer_state_lock: &mut MutexGuard<'a, PeerState>, request_id: &LSPSRequestId,
1701	) -> Option<LSPS2Request> {
1702		match peer_state_lock.pending_requests.remove(request_id) {
1703			Some(req) => {
1704				let res = self.total_pending_requests.fetch_update(
1705					Ordering::Relaxed,
1706					Ordering::Relaxed,
1707					|x| Some(x.saturating_sub(1)),
1708				);
1709				match res {
1710					Ok(previous_value) if previous_value == 0 => debug_assert!(
1711						false,
1712						"total_pending_requests counter out-of-sync! This should never happen!"
1713					),
1714					Err(previous_value) if previous_value == 0 => debug_assert!(
1715						false,
1716						"total_pending_requests counter out-of-sync! This should never happen!"
1717					),
1718					_ => {},
1719				}
1720				Some(req)
1721			},
1722			res => res,
1723		}
1724	}
1725
1726	#[cfg(debug_assertions)]
1727	fn verify_pending_request_counter(&self) {
1728		let mut num_requests = 0;
1729		let outer_state_lock = self.per_peer_state.read().unwrap();
1730		for (_, inner) in outer_state_lock.iter() {
1731			let inner_state_lock = inner.lock().unwrap();
1732			num_requests += inner_state_lock.pending_requests.len();
1733		}
1734		debug_assert_eq!(
1735			num_requests,
1736			self.total_pending_requests.load(Ordering::Relaxed),
1737			"total_pending_requests counter out-of-sync! This should never happen!"
1738		);
1739	}
1740
1741	async fn persist_peer_state(
1742		&self, counterparty_node_id: PublicKey,
1743	) -> Result<(), lightning::io::Error> {
1744		let fut = {
1745			let outer_state_lock = self.per_peer_state.read().unwrap();
1746			match outer_state_lock.get(&counterparty_node_id) {
1747				None => {
1748					// We dropped the peer state by now.
1749					return Ok(());
1750				},
1751				Some(entry) => {
1752					let mut peer_state_lock = entry.lock().unwrap();
1753					if !peer_state_lock.needs_persist {
1754						// We already have persisted otherwise by now.
1755						return Ok(());
1756					} else {
1757						peer_state_lock.needs_persist = false;
1758						let key = counterparty_node_id.to_string();
1759						let encoded = peer_state_lock.encode();
1760						// Begin the write with the entry lock held. This avoids racing with
1761						// potentially-in-flight `persist` calls writing state for the same peer.
1762						self.kv_store.write(
1763							LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1764							LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
1765							&key,
1766							encoded,
1767						)
1768					}
1769				},
1770			}
1771		};
1772
1773		fut.await.map_err(|e| {
1774			self.per_peer_state
1775				.read()
1776				.unwrap()
1777				.get(&counterparty_node_id)
1778				.map(|p| p.lock().unwrap().needs_persist = true);
1779			e
1780		})
1781	}
1782
1783	pub(crate) async fn persist(&self) -> Result<(), lightning::io::Error> {
1784		// TODO: We should eventually persist in parallel, however, when we do, we probably want to
1785		// introduce some batching to upper-bound the number of requests inflight at any given
1786		// time.
1787
1788		if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
1789			// If we're not the first event processor to get here, just return early, the increment
1790			// we just did will be treated as "go around again" at the end.
1791			return Ok(());
1792		}
1793
1794		loop {
1795			let mut need_remove = Vec::new();
1796			let mut need_persist = Vec::new();
1797
1798			{
1799				// First build a list of peers to persist and prune with the read lock. This allows
1800				// us to avoid the write lock unless we actually need to remove a node.
1801				let outer_state_lock = self.per_peer_state.read().unwrap();
1802				for (counterparty_node_id, inner_state_lock) in outer_state_lock.iter() {
1803					let mut peer_state_lock = inner_state_lock.lock().unwrap();
1804					peer_state_lock.prune_expired_request_state();
1805					let is_prunable = peer_state_lock.is_prunable();
1806					if is_prunable {
1807						need_remove.push(*counterparty_node_id);
1808					} else if peer_state_lock.needs_persist {
1809						need_persist.push(*counterparty_node_id);
1810					}
1811				}
1812			}
1813
1814			for counterparty_node_id in need_persist.into_iter() {
1815				debug_assert!(!need_remove.contains(&counterparty_node_id));
1816				self.persist_peer_state(counterparty_node_id).await?;
1817			}
1818
1819			for counterparty_node_id in need_remove {
1820				let mut future_opt = None;
1821				{
1822					// We need to take the `per_peer_state` write lock to remove an entry, but also
1823					// have to hold it until after the `remove` call returns (but not through
1824					// future completion) to ensure that writes for the peer's state are
1825					// well-ordered with other `persist_peer_state` calls even across the removal
1826					// itself.
1827					let mut per_peer_state = self.per_peer_state.write().unwrap();
1828					if let Entry::Occupied(mut entry) = per_peer_state.entry(counterparty_node_id) {
1829						let state = entry.get_mut().get_mut().unwrap();
1830						if state.is_prunable() {
1831							entry.remove();
1832							let key = counterparty_node_id.to_string();
1833							future_opt = Some(self.kv_store.remove(
1834								LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1835								LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
1836								&key,
1837							));
1838						} else {
1839							// If the peer got new state, force a re-persist of the current state.
1840							state.needs_persist = true;
1841						}
1842					} else {
1843						// This should never happen, we can only have one `persist` call
1844						// in-progress at once and map entries are only removed by it.
1845						debug_assert!(false);
1846					}
1847				}
1848				if let Some(future) = future_opt {
1849					future.await?;
1850				} else {
1851					self.persist_peer_state(counterparty_node_id).await?;
1852				}
1853			}
1854
1855			if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
1856				// If another thread incremented the state while we were running we should go
1857				// around again, but only once.
1858				self.persistence_in_flight.store(1, Ordering::Release);
1859				continue;
1860			}
1861			break;
1862		}
1863
1864		Ok(())
1865	}
1866
1867	pub(crate) fn peer_disconnected(&self, counterparty_node_id: PublicKey) {
1868		let outer_state_lock = self.per_peer_state.write().unwrap();
1869		if let Some(inner_state_lock) = outer_state_lock.get(&counterparty_node_id) {
1870			let mut peer_state_lock = inner_state_lock.lock().unwrap();
1871			// We clean up the peer state, but leave removing the peer entry to the prune logic in
1872			// `persist` which removes it from the store.
1873			peer_state_lock.prune_expired_request_state();
1874		}
1875	}
1876
1877	/// Checks if the JIT channel with the given `user_channel_id` needs manual broadcast.
1878	///
1879	/// Will be `true` if `client_trusts_lsp` is set to `true`.
1880	pub fn channel_needs_manual_broadcast(
1881		&self, user_channel_id: u128, counterparty_node_id: &PublicKey,
1882	) -> Result<bool, APIError> {
1883		let outer_state_lock = self.per_peer_state.read().unwrap();
1884		let inner_state_lock =
1885			outer_state_lock.get(counterparty_node_id).ok_or_else(|| APIError::APIMisuseError {
1886				err: format!("No counterparty state for: {}", counterparty_node_id),
1887			})?;
1888		let peer_state = inner_state_lock.lock().unwrap();
1889
1890		let intercept_scid = peer_state
1891			.intercept_scid_by_user_channel_id
1892			.get(&user_channel_id)
1893			.copied()
1894			.ok_or_else(|| APIError::APIMisuseError {
1895				err: format!("Could not find a channel with user_channel_id {}", user_channel_id),
1896			})?;
1897
1898		let jit_channel = peer_state
1899			.outbound_channels_by_intercept_scid
1900			.get(&intercept_scid)
1901			.ok_or_else(|| APIError::APIMisuseError {
1902				err: format!(
1903					"Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1904					intercept_scid, user_channel_id,
1905				),
1906			})?;
1907
1908		Ok(jit_channel.is_client_trusts_lsp())
1909	}
1910
1911	/// Stores the funding transaction for a JIT channel.
1912	///
1913	/// Call this when the funding transaction is created.
1914	///
1915	/// In `client_trusts_lsp` the broadcasting of the funding transaction will be handled internally
1916	/// after you also mark it as broadcast-safe via
1917	/// [`set_funding_tx_broadcast_safe`] and once the opening fee has been collected. You do not need
1918	/// to broadcast the funding transaction yourself in this flow.
1919	///
1920	/// [`set_funding_tx_broadcast_safe`]: Self::set_funding_tx_broadcast_safe
1921	pub fn store_funding_transaction(
1922		&self, user_channel_id: u128, counterparty_node_id: &PublicKey, funding_tx: Transaction,
1923	) -> Result<(), APIError> {
1924		let outer_state_lock = self.per_peer_state.read().unwrap();
1925		let inner_state_lock =
1926			outer_state_lock.get(counterparty_node_id).ok_or_else(|| APIError::APIMisuseError {
1927				err: format!("No counterparty state for: {}", counterparty_node_id),
1928			})?;
1929		let mut peer_state = inner_state_lock.lock().unwrap();
1930
1931		let intercept_scid = peer_state
1932			.intercept_scid_by_user_channel_id
1933			.get(&user_channel_id)
1934			.copied()
1935			.ok_or_else(|| APIError::APIMisuseError {
1936				err: format!("Could not find a channel with user_channel_id {}", user_channel_id),
1937			})?;
1938
1939		let jit_channel = peer_state
1940			.outbound_channels_by_intercept_scid
1941			.get_mut(&intercept_scid)
1942			.ok_or_else(|| APIError::APIMisuseError {
1943			err: format!(
1944				"Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1945				intercept_scid, user_channel_id,
1946			),
1947		})?;
1948
1949		jit_channel.set_funding_tx(funding_tx);
1950
1951		self.broadcast_funding_transaction_if_applies(jit_channel);
1952		Ok(())
1953	}
1954
1955	/// Marks that the funding transaction for the JIT channel identified by `user_channel_id`
1956	/// is now safe to broadcast.
1957	///
1958	/// In LDK call this when you receive [`Event::FundingTxBroadcastSafe`]. In other Lightning
1959	/// backends call it once the funding transaction is fully negotiated and signed (all
1960	/// signatures verified), your channel state machine will now proceed assuming the funding
1961	/// transaction will confirm, and you are intentionally deferring the actual broadcast so
1962	/// the LSPS2 flow (when `client_trusts_lsp = true`) can first collect the opening fee from
1963	/// the intercepted payment.
1964	///
1965	/// In a `client_trusts_lsp` flow, after this is set and the opening fee has been fully skimmed,
1966	/// the channel's funding transaction will be broadcasted if the channel is still usable.
1967	/// If the channel has been closed or force-closed before this point, the funding transaction will not be broadcasted.
1968	///
1969	/// [`Event::FundingTxBroadcastSafe`]: lightning::events::Event::FundingTxBroadcastSafe
1970	pub fn set_funding_tx_broadcast_safe(
1971		&self, user_channel_id: u128, counterparty_node_id: &PublicKey,
1972	) -> Result<(), APIError> {
1973		let outer_state_lock = self.per_peer_state.read().unwrap();
1974		let inner_state_lock =
1975			outer_state_lock.get(counterparty_node_id).ok_or_else(|| APIError::APIMisuseError {
1976				err: format!("No counterparty state for: {}", counterparty_node_id),
1977			})?;
1978		let mut peer_state = inner_state_lock.lock().unwrap();
1979
1980		let intercept_scid = peer_state
1981			.intercept_scid_by_user_channel_id
1982			.get(&user_channel_id)
1983			.copied()
1984			.ok_or_else(|| APIError::APIMisuseError {
1985				err: format!("Could not find a channel with user_channel_id {}", user_channel_id),
1986			})?;
1987
1988		let jit_channel = peer_state
1989			.outbound_channels_by_intercept_scid
1990			.get_mut(&intercept_scid)
1991			.ok_or_else(|| APIError::APIMisuseError {
1992			err: format!(
1993				"Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1994				intercept_scid, user_channel_id,
1995			),
1996		})?;
1997
1998		jit_channel.set_funding_tx_broadcast_safe(true);
1999
2000		self.broadcast_funding_transaction_if_applies(jit_channel);
2001		Ok(())
2002	}
2003
2004	fn broadcast_funding_transaction_if_applies(&self, jit_channel: &OutboundJITChannel) {
2005		if !jit_channel.should_broadcast_funding_transaction() {
2006			return;
2007		}
2008
2009		// Broadcast the funding transaction only if the LDK channel is still usable. In
2010		// the `client_trusts_lsp` flow we delay funding broadcast until the opening fee is
2011		// collected. Before that happens, LDK may force-close the not‑yet‑funded channel
2012		// (for example when a forwarded HTLC nears expiry). Broadcasting funding after a
2013		// close could then confirm the commitment and trigger unintended on‑chain handling.
2014		// To avoid this, we check ChannelManager’s view (`is_channel_ready`) before broadcasting.
2015		let channel_id_opt = jit_channel.get_channel_id();
2016		if let Some(ch_id) = channel_id_opt {
2017			let is_channel_ready = self
2018				.channel_manager
2019				.get_cm()
2020				.list_channels()
2021				.into_iter()
2022				.any(|cd| cd.channel_id == ch_id && cd.is_channel_ready);
2023			if !is_channel_ready {
2024				return;
2025			}
2026		} else {
2027			return;
2028		}
2029
2030		if let Some(funding_tx) = jit_channel.get_funding_tx() {
2031			self.tx_broadcaster.broadcast_transactions(&[funding_tx]);
2032		}
2033	}
2034}
2035
2036impl<CM: Deref, K: Deref + Clone, T: Deref + Clone> LSPSProtocolMessageHandler
2037	for LSPS2ServiceHandler<CM, K, T>
2038where
2039	CM::Target: AChannelManager,
2040	K::Target: KVStore,
2041	T::Target: BroadcasterInterface,
2042{
2043	type ProtocolMessage = LSPS2Message;
2044	const PROTOCOL_NUMBER: Option<u16> = Some(2);
2045
2046	fn handle_message(
2047		&self, message: Self::ProtocolMessage, counterparty_node_id: &PublicKey,
2048	) -> Result<(), LightningError> {
2049		match message {
2050			LSPS2Message::Request(request_id, request) => {
2051				let res = match request {
2052					LSPS2Request::GetInfo(params) => {
2053						self.handle_get_info_request(request_id, counterparty_node_id, params)
2054					},
2055					LSPS2Request::Buy(params) => {
2056						self.handle_buy_request(request_id, counterparty_node_id, params)
2057					},
2058				};
2059				#[cfg(debug_assertions)]
2060				self.verify_pending_request_counter();
2061				res
2062			},
2063			_ => {
2064				debug_assert!(
2065					false,
2066					"Service handler received LSPS2 response message. This should never happen."
2067				);
2068				Err(LightningError { err: format!("Service handler received LSPS2 response message from node {}. This should never happen.", counterparty_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)})
2069			},
2070		}
2071	}
2072}
2073
2074fn calculate_amount_to_forward_per_htlc(
2075	htlcs: &[InterceptedHTLC], total_fee_msat: u64,
2076) -> Vec<(InterceptId, u64)> {
2077	// TODO: we should eventually make sure the HTLCs are all above ChannelDetails::next_outbound_minimum_msat
2078	let total_expected_outbound_msat: u64 =
2079		htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum();
2080	if total_fee_msat > total_expected_outbound_msat {
2081		debug_assert!(false, "Fee is larger than the total expected outbound amount.");
2082		return Vec::new();
2083	}
2084
2085	let mut fee_remaining_msat = total_fee_msat;
2086	let mut per_htlc_forwards = vec![];
2087	for (index, htlc) in htlcs.iter().enumerate() {
2088		let proportional_fee_amt_msat = (total_fee_msat as u128
2089			* htlc.expected_outbound_amount_msat as u128
2090			/ total_expected_outbound_msat as u128) as u64;
2091
2092		let mut actual_fee_amt_msat = core::cmp::min(fee_remaining_msat, proportional_fee_amt_msat);
2093		actual_fee_amt_msat =
2094			core::cmp::min(actual_fee_amt_msat, htlc.expected_outbound_amount_msat);
2095		fee_remaining_msat -= actual_fee_amt_msat;
2096
2097		if index == htlcs.len() - 1 {
2098			actual_fee_amt_msat += fee_remaining_msat;
2099		}
2100
2101		let amount_to_forward_msat =
2102			htlc.expected_outbound_amount_msat.saturating_sub(actual_fee_amt_msat);
2103
2104		per_htlc_forwards.push((htlc.intercept_id, amount_to_forward_msat))
2105	}
2106	per_htlc_forwards
2107}
2108
/// A synchronous wrapper around [`LSPS2ServiceHandler`] to be used in contexts where async is not
/// available.
pub struct LSPS2ServiceHandlerSync<'a, CM: Deref, K: Deref + Clone, T: Deref + Clone>
where
	CM::Target: AChannelManager,
	K::Target: KVStore,
	T::Target: BroadcasterInterface,
{
	// The borrowed async handler this wrapper delegates to.
	inner: &'a LSPS2ServiceHandler<CM, K, T>,
}
2119
2120impl<'a, CM: Deref, K: Deref + Clone, T: Deref + Clone> LSPS2ServiceHandlerSync<'a, CM, K, T>
2121where
2122	CM::Target: AChannelManager,
2123	K::Target: KVStore,
2124	T::Target: BroadcasterInterface,
2125{
2126	pub(crate) fn from_inner(inner: &'a LSPS2ServiceHandler<CM, K, T>) -> Self {
2127		Self { inner }
2128	}
2129
2130	/// Returns a reference to the used config.
2131	///
2132	/// Wraps [`LSPS2ServiceHandler::config`].
2133	pub fn config(&self) -> &LSPS2ServiceConfig {
2134		&self.inner.config
2135	}
2136
2137	/// Used by LSP to inform a client requesting a JIT Channel the token they used is invalid.
2138	///
2139	/// Wraps [`LSPS2ServiceHandler::invalid_token_provided`].
2140	pub fn invalid_token_provided(
2141		&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
2142	) -> Result<(), APIError> {
2143		self.inner.invalid_token_provided(counterparty_node_id, request_id)
2144	}
2145
2146	/// Used by LSP to provide fee parameters to a client requesting a JIT Channel.
2147	///
2148	/// Wraps [`LSPS2ServiceHandler::opening_fee_params_generated`].
2149	pub fn opening_fee_params_generated(
2150		&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
2151		opening_fee_params_menu: Vec<LSPS2RawOpeningFeeParams>,
2152	) -> Result<(), APIError> {
2153		self.inner.opening_fee_params_generated(
2154			counterparty_node_id,
2155			request_id,
2156			opening_fee_params_menu,
2157		)
2158	}
2159
2160	/// Used by LSP to provide the client with the intercept scid, a unique `user_channel_id`, and
2161	/// `cltv_expiry_delta` to include in their invoice.
2162	///
2163	/// Wraps [`LSPS2ServiceHandler::invoice_parameters_generated`].
2164	pub fn invoice_parameters_generated(
2165		&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId, intercept_scid: u64,
2166		cltv_expiry_delta: u32, client_trusts_lsp: bool, user_channel_id: u128,
2167	) -> Result<(), APIError> {
2168		let mut fut = Box::pin(self.inner.invoice_parameters_generated(
2169			counterparty_node_id,
2170			request_id,
2171			intercept_scid,
2172			cltv_expiry_delta,
2173			client_trusts_lsp,
2174			user_channel_id,
2175		));
2176
2177		let mut waker = dummy_waker();
2178		let mut ctx = task::Context::from_waker(&mut waker);
2179		match fut.as_mut().poll(&mut ctx) {
2180			task::Poll::Ready(result) => result,
2181			task::Poll::Pending => {
2182				// In a sync context, we can't wait for the future to complete.
2183				unreachable!("Should not be pending in a sync context");
2184			},
2185		}
2186	}
2187
2188	/// Forward [`Event::HTLCIntercepted`] event parameters into this function.
2189	///
2190	/// Wraps [`LSPS2ServiceHandler::htlc_intercepted`].
2191	///
2192	/// [`Event::HTLCIntercepted`]: lightning::events::Event::HTLCIntercepted
2193	pub fn htlc_intercepted(
2194		&self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64,
2195		payment_hash: PaymentHash,
2196	) -> Result<(), APIError> {
2197		let mut fut = Box::pin(self.inner.htlc_intercepted(
2198			intercept_scid,
2199			intercept_id,
2200			expected_outbound_amount_msat,
2201			payment_hash,
2202		));
2203
2204		let mut waker = dummy_waker();
2205		let mut ctx = task::Context::from_waker(&mut waker);
2206		match fut.as_mut().poll(&mut ctx) {
2207			task::Poll::Ready(result) => result,
2208			task::Poll::Pending => {
2209				// In a sync context, we can't wait for the future to complete.
2210				unreachable!("Should not be pending in a sync context");
2211			},
2212		}
2213	}
2214
2215	/// Forward [`Event::HTLCHandlingFailed`] event parameter into this function.
2216	///
2217	/// Wraps [`LSPS2ServiceHandler::htlc_handling_failed`].
2218	///
2219	/// [`Event::HTLCHandlingFailed`]: lightning::events::Event::HTLCHandlingFailed
2220	pub fn htlc_handling_failed(
2221		&self, failure_type: HTLCHandlingFailureType,
2222	) -> Result<(), APIError> {
2223		let mut fut = Box::pin(self.inner.htlc_handling_failed(failure_type));
2224
2225		let mut waker = dummy_waker();
2226		let mut ctx = task::Context::from_waker(&mut waker);
2227		match fut.as_mut().poll(&mut ctx) {
2228			task::Poll::Ready(result) => result,
2229			task::Poll::Pending => {
2230				// In a sync context, we can't wait for the future to complete.
2231				unreachable!("Should not be pending in a sync context");
2232			},
2233		}
2234	}
2235
2236	/// Forward [`Event::PaymentForwarded`] event parameter into this function.
2237	///
2238	/// Wraps [`LSPS2ServiceHandler::payment_forwarded`].
2239	///
2240	/// [`Event::PaymentForwarded`]: lightning::events::Event::PaymentForwarded
2241	pub fn payment_forwarded(
2242		&self, next_channel_id: ChannelId, skimmed_fee_msat: u64,
2243	) -> Result<(), APIError> {
2244		let mut fut = Box::pin(self.inner.payment_forwarded(next_channel_id, skimmed_fee_msat));
2245
2246		let mut waker = dummy_waker();
2247		let mut ctx = task::Context::from_waker(&mut waker);
2248		match fut.as_mut().poll(&mut ctx) {
2249			task::Poll::Ready(result) => result,
2250			task::Poll::Pending => {
2251				// In a sync context, we can't wait for the future to complete.
2252				unreachable!("Should not be pending in a sync context");
2253			},
2254		}
2255	}
2256
2257	/// Wraps [`LSPS2ServiceHandler::channel_needs_manual_broadcast`].
2258	pub fn channel_needs_manual_broadcast(
2259		&self, user_channel_id: u128, counterparty_node_id: &PublicKey,
2260	) -> Result<bool, APIError> {
2261		self.inner.channel_needs_manual_broadcast(user_channel_id, counterparty_node_id)
2262	}
2263
2264	/// Wraps [`LSPS2ServiceHandler::store_funding_transaction`].
2265	pub fn store_funding_transaction(
2266		&self, user_channel_id: u128, counterparty_node_id: &PublicKey, funding_tx: Transaction,
2267	) -> Result<(), APIError> {
2268		self.inner.store_funding_transaction(user_channel_id, counterparty_node_id, funding_tx)
2269	}
2270
2271	/// Wraps [`LSPS2ServiceHandler::set_funding_tx_broadcast_safe`].
2272	pub fn set_funding_tx_broadcast_safe(
2273		&self, user_channel_id: u128, counterparty_node_id: &PublicKey,
2274	) -> Result<(), APIError> {
2275		self.inner.set_funding_tx_broadcast_safe(user_channel_id, counterparty_node_id)
2276	}
2277
2278	/// Abandons a pending JIT‐open flow for `user_channel_id`, removing all local state.
2279	///
2280	/// Wraps [`LSPS2ServiceHandler::channel_open_abandoned`].
2281	pub fn channel_open_abandoned(
2282		&self, counterparty_node_id: &PublicKey, user_channel_id: u128,
2283	) -> Result<(), APIError> {
2284		let mut fut =
2285			Box::pin(self.inner.channel_open_abandoned(counterparty_node_id, user_channel_id));
2286
2287		let mut waker = dummy_waker();
2288		let mut ctx = task::Context::from_waker(&mut waker);
2289		match fut.as_mut().poll(&mut ctx) {
2290			task::Poll::Ready(result) => result,
2291			task::Poll::Pending => {
2292				// In a sync context, we can't wait for the future to complete.
2293				unreachable!("Should not be pending in a sync context");
2294			},
2295		}
2296	}
2297
2298	/// Used to fail intercepted HTLCs backwards when a channel open attempt ultimately fails.
2299	///
2300	/// Wraps [`LSPS2ServiceHandler::channel_open_failed`].
2301	pub fn channel_open_failed(
2302		&self, counterparty_node_id: &PublicKey, user_channel_id: u128,
2303	) -> Result<(), APIError> {
2304		let mut fut =
2305			Box::pin(self.inner.channel_open_failed(counterparty_node_id, user_channel_id));
2306
2307		let mut waker = dummy_waker();
2308		let mut ctx = task::Context::from_waker(&mut waker);
2309		match fut.as_mut().poll(&mut ctx) {
2310			task::Poll::Ready(result) => result,
2311			task::Poll::Pending => {
2312				// In a sync context, we can't wait for the future to complete.
2313				unreachable!("Should not be pending in a sync context");
2314			},
2315		}
2316	}
2317
2318	/// Forward [`Event::ChannelReady`] event parameters into this function.
2319	///
2320	/// Wraps [`LSPS2ServiceHandler::channel_ready`].
2321	///
2322	/// [`Event::ChannelReady`]: lightning::events::Event::ChannelReady
2323	pub fn channel_ready(
2324		&self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey,
2325	) -> Result<(), APIError> {
2326		let mut fut =
2327			Box::pin(self.inner.channel_ready(user_channel_id, channel_id, counterparty_node_id));
2328
2329		let mut waker = dummy_waker();
2330		let mut ctx = task::Context::from_waker(&mut waker);
2331		match fut.as_mut().poll(&mut ctx) {
2332			task::Poll::Ready(result) => result,
2333			task::Poll::Pending => {
2334				// In a sync context, we can't wait for the future to complete.
2335				unreachable!("Should not be pending in a sync context");
2336			},
2337		}
2338	}
2339}
2340
2341#[cfg(test)]
2342mod tests {
2343	use super::*;
2344
2345	use crate::lsps0::ser::LSPSDateTime;
2346
2347	use proptest::prelude::*;
2348
2349	use bitcoin::{absolute::LockTime, transaction::Version};
2350	use core::str::FromStr;
2351
2352	const MAX_VALUE_MSAT: u64 = 21_000_000_0000_0000_000;
2353
2354	fn arb_forward_amounts() -> impl Strategy<Value = (u64, u64, u64, u64)> {
2355		(1u64..MAX_VALUE_MSAT, 1u64..MAX_VALUE_MSAT, 1u64..MAX_VALUE_MSAT, 1u64..MAX_VALUE_MSAT)
2356			.prop_map(|(a, b, c, d)| {
2357				(a, b, c, core::cmp::min(d, a.saturating_add(b).saturating_add(c)))
2358			})
2359	}
2360
2361	proptest! {
2362		#[test]
2363		fn proptest_calculate_amount_to_forward((o_0, o_1, o_2, total_fee_msat) in arb_forward_amounts()) {
2364			let htlcs = vec![
2365				InterceptedHTLC {
2366					intercept_id: InterceptId([0; 32]),
2367					expected_outbound_amount_msat: o_0,
2368					payment_hash: PaymentHash([0; 32]),
2369				},
2370				InterceptedHTLC {
2371					intercept_id: InterceptId([1; 32]),
2372					expected_outbound_amount_msat: o_1,
2373					payment_hash: PaymentHash([0; 32]),
2374				},
2375				InterceptedHTLC {
2376					intercept_id: InterceptId([2; 32]),
2377					expected_outbound_amount_msat: o_2,
2378					payment_hash: PaymentHash([0; 32]),
2379				},
2380			];
2381
2382			let result = calculate_amount_to_forward_per_htlc(&htlcs, total_fee_msat);
2383			let total_received_msat = o_0 + o_1 + o_2;
2384
2385			if total_received_msat < total_fee_msat {
2386				assert_eq!(result.len(), 0);
2387			} else {
2388				assert_ne!(result.len(), 0);
2389				assert_eq!(result[0].0, htlcs[0].intercept_id);
2390				assert_eq!(result[1].0, htlcs[1].intercept_id);
2391				assert_eq!(result[2].0, htlcs[2].intercept_id);
2392				assert!(result[0].1 <= o_0);
2393				assert!(result[1].1 <= o_1);
2394				assert!(result[2].1 <= o_2);
2395
2396				let result_sum = result.iter().map(|(_, f)| f).sum::<u64>();
2397				assert_eq!(total_received_msat - result_sum, total_fee_msat);
2398				let five_pct = result_sum as f32 * 0.05;
2399				let fair_share_0 = (o_0 as f32 / total_received_msat as f32) * result_sum as f32;
2400				assert!(result[0].1 as f32 <= fair_share_0 + five_pct);
2401				let fair_share_1 = (o_1 as f32 / total_received_msat as f32) * result_sum as f32;
2402				assert!(result[1].1 as f32 <= fair_share_1 + five_pct);
2403				let fair_share_2 = (o_2 as f32 / total_received_msat as f32) * result_sum as f32;
2404				assert!(result[2].1 as f32 <= fair_share_2 + five_pct);
2405			}
2406		}
2407	}
2408
2409	#[test]
2410	fn test_calculate_amount_to_forward() {
2411		let htlcs = vec![
2412			InterceptedHTLC {
2413				intercept_id: InterceptId([0; 32]),
2414				expected_outbound_amount_msat: 2,
2415				payment_hash: PaymentHash([0; 32]),
2416			},
2417			InterceptedHTLC {
2418				intercept_id: InterceptId([1; 32]),
2419				expected_outbound_amount_msat: 6,
2420				payment_hash: PaymentHash([0; 32]),
2421			},
2422			InterceptedHTLC {
2423				intercept_id: InterceptId([2; 32]),
2424				expected_outbound_amount_msat: 2,
2425				payment_hash: PaymentHash([0; 32]),
2426			},
2427		];
2428		let result = calculate_amount_to_forward_per_htlc(&htlcs, 5);
2429		assert_eq!(
2430			result,
2431			vec![
2432				(htlcs[0].intercept_id, 1),
2433				(htlcs[1].intercept_id, 3),
2434				(htlcs[2].intercept_id, 1),
2435			]
2436		);
2437	}
2438
2439	#[test]
2440	fn test_jit_channel_state_mpp() {
2441		let payment_size_msat = Some(500_000_000);
2442		let opening_fee_params = LSPS2OpeningFeeParams {
2443			min_fee_msat: 10_000_000,
2444			proportional: 10_000,
2445			valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(),
2446			min_lifetime: 4032,
2447			max_client_to_self_delay: 2016,
2448			min_payment_size_msat: 10_000_000,
2449			max_payment_size_msat: 1_000_000_000,
2450			promise: "ignore".to_string(),
2451		};
2452		let mut state = OutboundJITChannelState::new();
2453		// Intercepts the first HTLC of a multipart payment A.
2454		{
2455			let action = state
2456				.htlc_intercepted(
2457					&opening_fee_params,
2458					&payment_size_msat,
2459					InterceptedHTLC {
2460						intercept_id: InterceptId([0; 32]),
2461						expected_outbound_amount_msat: 200_000_000,
2462						payment_hash: PaymentHash([100; 32]),
2463					},
2464				)
2465				.unwrap();
2466			assert!(matches!(state, OutboundJITChannelState::PendingInitialPayment { .. }));
2467			assert!(action.is_none());
2468		}
2469		// Intercepts the first HTLC of a different multipart payment B.
2470		{
2471			let action = state
2472				.htlc_intercepted(
2473					&opening_fee_params,
2474					&payment_size_msat,
2475					InterceptedHTLC {
2476						intercept_id: InterceptId([1; 32]),
2477						expected_outbound_amount_msat: 1_000_000,
2478						payment_hash: PaymentHash([101; 32]),
2479					},
2480				)
2481				.unwrap();
2482			assert!(matches!(state, OutboundJITChannelState::PendingInitialPayment { .. }));
2483			assert!(action.is_none());
2484		}
2485		// Intercepts the second HTLC of multipart payment A, completing the expected payment and
2486		// opening the channel.
2487		{
2488			let action = state
2489				.htlc_intercepted(
2490					&opening_fee_params,
2491					&payment_size_msat,
2492					InterceptedHTLC {
2493						intercept_id: InterceptId([2; 32]),
2494						expected_outbound_amount_msat: 300_000_000,
2495						payment_hash: PaymentHash([100; 32]),
2496					},
2497				)
2498				.unwrap();
2499			assert!(matches!(state, OutboundJITChannelState::PendingChannelOpen { .. }));
2500			assert!(matches!(action, Some(HTLCInterceptedAction::OpenChannel(_))));
2501		}
2502		// Channel opens, becomes ready, and multipart payment A gets forwarded.
2503		{
2504			let ForwardPaymentAction(channel_id, payment) =
2505				state.channel_ready(ChannelId([200; 32])).unwrap();
2506			assert_eq!(channel_id, ChannelId([200; 32]));
2507			assert_eq!(payment.opening_fee_msat, 10_000_000);
2508			assert_eq!(
2509				payment.htlcs,
2510				vec![
2511					InterceptedHTLC {
2512						intercept_id: InterceptId([0; 32]),
2513						expected_outbound_amount_msat: 200_000_000,
2514						payment_hash: PaymentHash([100; 32]),
2515					},
2516					InterceptedHTLC {
2517						intercept_id: InterceptId([2; 32]),
2518						expected_outbound_amount_msat: 300_000_000,
2519						payment_hash: PaymentHash([100; 32]),
2520					},
2521				]
2522			);
2523		}
2524		// Intercepts the first HTLC of a different payment C.
2525		{
2526			let action = state
2527				.htlc_intercepted(
2528					&opening_fee_params,
2529					&payment_size_msat,
2530					InterceptedHTLC {
2531						intercept_id: InterceptId([3; 32]),
2532						expected_outbound_amount_msat: 2_000_000,
2533						payment_hash: PaymentHash([102; 32]),
2534					},
2535				)
2536				.unwrap();
2537			assert!(matches!(state, OutboundJITChannelState::PendingPaymentForward { .. }));
2538			assert!(action.is_none());
2539		}
2540		// Payment A fails.
2541		{
2542			let action = state.htlc_handling_failed().unwrap();
2543			assert!(matches!(state, OutboundJITChannelState::PendingPayment { .. }));
2544			// No payments have received sufficient HTLCs yet.
2545			assert!(action.is_none());
2546		}
		// Additional HTLC of payment B arrives, completing the expected payment.
2548		{
2549			let action = state
2550				.htlc_intercepted(
2551					&opening_fee_params,
2552					&payment_size_msat,
2553					InterceptedHTLC {
2554						intercept_id: InterceptId([4; 32]),
2555						expected_outbound_amount_msat: 500_000_000,
2556						payment_hash: PaymentHash([101; 32]),
2557					},
2558				)
2559				.unwrap();
2560			assert!(matches!(state, OutboundJITChannelState::PendingPaymentForward { .. }));
2561			match action {
2562				Some(HTLCInterceptedAction::ForwardPayment(channel_id, payment)) => {
2563					assert_eq!(channel_id, ChannelId([200; 32]));
2564					assert_eq!(payment.opening_fee_msat, 10_000_000);
2565					assert_eq!(
2566						payment.htlcs,
2567						vec![
2568							InterceptedHTLC {
2569								intercept_id: InterceptId([1; 32]),
2570								expected_outbound_amount_msat: 1_000_000,
2571								payment_hash: PaymentHash([101; 32]),
2572							},
2573							InterceptedHTLC {
2574								intercept_id: InterceptId([4; 32]),
2575								expected_outbound_amount_msat: 500_000_000,
2576								payment_hash: PaymentHash([101; 32]),
2577							},
2578						]
2579					);
2580				},
2581				_ => panic!("Unexpected action when intercepted HTLC."),
2582			}
2583		}
2584		// Payment completes, queued payments get forwarded.
2585		{
2586			let action = state.payment_forwarded(100000000000).unwrap();
2587			assert!(matches!(state, OutboundJITChannelState::PaymentForwarded { .. }));
2588			match action {
2589				Some(ForwardHTLCsAction(channel_id, htlcs)) => {
2590					assert_eq!(channel_id, ChannelId([200; 32]));
2591					assert_eq!(
2592						htlcs,
2593						vec![InterceptedHTLC {
2594							intercept_id: InterceptId([3; 32]),
2595							expected_outbound_amount_msat: 2_000_000,
2596							payment_hash: PaymentHash([102; 32]),
2597						}]
2598					);
2599				},
2600				_ => panic!("Unexpected action when forwarded payment."),
2601			}
2602		}
2603		// Any new HTLC gets automatically forwarded.
2604		{
2605			let action = state
2606				.htlc_intercepted(
2607					&opening_fee_params,
2608					&payment_size_msat,
2609					InterceptedHTLC {
2610						intercept_id: InterceptId([5; 32]),
2611						expected_outbound_amount_msat: 200_000_000,
2612						payment_hash: PaymentHash([103; 32]),
2613					},
2614				)
2615				.unwrap();
2616			assert!(matches!(state, OutboundJITChannelState::PaymentForwarded { .. }));
2617			assert!(
2618				matches!(action, Some(HTLCInterceptedAction::ForwardHTLC(channel_id)) if channel_id == ChannelId([200; 32]))
2619			);
2620		}
2621	}
2622
2623	#[test]
2624	fn test_jit_channel_state_no_mpp() {
2625		let payment_size_msat = None;
2626		let opening_fee_params = LSPS2OpeningFeeParams {
2627			min_fee_msat: 10_000_000,
2628			proportional: 10_000,
2629			valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(),
2630			min_lifetime: 4032,
2631			max_client_to_self_delay: 2016,
2632			min_payment_size_msat: 10_000_000,
2633			max_payment_size_msat: 1_000_000_000,
2634			promise: "ignore".to_string(),
2635		};
2636		let mut state = OutboundJITChannelState::new();
2637		// Intercepts payment A, opening the channel.
2638		{
2639			let action = state
2640				.htlc_intercepted(
2641					&opening_fee_params,
2642					&payment_size_msat,
2643					InterceptedHTLC {
2644						intercept_id: InterceptId([0; 32]),
2645						expected_outbound_amount_msat: 500_000_000,
2646						payment_hash: PaymentHash([100; 32]),
2647					},
2648				)
2649				.unwrap();
2650			assert!(matches!(state, OutboundJITChannelState::PendingChannelOpen { .. }));
2651			assert!(matches!(action, Some(HTLCInterceptedAction::OpenChannel(_))));
2652		}
2653		// Intercepts payment B.
2654		{
2655			let action = state
2656				.htlc_intercepted(
2657					&opening_fee_params,
2658					&payment_size_msat,
2659					InterceptedHTLC {
2660						intercept_id: InterceptId([1; 32]),
2661						expected_outbound_amount_msat: 600_000_000,
2662						payment_hash: PaymentHash([101; 32]),
2663					},
2664				)
2665				.unwrap();
2666			assert!(matches!(state, OutboundJITChannelState::PendingChannelOpen { .. }));
2667			assert!(action.is_none());
2668		}
2669		// Channel opens, becomes ready, and payment A gets forwarded.
2670		{
2671			let ForwardPaymentAction(channel_id, payment) =
2672				state.channel_ready(ChannelId([200; 32])).unwrap();
2673			assert_eq!(channel_id, ChannelId([200; 32]));
2674			assert_eq!(payment.opening_fee_msat, 10_000_000);
2675			assert_eq!(
2676				payment.htlcs,
2677				vec![InterceptedHTLC {
2678					intercept_id: InterceptId([0; 32]),
2679					expected_outbound_amount_msat: 500_000_000,
2680					payment_hash: PaymentHash([100; 32]),
2681				},]
2682			);
2683		}
2684		// Intercepts payment C.
2685		{
2686			let action = state
2687				.htlc_intercepted(
2688					&opening_fee_params,
2689					&payment_size_msat,
2690					InterceptedHTLC {
2691						intercept_id: InterceptId([2; 32]),
2692						expected_outbound_amount_msat: 500_000_000,
2693						payment_hash: PaymentHash([102; 32]),
2694					},
2695				)
2696				.unwrap();
2697			assert!(matches!(state, OutboundJITChannelState::PendingPaymentForward { .. }));
2698			assert!(action.is_none());
2699		}
2700		// Payment A fails, and payment B is forwarded.
2701		{
2702			let action = state.htlc_handling_failed().unwrap();
2703			assert!(matches!(state, OutboundJITChannelState::PendingPaymentForward { .. }));
2704			match action {
2705				Some(ForwardPaymentAction(channel_id, payment)) => {
2706					assert_eq!(channel_id, ChannelId([200; 32]));
2707					assert_eq!(
2708						payment.htlcs,
2709						vec![InterceptedHTLC {
2710							intercept_id: InterceptId([1; 32]),
2711							expected_outbound_amount_msat: 600_000_000,
2712							payment_hash: PaymentHash([101; 32]),
2713						},]
2714					);
2715				},
2716				_ => panic!("Unexpected action when HTLC handling failed."),
2717			}
2718		}
2719		// Payment completes, queued payments get forwarded.
2720		{
2721			let action = state.payment_forwarded(10000000000).unwrap();
2722			assert!(matches!(state, OutboundJITChannelState::PaymentForwarded { .. }));
2723			match action {
2724				Some(ForwardHTLCsAction(channel_id, htlcs)) => {
2725					assert_eq!(channel_id, ChannelId([200; 32]));
2726					assert_eq!(
2727						htlcs,
2728						vec![InterceptedHTLC {
2729							intercept_id: InterceptId([2; 32]),
2730							expected_outbound_amount_msat: 500_000_000,
2731							payment_hash: PaymentHash([102; 32]),
2732						}]
2733					);
2734				},
2735				_ => panic!("Unexpected action when forwarded payment."),
2736			}
2737		}
2738		// Any new HTLC gets automatically forwarded.
2739		{
2740			let action = state
2741				.htlc_intercepted(
2742					&opening_fee_params,
2743					&payment_size_msat,
2744					InterceptedHTLC {
2745						intercept_id: InterceptId([3; 32]),
2746						expected_outbound_amount_msat: 200_000_000,
2747						payment_hash: PaymentHash([103; 32]),
2748					},
2749				)
2750				.unwrap();
2751			assert!(matches!(state, OutboundJITChannelState::PaymentForwarded { .. }));
2752			assert!(
2753				matches!(action, Some(HTLCInterceptedAction::ForwardHTLC(channel_id)) if channel_id == ChannelId([200; 32]))
2754			);
2755		}
2756	}
2757
2758	#[test]
2759	fn broadcast_not_allowed_after_non_paying_fee_payment_claimed() {
2760		let min_fee_msat: u64 = 12345;
2761		let opening_fee_params = LSPS2OpeningFeeParams {
2762			min_fee_msat,
2763			proportional: 0,
2764			valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(),
2765			min_lifetime: 144,
2766			max_client_to_self_delay: 128,
2767			min_payment_size_msat: 1,
2768			max_payment_size_msat: 10_000_000_000,
2769			promise: "ignore".to_string(),
2770		};
2771
2772		let payment_size_msat = Some(1_000_000);
2773		let user_channel_id = 4242u128;
2774		let mut jit_channel = OutboundJITChannel::new(
2775			payment_size_msat,
2776			opening_fee_params.clone(),
2777			user_channel_id,
2778			true,
2779		);
2780
2781		let opening_payment_hash = PaymentHash([42; 32]);
2782		let htlcs_for_opening = [
2783			InterceptedHTLC {
2784				intercept_id: InterceptId([0; 32]),
2785				expected_outbound_amount_msat: 400_000,
2786				payment_hash: opening_payment_hash,
2787			},
2788			InterceptedHTLC {
2789				intercept_id: InterceptId([1; 32]),
2790				expected_outbound_amount_msat: 600_000,
2791				payment_hash: opening_payment_hash,
2792			},
2793		];
2794
2795		assert!(jit_channel.htlc_intercepted(htlcs_for_opening[0].clone()).unwrap().is_none());
2796		let action = jit_channel.htlc_intercepted(htlcs_for_opening[1].clone()).unwrap();
2797		match action {
2798			Some(HTLCInterceptedAction::OpenChannel(_)) => {},
2799			other => panic!("Expected OpenChannel action, got {:?}", other),
2800		}
2801
2802		let channel_id = ChannelId([7; 32]);
2803		let ForwardPaymentAction(_, fee_payment) = jit_channel.channel_ready(channel_id).unwrap();
2804		assert_eq!(fee_payment.opening_fee_msat, min_fee_msat);
2805
2806		let followup = jit_channel.htlc_handling_failed().unwrap();
2807		assert!(followup.is_none());
2808
2809		let dummy_tx = Transaction {
2810			version: Version(2),
2811			lock_time: LockTime::ZERO,
2812			input: vec![],
2813			output: vec![],
2814		};
2815		jit_channel.set_funding_tx(dummy_tx);
2816		jit_channel.set_funding_tx_broadcast_safe(true);
2817		assert!(
2818			!jit_channel.should_broadcast_funding_transaction(),
2819			"Should not broadcast before any successful payment is claimed"
2820		);
2821
2822		let second_payment_hash = PaymentHash([99; 32]);
2823		let second_htlc = InterceptedHTLC {
2824			intercept_id: InterceptId([2; 32]),
2825			expected_outbound_amount_msat: min_fee_msat,
2826			payment_hash: second_payment_hash,
2827		};
2828		let action2 = jit_channel.htlc_intercepted(second_htlc).unwrap();
2829		let (forwarded_channel_id, fee_payment2) = match action2 {
2830			Some(HTLCInterceptedAction::ForwardPayment(cid, fp)) => (cid, fp),
2831			other => panic!("Expected ForwardPayment for second HTLC, got {:?}", other),
2832		};
2833		assert_eq!(forwarded_channel_id, channel_id);
2834		assert_eq!(fee_payment2.opening_fee_msat, min_fee_msat);
2835
2836		assert!(
2837			!jit_channel.should_broadcast_funding_transaction(),
2838			"Should not broadcast before any successful payment is claimed"
2839		);
2840
2841		// Forward a payment that is not enough to cover the fees
2842		let _ = jit_channel.payment_forwarded(min_fee_msat - 1).unwrap();
2843
2844		assert!(
2845			!jit_channel.should_broadcast_funding_transaction(),
2846			"Should not broadcast before all the fees are collected"
2847		);
2848
2849		let _ = jit_channel.payment_forwarded(min_fee_msat).unwrap();
2850
2851		let broadcast_allowed = jit_channel.should_broadcast_funding_transaction();
2852
2853		assert!(
2854			broadcast_allowed,
2855			"Broadcast was not allowed even though all the skimmed fees were collected"
2856		);
2857	}
2858}