lightning_liquidity/lsps2/
service.rs

1// This file is Copyright its original authors, visible in version control
2// history.
3//
4// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7// You may not use this file except in accordance with one or both of these
8// licenses.
9
10//! Contains the main bLIP-52 / LSPS2 server-side object, [`LSPS2ServiceHandler`].
11
12use alloc::boxed::Box;
13use alloc::string::{String, ToString};
14use alloc::vec::Vec;
15use lightning::util::persist::KVStore;
16
17use core::cmp::Ordering as CmpOrdering;
18use core::future::Future as StdFuture;
19use core::ops::Deref;
20use core::sync::atomic::{AtomicUsize, Ordering};
21use core::task;
22
23use crate::events::EventQueue;
24use crate::lsps0::ser::{
25	LSPSMessage, LSPSProtocolMessageHandler, LSPSRequestId, LSPSResponseError,
26	JSONRPC_INTERNAL_ERROR_ERROR_CODE, JSONRPC_INTERNAL_ERROR_ERROR_MESSAGE,
27	LSPS0_CLIENT_REJECTED_ERROR_CODE,
28};
29use crate::lsps2::event::LSPS2ServiceEvent;
30use crate::lsps2::payment_queue::{InterceptedHTLC, PaymentQueue};
31use crate::lsps2::utils::{
32	compute_opening_fee, is_expired_opening_fee_params, is_valid_opening_fee_params,
33};
34use crate::message_queue::{MessageQueue, MessageQueueNotifierGuard};
35use crate::persist::{
36	LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
37};
38use crate::prelude::hash_map::Entry;
39use crate::prelude::{new_hash_map, HashMap};
40use crate::sync::{Arc, Mutex, MutexGuard, RwLock};
41use crate::utils::async_poll::dummy_waker;
42
43use lightning::chain::chaininterface::BroadcasterInterface;
44use lightning::events::HTLCHandlingFailureType;
45use lightning::ln::channelmanager::{AChannelManager, FailureCode, InterceptId};
46use lightning::ln::msgs::{ErrorAction, LightningError};
47use lightning::ln::types::ChannelId;
48use lightning::util::errors::APIError;
49use lightning::util::logger::Level;
50use lightning::util::ser::Writeable;
51use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum};
52
53use lightning_types::payment::PaymentHash;
54
55use bitcoin::secp256k1::PublicKey;
56use bitcoin::Transaction;
57
58use crate::lsps2::msgs::{
59	LSPS2BuyRequest, LSPS2BuyResponse, LSPS2GetInfoRequest, LSPS2GetInfoResponse, LSPS2Message,
60	LSPS2OpeningFeeParams, LSPS2RawOpeningFeeParams, LSPS2Request, LSPS2Response,
61	LSPS2_BUY_REQUEST_INVALID_OPENING_FEE_PARAMS_ERROR_CODE,
62	LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_LARGE_ERROR_CODE,
63	LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_SMALL_ERROR_CODE,
64	LSPS2_GET_INFO_REQUEST_UNRECOGNIZED_OR_STALE_TOKEN_ERROR_CODE,
65};
66
67const MAX_PENDING_REQUESTS_PER_PEER: usize = 10;
68const MAX_TOTAL_PENDING_REQUESTS: usize = 1000;
69const MAX_TOTAL_PEERS: usize = 100000;
70
71/// Server-side configuration options for JIT channels.
72#[derive(Clone, Debug)]
73pub struct LSPS2ServiceConfig {
74	/// Used to calculate the promise for channel parameters supplied to clients.
75	///
76	/// Note: If this changes then old promises given out will be considered invalid.
77	pub promise_secret: [u8; 32],
78}
79
80/// Information about the initial payment size and JIT channel opening fee.
81/// This will be provided in the `OpenChannel` event.
82#[derive(Clone, Debug, PartialEq)]
83struct OpenChannelParams {
84	opening_fee_msat: u64,
85	amt_to_forward_msat: u64,
86}
87
88/// A payment that will be forwarded while skimming the given JIT channel opening fee.
89#[derive(Clone, Debug, PartialEq)]
90struct FeePayment {
91	htlcs: Vec<InterceptedHTLC>,
92	opening_fee_msat: u64,
93}
94
95#[derive(Debug)]
96struct ChannelStateError(String);
97
98impl From<ChannelStateError> for LightningError {
99	fn from(value: ChannelStateError) -> Self {
100		LightningError { err: value.0, action: ErrorAction::IgnoreAndLog(Level::Info) }
101	}
102}
103
104/// Possible actions that need to be taken when an HTLC is intercepted.
105#[derive(Debug, PartialEq)]
106enum HTLCInterceptedAction {
107	/// The opening of the JIT channel.
108	OpenChannel(OpenChannelParams),
109	/// The forwarding of the intercepted HTLC.
110	ForwardHTLC(ChannelId),
111	ForwardPayment(ChannelId, FeePayment),
112}
113
114/// The forwarding of a payment while skimming the JIT channel opening fee.
115#[derive(Debug, PartialEq)]
116struct ForwardPaymentAction(ChannelId, FeePayment);
117
118/// The forwarding of previously intercepted HTLCs without skimming any further fees.
119#[derive(Debug, PartialEq)]
120struct ForwardHTLCsAction(ChannelId, Vec<InterceptedHTLC>);
121
122#[derive(Debug, Clone)]
123enum TrustModel {
124	ClientTrustsLsp { funding_tx_broadcast_safe: bool, funding_tx: Option<Transaction> },
125	LspTrustsClient,
126}
127
128impl TrustModel {
129	fn should_manually_broadcast(&self, state_is_payment_forward: bool) -> bool {
130		match self {
131			TrustModel::ClientTrustsLsp { funding_tx_broadcast_safe, funding_tx } => {
132				*funding_tx_broadcast_safe && state_is_payment_forward && funding_tx.is_some()
133			},
134			// in lsp-trusts-client, the broadcast is automatic, so we never need to manually broadcast.
135			TrustModel::LspTrustsClient => false,
136		}
137	}
138
139	fn new(client_trusts_lsp: bool) -> Self {
140		if client_trusts_lsp {
141			TrustModel::ClientTrustsLsp { funding_tx_broadcast_safe: false, funding_tx: None }
142		} else {
143			TrustModel::LspTrustsClient
144		}
145	}
146
147	fn set_funding_tx(&mut self, funding_tx: Transaction) {
148		match self {
149			TrustModel::ClientTrustsLsp { funding_tx: tx, .. } => {
150				*tx = Some(funding_tx);
151			},
152			TrustModel::LspTrustsClient => {
153				// No-op
154			},
155		}
156	}
157
158	fn set_funding_tx_broadcast_safe(&mut self, funding_tx_broadcast_safe: bool) {
159		match self {
160			TrustModel::ClientTrustsLsp { funding_tx_broadcast_safe: safe, .. } => {
161				*safe = funding_tx_broadcast_safe;
162			},
163			TrustModel::LspTrustsClient => {
164				// No-op
165			},
166		}
167	}
168
169	fn get_funding_tx(&self) -> Option<&Transaction> {
170		match self {
171			TrustModel::ClientTrustsLsp { funding_tx, .. } => funding_tx.as_ref(),
172			_ => None,
173		}
174	}
175
176	fn is_client_trusts_lsp(&self) -> bool {
177		match self {
178			TrustModel::ClientTrustsLsp { .. } => true,
179			TrustModel::LspTrustsClient => false,
180		}
181	}
182}
183
184impl_writeable_tlv_based_enum!(TrustModel,
185	(0, ClientTrustsLsp) => {
186		(0, funding_tx_broadcast_safe, required),
187		(2, funding_tx, option),
188	},
189	(2, LspTrustsClient) => {
190	},
191);
192
193/// The different states a requested JIT channel can be in.
194#[derive(Debug)]
195enum OutboundJITChannelState {
196	/// The JIT channel SCID was created after a buy request, and we are awaiting an initial payment
197	/// of sufficient size to open the channel.
198	PendingInitialPayment { payment_queue: PaymentQueue },
199	/// An initial payment of sufficient size was intercepted to the JIT channel SCID, triggering the
200	/// opening of the channel. We are awaiting the completion of the channel establishment.
201	PendingChannelOpen { payment_queue: PaymentQueue, opening_fee_msat: u64 },
202	/// The channel is open and a payment was forwarded while skimming the JIT channel fee.
203	/// No further payments can be forwarded until the pending payment succeeds or fails, as we need
204	/// to know whether the JIT channel fee needs to be skimmed from a next payment or not.
205	PendingPaymentForward {
206		payment_queue: PaymentQueue,
207		opening_fee_msat: u64,
208		channel_id: ChannelId,
209	},
210	/// The channel is open, no payment is currently being forwarded, and the JIT channel fee still
211	/// needs to be paid. This state can occur when the initial payment fails, e.g. due to a
212	/// prepayment probe. We are awaiting a next payment of sufficient size to forward and skim the
213	/// JIT channel fee.
214	PendingPayment { payment_queue: PaymentQueue, opening_fee_msat: u64, channel_id: ChannelId },
215	/// The channel is open and a payment was successfully forwarded while skimming the JIT channel
216	/// fee. Any subsequent HTLCs can be forwarded without additional logic.
217	PaymentForwarded { channel_id: ChannelId },
218}
219
220impl OutboundJITChannelState {
221	fn new() -> Self {
222		OutboundJITChannelState::PendingInitialPayment { payment_queue: PaymentQueue::new() }
223	}
224
225	fn htlc_intercepted(
226		&mut self, opening_fee_params: &LSPS2OpeningFeeParams, payment_size_msat: &Option<u64>,
227		htlc: InterceptedHTLC,
228	) -> Result<Option<HTLCInterceptedAction>, ChannelStateError> {
229		match self {
230			OutboundJITChannelState::PendingInitialPayment { payment_queue } => {
231				let (total_expected_outbound_amount_msat, num_htlcs) = payment_queue.add_htlc(htlc);
232
233				let (expected_payment_size_msat, mpp_mode) =
234					if let Some(payment_size_msat) = payment_size_msat {
235						(*payment_size_msat, true)
236					} else {
237						debug_assert_eq!(num_htlcs, 1);
238						if num_htlcs != 1 {
239							return Err(ChannelStateError(
240								"Paying via multiple HTLCs is disallowed in \"no-MPP+var-invoice\" mode.".to_string()
241							));
242						}
243						(total_expected_outbound_amount_msat, false)
244					};
245
246				if expected_payment_size_msat < opening_fee_params.min_payment_size_msat
247					|| expected_payment_size_msat > opening_fee_params.max_payment_size_msat
248				{
249					return Err(ChannelStateError(
250							format!("Payment size violates our limits: expected_payment_size_msat = {}, min_payment_size_msat = {}, max_payment_size_msat = {}",
251									expected_payment_size_msat,
252									opening_fee_params.min_payment_size_msat,
253									opening_fee_params.max_payment_size_msat
254							)));
255				}
256
257				let opening_fee_msat = compute_opening_fee(
258					expected_payment_size_msat,
259					opening_fee_params.min_fee_msat,
260					opening_fee_params.proportional.into(),
261				).ok_or(ChannelStateError(
262					format!("Could not compute valid opening fee with min_fee_msat = {}, proportional = {}, and expected_payment_size_msat = {}",
263						opening_fee_params.min_fee_msat,
264						opening_fee_params.proportional,
265						expected_payment_size_msat
266					))
267				)?;
268
269				let amt_to_forward_msat =
270					expected_payment_size_msat.saturating_sub(opening_fee_msat);
271
272				// Go ahead and open the channel if we intercepted sufficient HTLCs.
273				if total_expected_outbound_amount_msat >= expected_payment_size_msat
274					&& amt_to_forward_msat > 0
275				{
276					*self = OutboundJITChannelState::PendingChannelOpen {
277						payment_queue: core::mem::take(payment_queue),
278						opening_fee_msat,
279					};
280					let open_channel = HTLCInterceptedAction::OpenChannel(OpenChannelParams {
281						opening_fee_msat,
282						amt_to_forward_msat,
283					});
284					Ok(Some(open_channel))
285				} else {
286					if mpp_mode {
287						*self = OutboundJITChannelState::PendingInitialPayment {
288							payment_queue: core::mem::take(payment_queue),
289						};
290						Ok(None)
291					} else {
292						Err(ChannelStateError(
293							"Intercepted HTLC is too small to pay opening fee".to_string(),
294						))
295					}
296				}
297			},
298			OutboundJITChannelState::PendingChannelOpen { payment_queue, opening_fee_msat } => {
299				let mut payment_queue = core::mem::take(payment_queue);
300				payment_queue.add_htlc(htlc);
301				*self = OutboundJITChannelState::PendingChannelOpen {
302					payment_queue,
303					opening_fee_msat: *opening_fee_msat,
304				};
305				Ok(None)
306			},
307			OutboundJITChannelState::PendingPaymentForward {
308				payment_queue,
309				opening_fee_msat,
310				channel_id,
311			} => {
312				let mut payment_queue = core::mem::take(payment_queue);
313				payment_queue.add_htlc(htlc);
314				*self = OutboundJITChannelState::PendingPaymentForward {
315					payment_queue,
316					opening_fee_msat: *opening_fee_msat,
317					channel_id: *channel_id,
318				};
319				Ok(None)
320			},
321			OutboundJITChannelState::PendingPayment {
322				payment_queue,
323				opening_fee_msat,
324				channel_id,
325			} => {
326				let mut payment_queue = core::mem::take(payment_queue);
327				payment_queue.add_htlc(htlc);
328				if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) {
329					let forward_payment = HTLCInterceptedAction::ForwardPayment(
330						*channel_id,
331						FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat },
332					);
333					*self = OutboundJITChannelState::PendingPaymentForward {
334						payment_queue,
335						opening_fee_msat: *opening_fee_msat,
336						channel_id: *channel_id,
337					};
338					Ok(Some(forward_payment))
339				} else {
340					*self = OutboundJITChannelState::PendingPayment {
341						payment_queue,
342						opening_fee_msat: *opening_fee_msat,
343						channel_id: *channel_id,
344					};
345					Ok(None)
346				}
347			},
348			OutboundJITChannelState::PaymentForwarded { channel_id } => {
349				let forward = HTLCInterceptedAction::ForwardHTLC(*channel_id);
350				*self = OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
351				Ok(Some(forward))
352			},
353		}
354	}
355
356	fn channel_ready(
357		&mut self, channel_id: ChannelId,
358	) -> Result<ForwardPaymentAction, ChannelStateError> {
359		match self {
360			OutboundJITChannelState::PendingChannelOpen { payment_queue, opening_fee_msat } => {
361				if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) {
362					let forward_payment = ForwardPaymentAction(
363						channel_id,
364						FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat },
365					);
366					*self = OutboundJITChannelState::PendingPaymentForward {
367						payment_queue: core::mem::take(payment_queue),
368						opening_fee_msat: *opening_fee_msat,
369						channel_id,
370					};
371					Ok(forward_payment)
372				} else {
373					return Err(ChannelStateError(
374						"No forwardable payment available when moving to channel ready."
375							.to_string(),
376					));
377				}
378			},
379			state => Err(ChannelStateError(format!(
380				"Channel ready received when JIT Channel was in state: {:?}",
381				state
382			))),
383		}
384	}
385
386	fn htlc_handling_failed(&mut self) -> Result<Option<ForwardPaymentAction>, ChannelStateError> {
387		match self {
388			OutboundJITChannelState::PendingPaymentForward {
389				payment_queue,
390				opening_fee_msat,
391				channel_id,
392			} => {
393				if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) {
394					let forward_payment = ForwardPaymentAction(
395						*channel_id,
396						FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat },
397					);
398					*self = OutboundJITChannelState::PendingPaymentForward {
399						payment_queue: core::mem::take(payment_queue),
400						opening_fee_msat: *opening_fee_msat,
401						channel_id: *channel_id,
402					};
403					Ok(Some(forward_payment))
404				} else {
405					*self = OutboundJITChannelState::PendingPayment {
406						payment_queue: core::mem::take(payment_queue),
407						opening_fee_msat: *opening_fee_msat,
408						channel_id: *channel_id,
409					};
410					Ok(None)
411				}
412			},
413			OutboundJITChannelState::PendingPayment {
414				payment_queue,
415				opening_fee_msat,
416				channel_id,
417			} => {
418				*self = OutboundJITChannelState::PendingPayment {
419					payment_queue: core::mem::take(payment_queue),
420					opening_fee_msat: *opening_fee_msat,
421					channel_id: *channel_id,
422				};
423				Ok(None)
424			},
425			OutboundJITChannelState::PaymentForwarded { channel_id } => {
426				*self = OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
427				Ok(None)
428			},
429			state => Err(ChannelStateError(format!(
430				"HTLC handling failed when JIT Channel was in state: {:?}",
431				state
432			))),
433		}
434	}
435
436	fn payment_forwarded(
437		&mut self, skimmed_fee_msat: u64,
438	) -> Result<Option<ForwardHTLCsAction>, ChannelStateError> {
439		match self {
440			OutboundJITChannelState::PendingPaymentForward {
441				payment_queue,
442				channel_id,
443				opening_fee_msat,
444			} => {
445				if skimmed_fee_msat >= *opening_fee_msat {
446					let mut pq = core::mem::take(payment_queue);
447					let forward_htlcs = ForwardHTLCsAction(*channel_id, pq.clear());
448					*self = OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
449					Ok(Some(forward_htlcs))
450				} else {
451					*self = OutboundJITChannelState::PendingPaymentForward {
452						payment_queue: core::mem::take(payment_queue),
453						opening_fee_msat: *opening_fee_msat,
454						channel_id: *channel_id,
455					};
456					Ok(None)
457				}
458			},
459			OutboundJITChannelState::PaymentForwarded { channel_id } => {
460				*self = OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
461				Ok(None)
462			},
463			state => Err(ChannelStateError(format!(
464				"Payment forwarded when JIT Channel was in state: {:?}",
465				state
466			))),
467		}
468	}
469}
470
471impl_writeable_tlv_based_enum!(OutboundJITChannelState,
472	(0, PendingInitialPayment) => {
473		(0, payment_queue, required),
474	},
475	(2, PendingChannelOpen) => {
476		(0, payment_queue, required),
477		(2, opening_fee_msat, required),
478	},
479	(4, PendingPaymentForward) => {
480		(0, payment_queue, required),
481		(2, opening_fee_msat, required),
482		(4, channel_id, required),
483	},
484	(6, PendingPayment) => {
485		(0, payment_queue, required),
486		(2, opening_fee_msat, required),
487		(4, channel_id, required),
488	},
489	(8, PaymentForwarded) => {
490		(0, channel_id, required),
491	},
492);
493
494struct OutboundJITChannel {
495	state: OutboundJITChannelState,
496	user_channel_id: u128,
497	opening_fee_params: LSPS2OpeningFeeParams,
498	payment_size_msat: Option<u64>,
499	trust_model: TrustModel,
500}
501
502impl_writeable_tlv_based!(OutboundJITChannel, {
503	(0, state, required),
504	(2, user_channel_id, required),
505	(4, opening_fee_params, required),
506	(6, payment_size_msat, option),
507	(8, trust_model, required),
508});
509
510impl OutboundJITChannel {
511	fn new(
512		payment_size_msat: Option<u64>, opening_fee_params: LSPS2OpeningFeeParams,
513		user_channel_id: u128, client_trusts_lsp: bool,
514	) -> Self {
515		Self {
516			user_channel_id,
517			state: OutboundJITChannelState::new(),
518			opening_fee_params,
519			payment_size_msat,
520			trust_model: TrustModel::new(client_trusts_lsp),
521		}
522	}
523
524	fn htlc_intercepted(
525		&mut self, htlc: InterceptedHTLC,
526	) -> Result<Option<HTLCInterceptedAction>, LightningError> {
527		let action =
528			self.state.htlc_intercepted(&self.opening_fee_params, &self.payment_size_msat, htlc)?;
529		Ok(action)
530	}
531
532	fn htlc_handling_failed(&mut self) -> Result<Option<ForwardPaymentAction>, LightningError> {
533		let action = self.state.htlc_handling_failed()?;
534		Ok(action)
535	}
536
537	fn channel_ready(
538		&mut self, channel_id: ChannelId,
539	) -> Result<ForwardPaymentAction, LightningError> {
540		let action = self.state.channel_ready(channel_id)?;
541		Ok(action)
542	}
543
544	fn payment_forwarded(
545		&mut self, skimmed_fee_msat: u64,
546	) -> Result<Option<ForwardHTLCsAction>, LightningError> {
547		let action = self.state.payment_forwarded(skimmed_fee_msat)?;
548		Ok(action)
549	}
550
551	fn is_pending_initial_payment(&self) -> bool {
552		matches!(self.state, OutboundJITChannelState::PendingInitialPayment { .. })
553	}
554
555	fn is_prunable(&self) -> bool {
556		// We deem an OutboundJITChannel prunable if our offer expired and we haven't intercepted
557		// any HTLCs initiating the flow yet.
558		let is_expired = is_expired_opening_fee_params(&self.opening_fee_params);
559		self.is_pending_initial_payment() && is_expired
560	}
561
562	fn set_funding_tx(&mut self, funding_tx: Transaction) {
563		self.trust_model.set_funding_tx(funding_tx);
564	}
565
566	fn set_funding_tx_broadcast_safe(&mut self, funding_tx_broadcast_safe: bool) {
567		self.trust_model.set_funding_tx_broadcast_safe(funding_tx_broadcast_safe);
568	}
569
570	fn should_broadcast_funding_transaction(&self) -> bool {
571		self.trust_model.should_manually_broadcast(matches!(
572			self.state,
573			OutboundJITChannelState::PaymentForwarded { .. }
574		))
575	}
576
577	fn get_channel_id(&self) -> Option<ChannelId> {
578		match self.state {
579			OutboundJITChannelState::PaymentForwarded { channel_id } => Some(channel_id),
580			_ => None,
581		}
582	}
583
584	fn get_funding_tx(&self) -> Option<&Transaction> {
585		self.trust_model.get_funding_tx()
586	}
587
588	fn is_client_trusts_lsp(&self) -> bool {
589		self.trust_model.is_client_trusts_lsp()
590	}
591}
592
593pub(crate) struct PeerState {
594	outbound_channels_by_intercept_scid: HashMap<u64, OutboundJITChannel>,
595	intercept_scid_by_user_channel_id: HashMap<u128, u64>,
596	intercept_scid_by_channel_id: HashMap<ChannelId, u64>,
597	pending_requests: HashMap<LSPSRequestId, LSPS2Request>,
598	needs_persist: bool,
599}
600
601impl PeerState {
602	fn new() -> Self {
603		let outbound_channels_by_intercept_scid = new_hash_map();
604		let pending_requests = new_hash_map();
605		let intercept_scid_by_user_channel_id = new_hash_map();
606		let intercept_scid_by_channel_id = new_hash_map();
607		let needs_persist = false;
608		Self {
609			outbound_channels_by_intercept_scid,
610			pending_requests,
611			intercept_scid_by_user_channel_id,
612			intercept_scid_by_channel_id,
613			needs_persist,
614		}
615	}
616
617	fn insert_outbound_channel(&mut self, intercept_scid: u64, channel: OutboundJITChannel) {
618		self.outbound_channels_by_intercept_scid.insert(intercept_scid, channel);
619		self.needs_persist |= true;
620	}
621
622	fn prune_pending_requests(&mut self) {
623		self.pending_requests.retain(|_, entry| {
624			match entry {
625				LSPS2Request::GetInfo(_) => false,
626				LSPS2Request::Buy(request) => {
627					// Prune any expired buy requests.
628					!is_expired_opening_fee_params(&request.opening_fee_params)
629				},
630			}
631		});
632	}
633
634	fn prune_expired_request_state(&mut self) {
635		self.outbound_channels_by_intercept_scid.retain(|intercept_scid, entry| {
636			if entry.is_prunable() {
637				// We abort the flow, and prune any data kept.
638				self.intercept_scid_by_channel_id.retain(|_, iscid| intercept_scid != iscid);
639				self.intercept_scid_by_user_channel_id.retain(|_, iscid| intercept_scid != iscid);
640				self.needs_persist |= true;
641				return false;
642			}
643			true
644		});
645	}
646
647	fn pending_requests_and_channels(&self) -> usize {
648		let pending_requests = self.pending_requests.len();
649		let pending_outbound_channels = self
650			.outbound_channels_by_intercept_scid
651			.iter()
652			.filter(|(_, v)| v.is_pending_initial_payment())
653			.count();
654		pending_requests + pending_outbound_channels
655	}
656
657	fn is_prunable(&self) -> bool {
658		// Return whether the entire state is empty.
659		self.pending_requests.is_empty() && self.outbound_channels_by_intercept_scid.is_empty()
660	}
661}
662
663impl_writeable_tlv_based!(PeerState, {
664	(0, outbound_channels_by_intercept_scid, required),
665	(2, intercept_scid_by_user_channel_id, required),
666	(4, intercept_scid_by_channel_id, required),
667	(_unused, pending_requests, (static_value, new_hash_map())),
668	(_unused, needs_persist, (static_value, false)),
669});
670
671macro_rules! get_or_insert_peer_state_entry {
672	($self: ident, $outer_state_lock: expr, $message_queue_notifier: expr, $counterparty_node_id: expr) => {{
673		// Return an internal error and abort if we hit the maximum allowed number of total peers.
674		let is_limited_by_max_total_peers = $outer_state_lock.len() >= MAX_TOTAL_PEERS;
675		match $outer_state_lock.entry(*$counterparty_node_id) {
676			Entry::Vacant(e) => {
677				if is_limited_by_max_total_peers {
678					let error_response = LSPSResponseError {
679						code: JSONRPC_INTERNAL_ERROR_ERROR_CODE,
680						message: JSONRPC_INTERNAL_ERROR_ERROR_MESSAGE.to_string(), data: None,
681					};
682
683					let msg = LSPSMessage::Invalid(error_response);
684					$message_queue_notifier.enqueue($counterparty_node_id, msg);
685
686					let err = format!(
687						"Dropping request from peer {} due to reaching maximally allowed number of total peers: {}",
688						$counterparty_node_id, MAX_TOTAL_PEERS
689					);
690
691					return Err(LightningError { err, action: ErrorAction::IgnoreAndLog(Level::Error) });
692				} else {
693					e.insert(Mutex::new(PeerState::new()))
694				}
695			}
696			Entry::Occupied(e) => {
697				e.into_mut()
698			}
699		}
700
701	}}
702}
703
704/// The main object allowing to send and receive bLIP-52 / LSPS2 messages.
705pub struct LSPS2ServiceHandler<CM: Deref, K: Deref + Clone, T: Deref>
706where
707	CM::Target: AChannelManager,
708	K::Target: KVStore,
709	T::Target: BroadcasterInterface,
710{
711	channel_manager: CM,
712	kv_store: K,
713	tx_broadcaster: T,
714	pending_messages: Arc<MessageQueue>,
715	pending_events: Arc<EventQueue<K>>,
716	per_peer_state: RwLock<HashMap<PublicKey, Mutex<PeerState>>>,
717	peer_by_intercept_scid: RwLock<HashMap<u64, PublicKey>>,
718	peer_by_channel_id: RwLock<HashMap<ChannelId, PublicKey>>,
719	total_pending_requests: AtomicUsize,
720	config: LSPS2ServiceConfig,
721	persistence_in_flight: AtomicUsize,
722}
723
724impl<CM: Deref, K: Deref + Clone, T: Deref + Clone> LSPS2ServiceHandler<CM, K, T>
725where
726	CM::Target: AChannelManager,
727	K::Target: KVStore,
728	T::Target: BroadcasterInterface,
729{
730	/// Constructs a `LSPS2ServiceHandler`.
731	pub(crate) fn new(
732		per_peer_state: HashMap<PublicKey, Mutex<PeerState>>, pending_messages: Arc<MessageQueue>,
733		pending_events: Arc<EventQueue<K>>, channel_manager: CM, kv_store: K, tx_broadcaster: T,
734		config: LSPS2ServiceConfig,
735	) -> Result<Self, lightning::io::Error> {
736		let mut peer_by_intercept_scid = new_hash_map();
737		let mut peer_by_channel_id = new_hash_map();
738		for (node_id, peer_state) in per_peer_state.iter() {
739			let peer_state_lock = peer_state.lock().unwrap();
740			for (intercept_scid, _) in peer_state_lock.outbound_channels_by_intercept_scid.iter() {
741				let res = peer_by_intercept_scid.insert(*intercept_scid, *node_id);
742				debug_assert!(res.is_none(), "Intercept SCIDs should never collide");
743				if res.is_some() {
744					return Err(lightning::io::Error::new(
745						lightning::io::ErrorKind::InvalidData,
746						"Failed to read LSPS2 peer state due to data inconsistencies: Intercept SCIDs should never collide",
747					));
748				}
749			}
750
751			for (channel_id, _) in peer_state_lock.intercept_scid_by_channel_id.iter() {
752				let res = peer_by_channel_id.insert(*channel_id, *node_id);
753				debug_assert!(res.is_none(), "Channel IDs should never collide");
754				if res.is_some() {
755					return Err(lightning::io::Error::new(
756							lightning::io::ErrorKind::InvalidData,
757							"Failed to read LSPS2 peer state due to data inconsistencies: Channel IDs should never collide",
758					));
759				}
760			}
761		}
762
763		Ok(Self {
764			pending_messages,
765			pending_events,
766			per_peer_state: RwLock::new(per_peer_state),
767			peer_by_intercept_scid: RwLock::new(peer_by_intercept_scid),
768			peer_by_channel_id: RwLock::new(peer_by_channel_id),
769			total_pending_requests: AtomicUsize::new(0),
770			persistence_in_flight: AtomicUsize::new(0),
771			channel_manager,
772			kv_store,
773			tx_broadcaster,
774			config,
775		})
776	}
777
778	/// Returns a reference to the used config.
779	pub fn config(&self) -> &LSPS2ServiceConfig {
780		&self.config
781	}
782
783	/// Returns whether the peer has any active LSPS2 requests.
784	pub(crate) fn has_active_requests(&self, counterparty_node_id: &PublicKey) -> bool {
785		let outer_state_lock = self.per_peer_state.read().unwrap();
786		outer_state_lock.get(counterparty_node_id).map_or(false, |inner| {
787			let peer_state = inner.lock().unwrap();
788			!peer_state.outbound_channels_by_intercept_scid.is_empty()
789		})
790	}
791
792	/// Used by LSP to inform a client requesting a JIT Channel the token they used is invalid.
793	///
794	/// Should be called in response to receiving a [`LSPS2ServiceEvent::GetInfo`] event.
795	///
796	/// [`LSPS2ServiceEvent::GetInfo`]: crate::lsps2::event::LSPS2ServiceEvent::GetInfo
797	pub fn invalid_token_provided(
798		&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
799	) -> Result<(), APIError> {
800		let mut message_queue_notifier = self.pending_messages.notifier();
801
802		let outer_state_lock = self.per_peer_state.read().unwrap();
803
804		match outer_state_lock.get(counterparty_node_id) {
805			Some(inner_state_lock) => {
806				let mut peer_state_lock = inner_state_lock.lock().unwrap();
807
808				match self.remove_pending_request(&mut peer_state_lock, &request_id) {
809					Some(LSPS2Request::GetInfo(_)) => {
810						let response = LSPS2Response::GetInfoError(LSPSResponseError {
811							code: LSPS2_GET_INFO_REQUEST_UNRECOGNIZED_OR_STALE_TOKEN_ERROR_CODE,
812							message: "an unrecognized or stale token was provided".to_string(),
813							data: None,
814						});
815						let msg = LSPS2Message::Response(request_id, response).into();
816						message_queue_notifier.enqueue(counterparty_node_id, msg);
817						Ok(())
818					},
819					_ => Err(APIError::APIMisuseError {
820						err: format!(
821							"No pending get_info request for request_id: {:?}",
822							request_id
823						),
824					}),
825				}
826			},
827			None => Err(APIError::APIMisuseError {
828				err: format!("No state for the counterparty exists: {}", counterparty_node_id),
829			}),
830		}
831	}
832
833	/// Used by LSP to provide fee parameters to a client requesting a JIT Channel.
834	///
835	/// Should be called in response to receiving a [`LSPS2ServiceEvent::GetInfo`] event.
836	///
837	/// [`LSPS2ServiceEvent::GetInfo`]: crate::lsps2::event::LSPS2ServiceEvent::GetInfo
838	pub fn opening_fee_params_generated(
839		&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
840		opening_fee_params_menu: Vec<LSPS2RawOpeningFeeParams>,
841	) -> Result<(), APIError> {
842		let mut message_queue_notifier = self.pending_messages.notifier();
843
844		let outer_state_lock = self.per_peer_state.read().unwrap();
845
846		match outer_state_lock.get(counterparty_node_id) {
847			Some(inner_state_lock) => {
848				let mut peer_state_lock = inner_state_lock.lock().unwrap();
849
850				match self.remove_pending_request(&mut peer_state_lock, &request_id) {
851					Some(LSPS2Request::GetInfo(_)) => {
852						let mut opening_fee_params_menu: Vec<LSPS2OpeningFeeParams> =
853							opening_fee_params_menu
854								.into_iter()
855								.map(|param| {
856									param.into_opening_fee_params(
857										&self.config.promise_secret,
858										counterparty_node_id,
859									)
860								})
861								.collect();
862						opening_fee_params_menu.sort_by(|a, b| {
863							match a.min_fee_msat.cmp(&b.min_fee_msat) {
864								CmpOrdering::Equal => a.proportional.cmp(&b.proportional),
865								other => other,
866							}
867						});
868						let response = LSPS2Response::GetInfo(LSPS2GetInfoResponse {
869							opening_fee_params_menu,
870						});
871						let msg = LSPS2Message::Response(request_id, response).into();
872						message_queue_notifier.enqueue(counterparty_node_id, msg);
873						Ok(())
874					},
875					_ => Err(APIError::APIMisuseError {
876						err: format!(
877							"No pending get_info request for request_id: {:?}",
878							request_id
879						),
880					}),
881				}
882			},
883			None => Err(APIError::APIMisuseError {
884				err: format!("No state for the counterparty exists: {}", counterparty_node_id),
885			}),
886		}
887	}
888
889	/// Used by LSP to provide the client with the intercept scid, a unique `user_channel_id`, and
890	/// `cltv_expiry_delta` to include in their invoice.
891	///
892	/// The intercept scid must be retrieved from [`ChannelManager::get_intercept_scid`]. The given
893	/// `user_channel_id` must be locally unique and will eventually be returned via events to be
894	/// used when opening the channel via [`ChannelManager::create_channel`]. Note implementors
895	/// will need to ensure their calls to [`ChannelManager::create_channel`] are idempotent based
896	/// on this identifier.
897	///
898	/// Should be called in response to receiving a [`LSPS2ServiceEvent::BuyRequest`] event.
899	///
900	/// `client_trusts_lsp`:
901	/// * false (default) => "LSP trusts client": LSP broadcasts the funding
902	///   transaction as soon as it is safe and forwards the payment normally.
903	/// * true => "Client trusts LSP": LSP may defer broadcasting the funding
904	///   transaction until after the client claims the forwarded HTLC(s).
905	///
906	/// [`ChannelManager::create_channel`]: lightning::ln::channelmanager::ChannelManager::create_channel
907	/// [`ChannelManager::get_intercept_scid`]: lightning::ln::channelmanager::ChannelManager::get_intercept_scid
908	/// [`LSPS2ServiceEvent::BuyRequest`]: crate::lsps2::event::LSPS2ServiceEvent::BuyRequest
909	pub async fn invoice_parameters_generated(
910		&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId, intercept_scid: u64,
911		cltv_expiry_delta: u32, client_trusts_lsp: bool, user_channel_id: u128,
912	) -> Result<(), APIError> {
913		let mut message_queue_notifier = self.pending_messages.notifier();
914		let mut should_persist = false;
915
916		match self.per_peer_state.read().unwrap().get(counterparty_node_id) {
917			Some(inner_state_lock) => {
918				let mut peer_state_lock = inner_state_lock.lock().unwrap();
919
920				match self.remove_pending_request(&mut peer_state_lock, &request_id) {
921					Some(LSPS2Request::Buy(buy_request)) => {
922						{
923							let mut peer_by_intercept_scid =
924								self.peer_by_intercept_scid.write().unwrap();
925							peer_by_intercept_scid.insert(intercept_scid, *counterparty_node_id);
926						}
927
928						let outbound_jit_channel = OutboundJITChannel::new(
929							buy_request.payment_size_msat,
930							buy_request.opening_fee_params,
931							user_channel_id,
932							client_trusts_lsp,
933						);
934
935						peer_state_lock
936							.intercept_scid_by_user_channel_id
937							.insert(user_channel_id, intercept_scid);
938						peer_state_lock
939							.insert_outbound_channel(intercept_scid, outbound_jit_channel);
940						should_persist |= peer_state_lock.needs_persist;
941
942						let response = LSPS2Response::Buy(LSPS2BuyResponse {
943							jit_channel_scid: intercept_scid.into(),
944							lsp_cltv_expiry_delta: cltv_expiry_delta,
945							client_trusts_lsp,
946						});
947						let msg = LSPS2Message::Response(request_id, response).into();
948						message_queue_notifier.enqueue(counterparty_node_id, msg);
949					},
950					_ => {
951						return Err(APIError::APIMisuseError {
952							err: format!("No pending buy request for request_id: {:?}", request_id),
953						})
954					},
955				}
956			},
957			None => {
958				return Err(APIError::APIMisuseError {
959					err: format!("No state for the counterparty exists: {}", counterparty_node_id),
960				})
961			},
962		};
963
964		if should_persist {
965			self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
966				APIError::APIMisuseError {
967					err: format!(
968						"Failed to persist peer state for {}: {}",
969						counterparty_node_id, e
970					),
971				}
972			})?;
973		}
974
975		Ok(())
976	}
977
978	/// Forward [`Event::HTLCIntercepted`] event parameters into this function.
979	///
980	/// Will fail the intercepted HTLC if the intercept scid matches a payment we are expecting
981	/// but the payment amount is incorrect or the expiry has passed.
982	///
983	/// Will generate a [`LSPS2ServiceEvent::OpenChannel`] event if the intercept scid matches a payment we are expected
984	/// and the payment amount is correct and the offer has not expired.
985	///
986	/// Will do nothing if the intercept scid does not match any of the ones we gave out.
987	///
988	/// [`Event::HTLCIntercepted`]: lightning::events::Event::HTLCIntercepted
989	/// [`LSPS2ServiceEvent::OpenChannel`]: crate::lsps2::event::LSPS2ServiceEvent::OpenChannel
990	pub async fn htlc_intercepted(
991		&self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64,
992		payment_hash: PaymentHash,
993	) -> Result<(), APIError> {
994		let event_queue_notifier = self.pending_events.notifier();
995		let mut should_persist = None;
996
997		if let Some(counterparty_node_id) =
998			self.peer_by_intercept_scid.read().unwrap().get(&intercept_scid)
999		{
1000			let outer_state_lock = self.per_peer_state.read().unwrap();
1001			match outer_state_lock.get(counterparty_node_id) {
1002				Some(inner_state_lock) => {
1003					let mut peer_state = inner_state_lock.lock().unwrap();
1004					if let Some(jit_channel) =
1005						peer_state.outbound_channels_by_intercept_scid.get_mut(&intercept_scid)
1006					{
1007						should_persist = Some(*counterparty_node_id);
1008						let htlc = InterceptedHTLC {
1009							intercept_id,
1010							expected_outbound_amount_msat,
1011							payment_hash,
1012						};
1013						match jit_channel.htlc_intercepted(htlc) {
1014							Ok(Some(HTLCInterceptedAction::OpenChannel(open_channel_params))) => {
1015								let event = LSPS2ServiceEvent::OpenChannel {
1016									their_network_key: counterparty_node_id.clone(),
1017									amt_to_forward_msat: open_channel_params.amt_to_forward_msat,
1018									opening_fee_msat: open_channel_params.opening_fee_msat,
1019									user_channel_id: jit_channel.user_channel_id,
1020									intercept_scid,
1021								};
1022								event_queue_notifier.enqueue(event);
1023							},
1024							Ok(Some(HTLCInterceptedAction::ForwardHTLC(channel_id))) => {
1025								self.channel_manager.get_cm().forward_intercepted_htlc(
1026									intercept_id,
1027									&channel_id,
1028									*counterparty_node_id,
1029									expected_outbound_amount_msat,
1030								)?;
1031							},
1032							Ok(Some(HTLCInterceptedAction::ForwardPayment(
1033								channel_id,
1034								FeePayment { opening_fee_msat, htlcs },
1035							))) => {
1036								let amounts_to_forward_msat =
1037									calculate_amount_to_forward_per_htlc(&htlcs, opening_fee_msat);
1038
1039								for (intercept_id, amount_to_forward_msat) in
1040									amounts_to_forward_msat
1041								{
1042									self.channel_manager.get_cm().forward_intercepted_htlc(
1043										intercept_id,
1044										&channel_id,
1045										*counterparty_node_id,
1046										amount_to_forward_msat,
1047									)?;
1048								}
1049							},
1050							Ok(None) => {},
1051							Err(e) => {
1052								self.channel_manager
1053									.get_cm()
1054									.fail_intercepted_htlc(intercept_id)?;
1055								peer_state
1056									.outbound_channels_by_intercept_scid
1057									.remove(&intercept_scid);
1058								// TODO: cleanup peer_by_intercept_scid
1059								return Err(APIError::APIMisuseError { err: e.err });
1060							},
1061						}
1062					}
1063
1064					peer_state.needs_persist |= should_persist.is_some();
1065				},
1066				None => {
1067					return Err(APIError::APIMisuseError {
1068						err: format!("No counterparty found for scid: {}", intercept_scid),
1069					});
1070				},
1071			}
1072		}
1073
1074		if let Some(counterparty_node_id) = should_persist {
1075			self.persist_peer_state(counterparty_node_id).await.map_err(|e| {
1076				APIError::APIMisuseError {
1077					err: format!(
1078						"Failed to persist peer state for {}: {}",
1079						counterparty_node_id, e
1080					),
1081				}
1082			})?;
1083		}
1084
1085		Ok(())
1086	}
1087
1088	/// Forward [`Event::HTLCHandlingFailed`] event parameter into this function.
1089	///
1090	/// Will attempt to forward the next payment in the queue if one is present.
1091	/// Will do nothing if the intercept scid does not match any of the ones we gave out
1092	/// or if the payment queue is empty
1093	///
1094	/// [`Event::HTLCHandlingFailed`]: lightning::events::Event::HTLCHandlingFailed
1095	pub async fn htlc_handling_failed(
1096		&self, failure_type: HTLCHandlingFailureType,
1097	) -> Result<(), APIError> {
1098		let mut should_persist = None;
1099		if let HTLCHandlingFailureType::Forward { channel_id, .. } = failure_type {
1100			let peer_by_channel_id = self.peer_by_channel_id.read().unwrap();
1101			if let Some(counterparty_node_id) = peer_by_channel_id.get(&channel_id) {
1102				let outer_state_lock = self.per_peer_state.read().unwrap();
1103				match outer_state_lock.get(counterparty_node_id) {
1104					Some(inner_state_lock) => {
1105						let mut peer_state = inner_state_lock.lock().unwrap();
1106						if let Some(intercept_scid) =
1107							peer_state.intercept_scid_by_channel_id.get(&channel_id).copied()
1108						{
1109							should_persist = Some(*counterparty_node_id);
1110
1111							if let Some(jit_channel) = peer_state
1112								.outbound_channels_by_intercept_scid
1113								.get_mut(&intercept_scid)
1114							{
1115								match jit_channel.htlc_handling_failed() {
1116									Ok(Some(ForwardPaymentAction(
1117										channel_id,
1118										FeePayment { opening_fee_msat, htlcs },
1119									))) => {
1120										let amounts_to_forward_msat =
1121											calculate_amount_to_forward_per_htlc(
1122												&htlcs,
1123												opening_fee_msat,
1124											);
1125
1126										for (intercept_id, amount_to_forward_msat) in
1127											amounts_to_forward_msat
1128										{
1129											self.channel_manager
1130												.get_cm()
1131												.forward_intercepted_htlc(
1132													intercept_id,
1133													&channel_id,
1134													*counterparty_node_id,
1135													amount_to_forward_msat,
1136												)?;
1137										}
1138									},
1139									Ok(None) => {},
1140									Err(e) => {
1141										return Err(APIError::APIMisuseError {
1142											err: format!("Unable to fail HTLC: {}.", e.err),
1143										});
1144									},
1145								}
1146							}
1147						}
1148						peer_state.needs_persist |= should_persist.is_some();
1149					},
1150					None => {},
1151				}
1152			}
1153		}
1154
1155		if let Some(counterparty_node_id) = should_persist {
1156			self.persist_peer_state(counterparty_node_id).await.map_err(|e| {
1157				APIError::APIMisuseError {
1158					err: format!(
1159						"Failed to persist peer state for {}: {}",
1160						counterparty_node_id, e
1161					),
1162				}
1163			})?;
1164		}
1165
1166		Ok(())
1167	}
1168
1169	/// Forward [`Event::PaymentForwarded`] event parameter into this function.
1170	///
1171	/// Will register the forwarded payment as having paid the JIT channel fee, and forward any held
1172	/// and future HTLCs for the SCID of the initial invoice.
1173	///
1174	/// When the reported skimmed fee equals or exceeds the promised opening fee, any HTLCs that
1175	/// were being held for that JIT channel are forwarded. In a `client_trusts_lsp` flow, once
1176	/// the fee has been fully paid, the channel's funding transaction will be broadcasted.
1177	///
1178	/// Note that `next_channel_id` and `skimmed_fee_msat` are required to be provided.
1179	/// Therefore, the corresponding [`Event::PaymentForwarded`] events need to be generated and
1180	/// serialized by LDK versions greater or equal to 0.0.122.
1181	///
1182	/// [`Event::PaymentForwarded`]: lightning::events::Event::PaymentForwarded
1183	pub async fn payment_forwarded(
1184		&self, next_channel_id: ChannelId, skimmed_fee_msat: u64,
1185	) -> Result<(), APIError> {
1186		let mut should_persist = None;
1187		if let Some(counterparty_node_id) =
1188			self.peer_by_channel_id.read().unwrap().get(&next_channel_id)
1189		{
1190			let outer_state_lock = self.per_peer_state.read().unwrap();
1191			match outer_state_lock.get(counterparty_node_id) {
1192				Some(inner_state_lock) => {
1193					let mut peer_state = inner_state_lock.lock().unwrap();
1194					if let Some(intercept_scid) =
1195						peer_state.intercept_scid_by_channel_id.get(&next_channel_id).copied()
1196					{
1197						should_persist = Some(*counterparty_node_id);
1198
1199						if let Some(jit_channel) =
1200							peer_state.outbound_channels_by_intercept_scid.get_mut(&intercept_scid)
1201						{
1202							match jit_channel.payment_forwarded(skimmed_fee_msat) {
1203								Ok(Some(ForwardHTLCsAction(channel_id, htlcs))) => {
1204									for htlc in htlcs {
1205										self.channel_manager.get_cm().forward_intercepted_htlc(
1206											htlc.intercept_id,
1207											&channel_id,
1208											*counterparty_node_id,
1209											htlc.expected_outbound_amount_msat,
1210										)?;
1211									}
1212								},
1213								Ok(None) => {},
1214								Err(e) => {
1215									return Err(APIError::APIMisuseError {
1216										err: format!(
1217											"Forwarded payment was not applicable for JIT channel: {}",
1218											e.err
1219										),
1220									})
1221								},
1222							}
1223
1224							self.broadcast_funding_transaction_if_applies(jit_channel);
1225						}
1226					} else {
1227						return Err(APIError::APIMisuseError {
1228							err: format!("No state for for channel id: {}", next_channel_id),
1229						});
1230					}
1231					peer_state.needs_persist |= should_persist.is_some();
1232				},
1233				None => {
1234					return Err(APIError::APIMisuseError {
1235						err: format!("No counterparty state for: {}", counterparty_node_id),
1236					});
1237				},
1238			}
1239		}
1240
1241		if let Some(counterparty_node_id) = should_persist {
1242			self.persist_peer_state(counterparty_node_id).await.map_err(|e| {
1243				APIError::APIMisuseError {
1244					err: format!(
1245						"Failed to persist peer state for {}: {}",
1246						counterparty_node_id, e
1247					),
1248				}
1249			})?;
1250		}
1251
1252		Ok(())
1253	}
1254
1255	/// Abandons a pending JIT‐open flow for `user_channel_id`, removing all local state.
1256	///
1257	/// This removes the intercept SCID, any outbound channel state, and associated
1258	/// channel‐ID mappings for the specified `user_channel_id`, but only while no payment
1259	/// has been forwarded yet and no channel has been opened on-chain.
1260	///
1261	/// Returns an error if:
1262	///  - there is no channel matching `user_channel_id`, or
1263	///  - a payment has already been forwarded or a channel has already been opened
1264	///
1265	/// Note: this does *not* close or roll back any on‐chain channel which may already
1266	/// have been opened. The caller must call this before or instead of initiating the channel
1267	/// open, as it only affects the local LSPS2 state and doesn't affect any channels that
1268	/// might already exist on-chain. Any pending channel open attempts must be managed
1269	/// separately.
1270	pub async fn channel_open_abandoned(
1271		&self, counterparty_node_id: &PublicKey, user_channel_id: u128,
1272	) -> Result<(), APIError> {
1273		{
1274			let outer_state_lock = self.per_peer_state.read().unwrap();
1275			let inner_state_lock = outer_state_lock.get(counterparty_node_id).ok_or_else(|| {
1276				APIError::APIMisuseError {
1277					err: format!("No counterparty state for: {}", counterparty_node_id),
1278				}
1279			})?;
1280			let mut peer_state = inner_state_lock.lock().unwrap();
1281
1282			let intercept_scid = peer_state
1283				.intercept_scid_by_user_channel_id
1284				.get(&user_channel_id)
1285				.copied()
1286				.ok_or_else(|| APIError::APIMisuseError {
1287					err: format!(
1288						"Could not find a channel with user_channel_id {}",
1289						user_channel_id
1290					),
1291				})?;
1292
1293			let jit_channel = peer_state
1294				.outbound_channels_by_intercept_scid
1295				.get(&intercept_scid)
1296				.ok_or_else(|| APIError::APIMisuseError {
1297				err: format!(
1298					"Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1299					intercept_scid, user_channel_id,
1300				),
1301			})?;
1302
1303			let is_pending = matches!(
1304				jit_channel.state,
1305				OutboundJITChannelState::PendingInitialPayment { .. }
1306					| OutboundJITChannelState::PendingChannelOpen { .. }
1307			);
1308
1309			if !is_pending {
1310				return Err(APIError::APIMisuseError {
1311					err: "Cannot abandon channel open after channel creation or payment forwarding"
1312						.to_string(),
1313				});
1314			}
1315
1316			peer_state.intercept_scid_by_user_channel_id.remove(&user_channel_id);
1317			peer_state.outbound_channels_by_intercept_scid.remove(&intercept_scid);
1318			peer_state.intercept_scid_by_channel_id.retain(|_, &mut scid| scid != intercept_scid);
1319			peer_state.needs_persist |= true;
1320		}
1321
1322		self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
1323			APIError::APIMisuseError {
1324				err: format!("Failed to persist peer state for {}: {}", counterparty_node_id, e),
1325			}
1326		})?;
1327
1328		Ok(())
1329	}
1330
1331	/// Used to fail intercepted HTLCs backwards when a channel open attempt ultimately fails.
1332	///
1333	/// This function should be called after receiving an [`LSPS2ServiceEvent::OpenChannel`] event
1334	/// but only if the channel could not be successfully established. It resets the JIT channel
1335	/// state so that the payer may try the payment again.
1336	///
1337	/// [`LSPS2ServiceEvent::OpenChannel`]: crate::lsps2::event::LSPS2ServiceEvent::OpenChannel
1338	pub async fn channel_open_failed(
1339		&self, counterparty_node_id: &PublicKey, user_channel_id: u128,
1340	) -> Result<(), APIError> {
1341		{
1342			let outer_state_lock = self.per_peer_state.read().unwrap();
1343
1344			let inner_state_lock = outer_state_lock.get(counterparty_node_id).ok_or_else(|| {
1345				APIError::APIMisuseError {
1346					err: format!("No counterparty state for: {}", counterparty_node_id),
1347				}
1348			})?;
1349
1350			let mut peer_state = inner_state_lock.lock().unwrap();
1351
1352			let intercept_scid = peer_state
1353				.intercept_scid_by_user_channel_id
1354				.get(&user_channel_id)
1355				.copied()
1356				.ok_or_else(|| APIError::APIMisuseError {
1357					err: format!(
1358						"Could not find a channel with user_channel_id {}",
1359						user_channel_id
1360					),
1361				})?;
1362
1363			let jit_channel = peer_state
1364				.outbound_channels_by_intercept_scid
1365				.get_mut(&intercept_scid)
1366				.ok_or_else(|| APIError::APIMisuseError {
1367					err: format!(
1368						"Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1369						intercept_scid, user_channel_id,
1370					),
1371				})?;
1372
1373			if let OutboundJITChannelState::PendingChannelOpen { payment_queue, .. } =
1374				&mut jit_channel.state
1375			{
1376				let intercepted_htlcs = payment_queue.clear();
1377				for htlc in intercepted_htlcs {
1378					self.channel_manager.get_cm().fail_htlc_backwards_with_reason(
1379						&htlc.payment_hash,
1380						FailureCode::TemporaryNodeFailure,
1381					);
1382				}
1383
1384				jit_channel.state = OutboundJITChannelState::PendingInitialPayment {
1385					payment_queue: PaymentQueue::new(),
1386				};
1387			} else {
1388				return Err(APIError::APIMisuseError {
1389					err: "Channel is not in the PendingChannelOpen state.".to_string(),
1390				});
1391			}
1392
1393			peer_state.needs_persist |= true;
1394		}
1395		self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
1396			APIError::APIMisuseError {
1397				err: format!("Failed to persist peer state for {}: {}", counterparty_node_id, e),
1398			}
1399		})?;
1400
1401		Ok(())
1402	}
1403
1404	/// Forward [`Event::ChannelReady`] event parameters into this function.
1405	///
1406	/// Will forward the intercepted HTLC if it matches a channel
1407	/// we need to forward a payment over otherwise it will be ignored.
1408	///
1409	/// [`Event::ChannelReady`]: lightning::events::Event::ChannelReady
1410	pub async fn channel_ready(
1411		&self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey,
1412	) -> Result<(), APIError> {
1413		let mut should_persist = false;
1414		{
1415			let mut peer_by_channel_id = self.peer_by_channel_id.write().unwrap();
1416			peer_by_channel_id.insert(*channel_id, *counterparty_node_id);
1417		}
1418		match self.per_peer_state.read().unwrap().get(counterparty_node_id) {
1419			Some(inner_state_lock) => {
1420				let mut peer_state = inner_state_lock.lock().unwrap();
1421				if let Some(intercept_scid) =
1422					peer_state.intercept_scid_by_user_channel_id.get(&user_channel_id).copied()
1423				{
1424					should_persist |= true;
1425					peer_state.intercept_scid_by_channel_id.insert(*channel_id, intercept_scid);
1426					if let Some(jit_channel) =
1427						peer_state.outbound_channels_by_intercept_scid.get_mut(&intercept_scid)
1428					{
1429						match jit_channel.channel_ready(*channel_id) {
1430							Ok(ForwardPaymentAction(
1431								channel_id,
1432								FeePayment { opening_fee_msat, htlcs },
1433							)) => {
1434								let amounts_to_forward_msat =
1435									calculate_amount_to_forward_per_htlc(&htlcs, opening_fee_msat);
1436
1437								for (intercept_id, amount_to_forward_msat) in
1438									amounts_to_forward_msat
1439								{
1440									self.channel_manager.get_cm().forward_intercepted_htlc(
1441										intercept_id,
1442										&channel_id,
1443										*counterparty_node_id,
1444										amount_to_forward_msat,
1445									)?;
1446								}
1447							},
1448							Err(e) => {
1449								return Err(APIError::APIMisuseError {
1450									err: format!(
1451										"Failed to transition to channel ready: {}",
1452										e.err
1453									),
1454								})
1455							},
1456						}
1457					} else {
1458						return Err(APIError::APIMisuseError {
1459							err: format!(
1460								"Could not find a channel with user_channel_id {}",
1461								user_channel_id
1462							),
1463						});
1464					}
1465				} else {
1466					return Err(APIError::APIMisuseError {
1467						err: format!(
1468							"Could not find a channel with that user_channel_id {}",
1469							user_channel_id
1470						),
1471					});
1472				}
1473				peer_state.needs_persist |= should_persist;
1474			},
1475			None => {
1476				return Err(APIError::APIMisuseError {
1477					err: format!("No counterparty state for: {}", counterparty_node_id),
1478				});
1479			},
1480		}
1481
1482		if should_persist {
1483			self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
1484				APIError::APIMisuseError {
1485					err: format!(
1486						"Failed to persist peer state for {}: {}",
1487						counterparty_node_id, e
1488					),
1489				}
1490			})?;
1491		}
1492
1493		Ok(())
1494	}
1495
1496	fn handle_get_info_request(
1497		&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
1498		params: LSPS2GetInfoRequest,
1499	) -> Result<(), LightningError> {
1500		let mut message_queue_notifier = self.pending_messages.notifier();
1501		let event_queue_notifier = self.pending_events.notifier();
1502
1503		let mut outer_state_lock = self.per_peer_state.write().unwrap();
1504		let inner_state_lock = get_or_insert_peer_state_entry!(
1505			self,
1506			outer_state_lock,
1507			message_queue_notifier,
1508			counterparty_node_id
1509		);
1510		let mut peer_state_lock = inner_state_lock.lock().unwrap();
1511		let request = LSPS2Request::GetInfo(params.clone());
1512		self.insert_pending_request(
1513			&mut peer_state_lock,
1514			&mut message_queue_notifier,
1515			request_id.clone(),
1516			*counterparty_node_id,
1517			request,
1518		)?;
1519
1520		let event = LSPS2ServiceEvent::GetInfo {
1521			request_id,
1522			counterparty_node_id: *counterparty_node_id,
1523			token: params.token,
1524		};
1525		event_queue_notifier.enqueue(event);
1526
1527		Ok(())
1528	}
1529
1530	fn handle_buy_request(
1531		&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey, params: LSPS2BuyRequest,
1532	) -> Result<(), LightningError> {
1533		let mut message_queue_notifier = self.pending_messages.notifier();
1534		let event_queue_notifier = self.pending_events.notifier();
1535		if let Some(payment_size_msat) = params.payment_size_msat {
1536			if payment_size_msat < params.opening_fee_params.min_payment_size_msat {
1537				let response = LSPS2Response::BuyError(LSPSResponseError {
1538					code: LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_SMALL_ERROR_CODE,
1539					message: "payment size is below our minimum supported payment size".to_string(),
1540					data: None,
1541				});
1542				let msg = LSPS2Message::Response(request_id, response).into();
1543				message_queue_notifier.enqueue(counterparty_node_id, msg);
1544
1545				return Err(LightningError {
1546					err: "payment size is below our minimum supported payment size".to_string(),
1547					action: ErrorAction::IgnoreAndLog(Level::Info),
1548				});
1549			}
1550
1551			if payment_size_msat > params.opening_fee_params.max_payment_size_msat {
1552				let response = LSPS2Response::BuyError(LSPSResponseError {
1553					code: LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_LARGE_ERROR_CODE,
1554					message: "payment size is above our maximum supported payment size".to_string(),
1555					data: None,
1556				});
1557				let msg = LSPS2Message::Response(request_id, response).into();
1558				message_queue_notifier.enqueue(counterparty_node_id, msg);
1559				return Err(LightningError {
1560					err: "payment size is above our maximum supported payment size".to_string(),
1561					action: ErrorAction::IgnoreAndLog(Level::Info),
1562				});
1563			}
1564
1565			match compute_opening_fee(
1566				payment_size_msat,
1567				params.opening_fee_params.min_fee_msat,
1568				params.opening_fee_params.proportional.into(),
1569			) {
1570				Some(opening_fee) => {
1571					if opening_fee >= payment_size_msat {
1572						let response = LSPS2Response::BuyError(LSPSResponseError {
1573							code: LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_SMALL_ERROR_CODE,
1574							message: "payment size is too small to cover the opening fee"
1575								.to_string(),
1576							data: None,
1577						});
1578						let msg = LSPS2Message::Response(request_id, response).into();
1579						message_queue_notifier.enqueue(counterparty_node_id, msg);
1580						return Err(LightningError {
1581							err: "payment size is too small to cover the opening fee".to_string(),
1582							action: ErrorAction::IgnoreAndLog(Level::Info),
1583						});
1584					}
1585				},
1586				None => {
1587					let response = LSPS2Response::BuyError(LSPSResponseError {
1588						code: LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_LARGE_ERROR_CODE,
1589						message: "overflow error when calculating opening_fee".to_string(),
1590						data: None,
1591					});
1592					let msg = LSPS2Message::Response(request_id, response).into();
1593					message_queue_notifier.enqueue(counterparty_node_id, msg);
1594					return Err(LightningError {
1595						err: "overflow error when calculating opening_fee".to_string(),
1596						action: ErrorAction::IgnoreAndLog(Level::Info),
1597					});
1598				},
1599			}
1600		}
1601
1602		// TODO: if payment_size_msat is specified, make sure our node has sufficient incoming liquidity from public network to receive it.
1603		if !is_valid_opening_fee_params(
1604			&params.opening_fee_params,
1605			&self.config.promise_secret,
1606			counterparty_node_id,
1607		) {
1608			let response = LSPS2Response::BuyError(LSPSResponseError {
1609				code: LSPS2_BUY_REQUEST_INVALID_OPENING_FEE_PARAMS_ERROR_CODE,
1610				message: "valid_until is already past OR the promise did not match the provided parameters".to_string(),
1611				data: None,
1612			});
1613			let msg = LSPS2Message::Response(request_id, response).into();
1614			message_queue_notifier.enqueue(counterparty_node_id, msg);
1615			return Err(LightningError {
1616				err: "invalid opening fee parameters were supplied by client".to_string(),
1617				action: ErrorAction::IgnoreAndLog(Level::Info),
1618			});
1619		}
1620
1621		let mut outer_state_lock = self.per_peer_state.write().unwrap();
1622		let inner_state_lock = get_or_insert_peer_state_entry!(
1623			self,
1624			outer_state_lock,
1625			message_queue_notifier,
1626			counterparty_node_id
1627		);
1628		let mut peer_state_lock = inner_state_lock.lock().unwrap();
1629
1630		let request = LSPS2Request::Buy(params.clone());
1631
1632		self.insert_pending_request(
1633			&mut peer_state_lock,
1634			&mut message_queue_notifier,
1635			request_id.clone(),
1636			*counterparty_node_id,
1637			request,
1638		)?;
1639
1640		let event = LSPS2ServiceEvent::BuyRequest {
1641			request_id,
1642			counterparty_node_id: *counterparty_node_id,
1643			opening_fee_params: params.opening_fee_params,
1644			payment_size_msat: params.payment_size_msat,
1645		};
1646		event_queue_notifier.enqueue(event);
1647
1648		Ok(())
1649	}
1650
1651	fn insert_pending_request<'a>(
1652		&self, peer_state_lock: &mut MutexGuard<'a, PeerState>,
1653		message_queue_notifier: &mut MessageQueueNotifierGuard, request_id: LSPSRequestId,
1654		counterparty_node_id: PublicKey, request: LSPS2Request,
1655	) -> Result<(), LightningError> {
1656		let create_pending_request_limit_exceeded_response =
1657			|message_queue_notifier: &mut MessageQueueNotifierGuard, error_message: String| {
1658				let error_details = LSPSResponseError {
1659					code: LSPS0_CLIENT_REJECTED_ERROR_CODE,
1660					message: "Reached maximum number of pending requests. Please try again later."
1661						.to_string(),
1662					data: None,
1663				};
1664				let response = match &request {
1665					LSPS2Request::GetInfo(_) => LSPS2Response::GetInfoError(error_details),
1666					LSPS2Request::Buy(_) => LSPS2Response::BuyError(error_details),
1667				};
1668				let msg = LSPS2Message::Response(request_id.clone(), response).into();
1669				message_queue_notifier.enqueue(&counterparty_node_id, msg);
1670
1671				Err(LightningError {
1672					err: error_message,
1673					action: ErrorAction::IgnoreAndLog(Level::Debug),
1674				})
1675			};
1676
1677		if self.total_pending_requests.load(Ordering::Relaxed) >= MAX_TOTAL_PENDING_REQUESTS {
1678			let error_message = format!(
1679				"Reached maximum number of total pending requests: {}",
1680				MAX_TOTAL_PENDING_REQUESTS
1681			);
1682			return create_pending_request_limit_exceeded_response(
1683				message_queue_notifier,
1684				error_message,
1685			);
1686		}
1687
1688		if peer_state_lock.pending_requests_and_channels() < MAX_PENDING_REQUESTS_PER_PEER {
1689			peer_state_lock.pending_requests.insert(request_id, request);
1690			self.total_pending_requests.fetch_add(1, Ordering::Relaxed);
1691			Ok(())
1692		} else {
1693			let error_message = format!(
1694				"Peer {} reached maximum number of pending requests: {}",
1695				counterparty_node_id, MAX_PENDING_REQUESTS_PER_PEER
1696			);
1697			create_pending_request_limit_exceeded_response(message_queue_notifier, error_message)
1698		}
1699	}
1700
1701	fn remove_pending_request<'a>(
1702		&self, peer_state_lock: &mut MutexGuard<'a, PeerState>, request_id: &LSPSRequestId,
1703	) -> Option<LSPS2Request> {
1704		match peer_state_lock.pending_requests.remove(request_id) {
1705			Some(req) => {
1706				let res = self.total_pending_requests.fetch_update(
1707					Ordering::Relaxed,
1708					Ordering::Relaxed,
1709					|x| Some(x.saturating_sub(1)),
1710				);
1711				match res {
1712					Ok(previous_value) if previous_value == 0 => debug_assert!(
1713						false,
1714						"total_pending_requests counter out-of-sync! This should never happen!"
1715					),
1716					Err(previous_value) if previous_value == 0 => debug_assert!(
1717						false,
1718						"total_pending_requests counter out-of-sync! This should never happen!"
1719					),
1720					_ => {},
1721				}
1722				Some(req)
1723			},
1724			res => res,
1725		}
1726	}
1727
1728	#[cfg(debug_assertions)]
1729	fn verify_pending_request_counter(&self) {
1730		let mut num_requests = 0;
1731		let outer_state_lock = self.per_peer_state.read().unwrap();
1732		for (_, inner) in outer_state_lock.iter() {
1733			let inner_state_lock = inner.lock().unwrap();
1734			num_requests += inner_state_lock.pending_requests.len();
1735		}
1736		debug_assert_eq!(
1737			num_requests,
1738			self.total_pending_requests.load(Ordering::Relaxed),
1739			"total_pending_requests counter out-of-sync! This should never happen!"
1740		);
1741	}
1742
1743	async fn persist_peer_state(
1744		&self, counterparty_node_id: PublicKey,
1745	) -> Result<(), lightning::io::Error> {
1746		let fut = {
1747			let outer_state_lock = self.per_peer_state.read().unwrap();
1748			match outer_state_lock.get(&counterparty_node_id) {
1749				None => {
1750					// We dropped the peer state by now.
1751					return Ok(());
1752				},
1753				Some(entry) => {
1754					let mut peer_state_lock = entry.lock().unwrap();
1755					if !peer_state_lock.needs_persist {
1756						// We already have persisted otherwise by now.
1757						return Ok(());
1758					} else {
1759						peer_state_lock.needs_persist = false;
1760						let key = counterparty_node_id.to_string();
1761						let encoded = peer_state_lock.encode();
1762						// Begin the write with the entry lock held. This avoids racing with
1763						// potentially-in-flight `persist` calls writing state for the same peer.
1764						self.kv_store.write(
1765							LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1766							LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
1767							&key,
1768							encoded,
1769						)
1770					}
1771				},
1772			}
1773		};
1774
1775		fut.await.map_err(|e| {
1776			self.per_peer_state
1777				.read()
1778				.unwrap()
1779				.get(&counterparty_node_id)
1780				.map(|p| p.lock().unwrap().needs_persist = true);
1781			e
1782		})
1783	}
1784
1785	pub(crate) async fn persist(&self) -> Result<(), lightning::io::Error> {
1786		// TODO: We should eventually persist in parallel, however, when we do, we probably want to
1787		// introduce some batching to upper-bound the number of requests inflight at any given
1788		// time.
1789
1790		if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
1791			// If we're not the first event processor to get here, just return early, the increment
1792			// we just did will be treated as "go around again" at the end.
1793			return Ok(());
1794		}
1795
1796		loop {
1797			let mut need_remove = Vec::new();
1798			let mut need_persist = Vec::new();
1799
1800			{
1801				// First build a list of peers to persist and prune with the read lock. This allows
1802				// us to avoid the write lock unless we actually need to remove a node.
1803				let outer_state_lock = self.per_peer_state.read().unwrap();
1804				for (counterparty_node_id, inner_state_lock) in outer_state_lock.iter() {
1805					let mut peer_state_lock = inner_state_lock.lock().unwrap();
1806					peer_state_lock.prune_expired_request_state();
1807					let is_prunable = peer_state_lock.is_prunable();
1808					if is_prunable {
1809						need_remove.push(*counterparty_node_id);
1810					} else if peer_state_lock.needs_persist {
1811						need_persist.push(*counterparty_node_id);
1812					}
1813				}
1814			}
1815
1816			for counterparty_node_id in need_persist.into_iter() {
1817				debug_assert!(!need_remove.contains(&counterparty_node_id));
1818				self.persist_peer_state(counterparty_node_id).await?;
1819			}
1820
1821			for counterparty_node_id in need_remove {
1822				let mut future_opt = None;
1823				{
1824					// We need to take the `per_peer_state` write lock to remove an entry, but also
1825					// have to hold it until after the `remove` call returns (but not through
1826					// future completion) to ensure that writes for the peer's state are
1827					// well-ordered with other `persist_peer_state` calls even across the removal
1828					// itself.
1829					let mut per_peer_state = self.per_peer_state.write().unwrap();
1830					if let Entry::Occupied(mut entry) = per_peer_state.entry(counterparty_node_id) {
1831						let state = entry.get_mut().get_mut().unwrap();
1832						if state.is_prunable() {
1833							entry.remove();
1834							let key = counterparty_node_id.to_string();
1835							future_opt = Some(self.kv_store.remove(
1836								LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1837								LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
1838								&key,
1839								true,
1840							));
1841						} else {
1842							// If the peer got new state, force a re-persist of the current state.
1843							state.needs_persist = true;
1844						}
1845					} else {
1846						// This should never happen, we can only have one `persist` call
1847						// in-progress at once and map entries are only removed by it.
1848						debug_assert!(false);
1849					}
1850				}
1851				if let Some(future) = future_opt {
1852					future.await?;
1853				} else {
1854					self.persist_peer_state(counterparty_node_id).await?;
1855				}
1856			}
1857
1858			if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
1859				// If another thread incremented the state while we were running we should go
1860				// around again, but only once.
1861				self.persistence_in_flight.store(1, Ordering::Release);
1862				continue;
1863			}
1864			break;
1865		}
1866
1867		Ok(())
1868	}
1869
1870	pub(crate) fn peer_disconnected(&self, counterparty_node_id: PublicKey) {
1871		let outer_state_lock = self.per_peer_state.write().unwrap();
1872		if let Some(inner_state_lock) = outer_state_lock.get(&counterparty_node_id) {
1873			let mut peer_state_lock = inner_state_lock.lock().unwrap();
1874			// We clean up the peer state, but leave removing the peer entry to the prune logic in
1875			// `persist` which removes it from the store.
1876			peer_state_lock.prune_pending_requests();
1877			peer_state_lock.prune_expired_request_state();
1878		}
1879	}
1880
1881	/// Checks if the JIT channel with the given `user_channel_id` needs manual broadcast.
1882	///
1883	/// Will be `true` if `client_trusts_lsp` is set to `true`.
1884	pub fn channel_needs_manual_broadcast(
1885		&self, user_channel_id: u128, counterparty_node_id: &PublicKey,
1886	) -> Result<bool, APIError> {
1887		let outer_state_lock = self.per_peer_state.read().unwrap();
1888		let inner_state_lock =
1889			outer_state_lock.get(counterparty_node_id).ok_or_else(|| APIError::APIMisuseError {
1890				err: format!("No counterparty state for: {}", counterparty_node_id),
1891			})?;
1892		let peer_state = inner_state_lock.lock().unwrap();
1893
1894		let intercept_scid = peer_state
1895			.intercept_scid_by_user_channel_id
1896			.get(&user_channel_id)
1897			.copied()
1898			.ok_or_else(|| APIError::APIMisuseError {
1899				err: format!("Could not find a channel with user_channel_id {}", user_channel_id),
1900			})?;
1901
1902		let jit_channel = peer_state
1903			.outbound_channels_by_intercept_scid
1904			.get(&intercept_scid)
1905			.ok_or_else(|| APIError::APIMisuseError {
1906				err: format!(
1907					"Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1908					intercept_scid, user_channel_id,
1909				),
1910			})?;
1911
1912		Ok(jit_channel.is_client_trusts_lsp())
1913	}
1914
1915	/// Stores the funding transaction for a JIT channel.
1916	///
1917	/// Call this when the funding transaction is created.
1918	///
1919	/// In `client_trusts_lsp` the broadcasting of the funding transaction will be handled internally
1920	/// after you also mark it as broadcast-safe via
1921	/// [`set_funding_tx_broadcast_safe`] and once the opening fee has been collected. You do not need
1922	/// to broadcast the funding transaction yourself in this flow.
1923	///
1924	/// [`set_funding_tx_broadcast_safe`]: Self::set_funding_tx_broadcast_safe
1925	pub fn store_funding_transaction(
1926		&self, user_channel_id: u128, counterparty_node_id: &PublicKey, funding_tx: Transaction,
1927	) -> Result<(), APIError> {
1928		let outer_state_lock = self.per_peer_state.read().unwrap();
1929		let inner_state_lock =
1930			outer_state_lock.get(counterparty_node_id).ok_or_else(|| APIError::APIMisuseError {
1931				err: format!("No counterparty state for: {}", counterparty_node_id),
1932			})?;
1933		let mut peer_state = inner_state_lock.lock().unwrap();
1934
1935		let intercept_scid = peer_state
1936			.intercept_scid_by_user_channel_id
1937			.get(&user_channel_id)
1938			.copied()
1939			.ok_or_else(|| APIError::APIMisuseError {
1940				err: format!("Could not find a channel with user_channel_id {}", user_channel_id),
1941			})?;
1942
1943		let jit_channel = peer_state
1944			.outbound_channels_by_intercept_scid
1945			.get_mut(&intercept_scid)
1946			.ok_or_else(|| APIError::APIMisuseError {
1947			err: format!(
1948				"Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1949				intercept_scid, user_channel_id,
1950			),
1951		})?;
1952
1953		jit_channel.set_funding_tx(funding_tx);
1954
1955		self.broadcast_funding_transaction_if_applies(jit_channel);
1956		Ok(())
1957	}
1958
1959	/// Marks that the funding transaction for the JIT channel identified by `user_channel_id`
1960	/// is now safe to broadcast.
1961	///
1962	/// In LDK call this when you receive [`Event::FundingTxBroadcastSafe`]. In other Lightning
1963	/// backends call it once the funding transaction is fully negotiated and signed (all
1964	/// signatures verified), your channel state machine will now proceed assuming the funding
1965	/// transaction will confirm, and you are intentionally deferring the actual broadcast so
1966	/// the LSPS2 flow (when `client_trusts_lsp = true`) can first collect the opening fee from
1967	/// the intercepted payment.
1968	///
1969	/// In a `client_trusts_lsp` flow, after this is set and the opening fee has been fully skimmed,
1970	/// the channel's funding transaction will be broadcasted if the channel is still usable.
1971	/// If the channel has been closed or force-closed before this point, the funding transaction will not be broadcasted.
1972	///
1973	/// [`Event::FundingTxBroadcastSafe`]: lightning::events::Event::FundingTxBroadcastSafe
1974	pub fn set_funding_tx_broadcast_safe(
1975		&self, user_channel_id: u128, counterparty_node_id: &PublicKey,
1976	) -> Result<(), APIError> {
1977		let outer_state_lock = self.per_peer_state.read().unwrap();
1978		let inner_state_lock =
1979			outer_state_lock.get(counterparty_node_id).ok_or_else(|| APIError::APIMisuseError {
1980				err: format!("No counterparty state for: {}", counterparty_node_id),
1981			})?;
1982		let mut peer_state = inner_state_lock.lock().unwrap();
1983
1984		let intercept_scid = peer_state
1985			.intercept_scid_by_user_channel_id
1986			.get(&user_channel_id)
1987			.copied()
1988			.ok_or_else(|| APIError::APIMisuseError {
1989				err: format!("Could not find a channel with user_channel_id {}", user_channel_id),
1990			})?;
1991
1992		let jit_channel = peer_state
1993			.outbound_channels_by_intercept_scid
1994			.get_mut(&intercept_scid)
1995			.ok_or_else(|| APIError::APIMisuseError {
1996			err: format!(
1997				"Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1998				intercept_scid, user_channel_id,
1999			),
2000		})?;
2001
2002		jit_channel.set_funding_tx_broadcast_safe(true);
2003
2004		self.broadcast_funding_transaction_if_applies(jit_channel);
2005		Ok(())
2006	}
2007
2008	fn broadcast_funding_transaction_if_applies(&self, jit_channel: &OutboundJITChannel) {
2009		if !jit_channel.should_broadcast_funding_transaction() {
2010			return;
2011		}
2012
2013		// Broadcast the funding transaction only if the LDK channel is still usable. In
2014		// the `client_trusts_lsp` flow we delay funding broadcast until the opening fee is
2015		// collected. Before that happens, LDK may force-close the not‑yet‑funded channel
2016		// (for example when a forwarded HTLC nears expiry). Broadcasting funding after a
2017		// close could then confirm the commitment and trigger unintended on‑chain handling.
2018		// To avoid this, we check ChannelManager’s view (`is_channel_ready`) before broadcasting.
2019		let channel_id_opt = jit_channel.get_channel_id();
2020		if let Some(ch_id) = channel_id_opt {
2021			let is_channel_ready = self
2022				.channel_manager
2023				.get_cm()
2024				.list_channels()
2025				.into_iter()
2026				.any(|cd| cd.channel_id == ch_id && cd.is_channel_ready);
2027			if !is_channel_ready {
2028				return;
2029			}
2030		} else {
2031			return;
2032		}
2033
2034		if let Some(funding_tx) = jit_channel.get_funding_tx() {
2035			self.tx_broadcaster.broadcast_transactions(&[funding_tx]);
2036		}
2037	}
2038}
2039
2040impl<CM: Deref, K: Deref + Clone, T: Deref + Clone> LSPSProtocolMessageHandler
2041	for LSPS2ServiceHandler<CM, K, T>
2042where
2043	CM::Target: AChannelManager,
2044	K::Target: KVStore,
2045	T::Target: BroadcasterInterface,
2046{
2047	type ProtocolMessage = LSPS2Message;
2048	const PROTOCOL_NUMBER: Option<u16> = Some(2);
2049
2050	fn handle_message(
2051		&self, message: Self::ProtocolMessage, counterparty_node_id: &PublicKey,
2052	) -> Result<(), LightningError> {
2053		match message {
2054			LSPS2Message::Request(request_id, request) => {
2055				let res = match request {
2056					LSPS2Request::GetInfo(params) => {
2057						self.handle_get_info_request(request_id, counterparty_node_id, params)
2058					},
2059					LSPS2Request::Buy(params) => {
2060						self.handle_buy_request(request_id, counterparty_node_id, params)
2061					},
2062				};
2063				#[cfg(debug_assertions)]
2064				self.verify_pending_request_counter();
2065				res
2066			},
2067			_ => {
2068				debug_assert!(
2069					false,
2070					"Service handler received LSPS2 response message. This should never happen."
2071				);
2072				Err(LightningError { err: format!("Service handler received LSPS2 response message from node {}. This should never happen.", counterparty_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)})
2073			},
2074		}
2075	}
2076}
2077
2078fn calculate_amount_to_forward_per_htlc(
2079	htlcs: &[InterceptedHTLC], total_fee_msat: u64,
2080) -> Vec<(InterceptId, u64)> {
2081	// TODO: we should eventually make sure the HTLCs are all above ChannelDetails::next_outbound_minimum_msat
2082	let total_expected_outbound_msat: u64 =
2083		htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum();
2084	if total_fee_msat > total_expected_outbound_msat {
2085		debug_assert!(false, "Fee is larger than the total expected outbound amount.");
2086		return Vec::new();
2087	}
2088
2089	let mut fee_remaining_msat = total_fee_msat;
2090	let mut per_htlc_forwards = vec![];
2091	for (index, htlc) in htlcs.iter().enumerate() {
2092		let proportional_fee_amt_msat = (total_fee_msat as u128
2093			* htlc.expected_outbound_amount_msat as u128
2094			/ total_expected_outbound_msat as u128) as u64;
2095
2096		let mut actual_fee_amt_msat = core::cmp::min(fee_remaining_msat, proportional_fee_amt_msat);
2097		actual_fee_amt_msat =
2098			core::cmp::min(actual_fee_amt_msat, htlc.expected_outbound_amount_msat);
2099		fee_remaining_msat -= actual_fee_amt_msat;
2100
2101		if index == htlcs.len() - 1 {
2102			actual_fee_amt_msat += fee_remaining_msat;
2103		}
2104
2105		let amount_to_forward_msat =
2106			htlc.expected_outbound_amount_msat.saturating_sub(actual_fee_amt_msat);
2107
2108		per_htlc_forwards.push((htlc.intercept_id, amount_to_forward_msat))
2109	}
2110	per_htlc_forwards
2111}
2112
2113/// A synchroneous wrapper around [`LSPS2ServiceHandler`] to be used in contexts where async is not
2114/// available.
2115pub struct LSPS2ServiceHandlerSync<'a, CM: Deref, K: Deref + Clone, T: Deref + Clone>
2116where
2117	CM::Target: AChannelManager,
2118	K::Target: KVStore,
2119	T::Target: BroadcasterInterface,
2120{
2121	inner: &'a LSPS2ServiceHandler<CM, K, T>,
2122}
2123
2124impl<'a, CM: Deref, K: Deref + Clone, T: Deref + Clone> LSPS2ServiceHandlerSync<'a, CM, K, T>
2125where
2126	CM::Target: AChannelManager,
2127	K::Target: KVStore,
2128	T::Target: BroadcasterInterface,
2129{
2130	pub(crate) fn from_inner(inner: &'a LSPS2ServiceHandler<CM, K, T>) -> Self {
2131		Self { inner }
2132	}
2133
2134	/// Returns a reference to the used config.
2135	///
2136	/// Wraps [`LSPS2ServiceHandler::config`].
2137	pub fn config(&self) -> &LSPS2ServiceConfig {
2138		&self.inner.config
2139	}
2140
2141	/// Used by LSP to inform a client requesting a JIT Channel the token they used is invalid.
2142	///
2143	/// Wraps [`LSPS2ServiceHandler::invalid_token_provided`].
2144	pub fn invalid_token_provided(
2145		&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
2146	) -> Result<(), APIError> {
2147		self.inner.invalid_token_provided(counterparty_node_id, request_id)
2148	}
2149
2150	/// Used by LSP to provide fee parameters to a client requesting a JIT Channel.
2151	///
2152	/// Wraps [`LSPS2ServiceHandler::opening_fee_params_generated`].
2153	pub fn opening_fee_params_generated(
2154		&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
2155		opening_fee_params_menu: Vec<LSPS2RawOpeningFeeParams>,
2156	) -> Result<(), APIError> {
2157		self.inner.opening_fee_params_generated(
2158			counterparty_node_id,
2159			request_id,
2160			opening_fee_params_menu,
2161		)
2162	}
2163
2164	/// Used by LSP to provide the client with the intercept scid, a unique `user_channel_id`, and
2165	/// `cltv_expiry_delta` to include in their invoice.
2166	///
2167	/// Wraps [`LSPS2ServiceHandler::invoice_parameters_generated`].
2168	pub fn invoice_parameters_generated(
2169		&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId, intercept_scid: u64,
2170		cltv_expiry_delta: u32, client_trusts_lsp: bool, user_channel_id: u128,
2171	) -> Result<(), APIError> {
2172		let mut fut = Box::pin(self.inner.invoice_parameters_generated(
2173			counterparty_node_id,
2174			request_id,
2175			intercept_scid,
2176			cltv_expiry_delta,
2177			client_trusts_lsp,
2178			user_channel_id,
2179		));
2180
2181		let mut waker = dummy_waker();
2182		let mut ctx = task::Context::from_waker(&mut waker);
2183		match fut.as_mut().poll(&mut ctx) {
2184			task::Poll::Ready(result) => result,
2185			task::Poll::Pending => {
2186				// In a sync context, we can't wait for the future to complete.
2187				unreachable!("Should not be pending in a sync context");
2188			},
2189		}
2190	}
2191
2192	/// Forward [`Event::HTLCIntercepted`] event parameters into this function.
2193	///
2194	/// Wraps [`LSPS2ServiceHandler::htlc_intercepted`].
2195	///
2196	/// [`Event::HTLCIntercepted`]: lightning::events::Event::HTLCIntercepted
2197	pub fn htlc_intercepted(
2198		&self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64,
2199		payment_hash: PaymentHash,
2200	) -> Result<(), APIError> {
2201		let mut fut = Box::pin(self.inner.htlc_intercepted(
2202			intercept_scid,
2203			intercept_id,
2204			expected_outbound_amount_msat,
2205			payment_hash,
2206		));
2207
2208		let mut waker = dummy_waker();
2209		let mut ctx = task::Context::from_waker(&mut waker);
2210		match fut.as_mut().poll(&mut ctx) {
2211			task::Poll::Ready(result) => result,
2212			task::Poll::Pending => {
2213				// In a sync context, we can't wait for the future to complete.
2214				unreachable!("Should not be pending in a sync context");
2215			},
2216		}
2217	}
2218
2219	/// Forward [`Event::HTLCHandlingFailed`] event parameter into this function.
2220	///
2221	/// Wraps [`LSPS2ServiceHandler::htlc_handling_failed`].
2222	///
2223	/// [`Event::HTLCHandlingFailed`]: lightning::events::Event::HTLCHandlingFailed
2224	pub fn htlc_handling_failed(
2225		&self, failure_type: HTLCHandlingFailureType,
2226	) -> Result<(), APIError> {
2227		let mut fut = Box::pin(self.inner.htlc_handling_failed(failure_type));
2228
2229		let mut waker = dummy_waker();
2230		let mut ctx = task::Context::from_waker(&mut waker);
2231		match fut.as_mut().poll(&mut ctx) {
2232			task::Poll::Ready(result) => result,
2233			task::Poll::Pending => {
2234				// In a sync context, we can't wait for the future to complete.
2235				unreachable!("Should not be pending in a sync context");
2236			},
2237		}
2238	}
2239
2240	/// Forward [`Event::PaymentForwarded`] event parameter into this function.
2241	///
2242	/// Wraps [`LSPS2ServiceHandler::payment_forwarded`].
2243	///
2244	/// [`Event::PaymentForwarded`]: lightning::events::Event::PaymentForwarded
2245	pub fn payment_forwarded(
2246		&self, next_channel_id: ChannelId, skimmed_fee_msat: u64,
2247	) -> Result<(), APIError> {
2248		let mut fut = Box::pin(self.inner.payment_forwarded(next_channel_id, skimmed_fee_msat));
2249
2250		let mut waker = dummy_waker();
2251		let mut ctx = task::Context::from_waker(&mut waker);
2252		match fut.as_mut().poll(&mut ctx) {
2253			task::Poll::Ready(result) => result,
2254			task::Poll::Pending => {
2255				// In a sync context, we can't wait for the future to complete.
2256				unreachable!("Should not be pending in a sync context");
2257			},
2258		}
2259	}
2260
2261	/// Wraps [`LSPS2ServiceHandler::channel_needs_manual_broadcast`].
2262	pub fn channel_needs_manual_broadcast(
2263		&self, user_channel_id: u128, counterparty_node_id: &PublicKey,
2264	) -> Result<bool, APIError> {
2265		self.inner.channel_needs_manual_broadcast(user_channel_id, counterparty_node_id)
2266	}
2267
2268	/// Wraps [`LSPS2ServiceHandler::store_funding_transaction`].
2269	pub fn store_funding_transaction(
2270		&self, user_channel_id: u128, counterparty_node_id: &PublicKey, funding_tx: Transaction,
2271	) -> Result<(), APIError> {
2272		self.inner.store_funding_transaction(user_channel_id, counterparty_node_id, funding_tx)
2273	}
2274
2275	/// Wraps [`LSPS2ServiceHandler::set_funding_tx_broadcast_safe`].
2276	pub fn set_funding_tx_broadcast_safe(
2277		&self, user_channel_id: u128, counterparty_node_id: &PublicKey,
2278	) -> Result<(), APIError> {
2279		self.inner.set_funding_tx_broadcast_safe(user_channel_id, counterparty_node_id)
2280	}
2281
2282	/// Abandons a pending JIT‐open flow for `user_channel_id`, removing all local state.
2283	///
2284	/// Wraps [`LSPS2ServiceHandler::channel_open_abandoned`].
2285	pub fn channel_open_abandoned(
2286		&self, counterparty_node_id: &PublicKey, user_channel_id: u128,
2287	) -> Result<(), APIError> {
2288		let mut fut =
2289			Box::pin(self.inner.channel_open_abandoned(counterparty_node_id, user_channel_id));
2290
2291		let mut waker = dummy_waker();
2292		let mut ctx = task::Context::from_waker(&mut waker);
2293		match fut.as_mut().poll(&mut ctx) {
2294			task::Poll::Ready(result) => result,
2295			task::Poll::Pending => {
2296				// In a sync context, we can't wait for the future to complete.
2297				unreachable!("Should not be pending in a sync context");
2298			},
2299		}
2300	}
2301
2302	/// Used to fail intercepted HTLCs backwards when a channel open attempt ultimately fails.
2303	///
2304	/// Wraps [`LSPS2ServiceHandler::channel_open_failed`].
2305	pub fn channel_open_failed(
2306		&self, counterparty_node_id: &PublicKey, user_channel_id: u128,
2307	) -> Result<(), APIError> {
2308		let mut fut =
2309			Box::pin(self.inner.channel_open_failed(counterparty_node_id, user_channel_id));
2310
2311		let mut waker = dummy_waker();
2312		let mut ctx = task::Context::from_waker(&mut waker);
2313		match fut.as_mut().poll(&mut ctx) {
2314			task::Poll::Ready(result) => result,
2315			task::Poll::Pending => {
2316				// In a sync context, we can't wait for the future to complete.
2317				unreachable!("Should not be pending in a sync context");
2318			},
2319		}
2320	}
2321
2322	/// Forward [`Event::ChannelReady`] event parameters into this function.
2323	///
2324	/// Wraps [`LSPS2ServiceHandler::channel_ready`].
2325	///
2326	/// [`Event::ChannelReady`]: lightning::events::Event::ChannelReady
2327	pub fn channel_ready(
2328		&self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey,
2329	) -> Result<(), APIError> {
2330		let mut fut =
2331			Box::pin(self.inner.channel_ready(user_channel_id, channel_id, counterparty_node_id));
2332
2333		let mut waker = dummy_waker();
2334		let mut ctx = task::Context::from_waker(&mut waker);
2335		match fut.as_mut().poll(&mut ctx) {
2336			task::Poll::Ready(result) => result,
2337			task::Poll::Pending => {
2338				// In a sync context, we can't wait for the future to complete.
2339				unreachable!("Should not be pending in a sync context");
2340			},
2341		}
2342	}
2343}
2344
2345#[cfg(test)]
2346mod tests {
2347	use super::*;
2348
2349	use crate::lsps0::ser::LSPSDateTime;
2350
2351	use proptest::prelude::*;
2352
2353	use bitcoin::{absolute::LockTime, transaction::Version};
2354	use core::str::FromStr;
2355
2356	const MAX_VALUE_MSAT: u64 = 21_000_000_0000_0000_000;
2357
2358	fn arb_forward_amounts() -> impl Strategy<Value = (u64, u64, u64, u64)> {
2359		(1u64..MAX_VALUE_MSAT, 1u64..MAX_VALUE_MSAT, 1u64..MAX_VALUE_MSAT, 1u64..MAX_VALUE_MSAT)
2360			.prop_map(|(a, b, c, d)| {
2361				(a, b, c, core::cmp::min(d, a.saturating_add(b).saturating_add(c)))
2362			})
2363	}
2364
2365	proptest! {
2366		#[test]
2367		fn proptest_calculate_amount_to_forward((o_0, o_1, o_2, total_fee_msat) in arb_forward_amounts()) {
2368			let htlcs = vec![
2369				InterceptedHTLC {
2370					intercept_id: InterceptId([0; 32]),
2371					expected_outbound_amount_msat: o_0,
2372					payment_hash: PaymentHash([0; 32]),
2373				},
2374				InterceptedHTLC {
2375					intercept_id: InterceptId([1; 32]),
2376					expected_outbound_amount_msat: o_1,
2377					payment_hash: PaymentHash([0; 32]),
2378				},
2379				InterceptedHTLC {
2380					intercept_id: InterceptId([2; 32]),
2381					expected_outbound_amount_msat: o_2,
2382					payment_hash: PaymentHash([0; 32]),
2383				},
2384			];
2385
2386			let result = calculate_amount_to_forward_per_htlc(&htlcs, total_fee_msat);
2387			let total_received_msat = o_0 + o_1 + o_2;
2388
2389			if total_received_msat < total_fee_msat {
2390				assert_eq!(result.len(), 0);
2391			} else {
2392				assert_ne!(result.len(), 0);
2393				assert_eq!(result[0].0, htlcs[0].intercept_id);
2394				assert_eq!(result[1].0, htlcs[1].intercept_id);
2395				assert_eq!(result[2].0, htlcs[2].intercept_id);
2396				assert!(result[0].1 <= o_0);
2397				assert!(result[1].1 <= o_1);
2398				assert!(result[2].1 <= o_2);
2399
2400				let result_sum = result.iter().map(|(_, f)| f).sum::<u64>();
2401				assert_eq!(total_received_msat - result_sum, total_fee_msat);
2402				let five_pct = result_sum as f32 * 0.05;
2403				let fair_share_0 = (o_0 as f32 / total_received_msat as f32) * result_sum as f32;
2404				assert!(result[0].1 as f32 <= fair_share_0 + five_pct);
2405				let fair_share_1 = (o_1 as f32 / total_received_msat as f32) * result_sum as f32;
2406				assert!(result[1].1 as f32 <= fair_share_1 + five_pct);
2407				let fair_share_2 = (o_2 as f32 / total_received_msat as f32) * result_sum as f32;
2408				assert!(result[2].1 as f32 <= fair_share_2 + five_pct);
2409			}
2410		}
2411	}
2412
2413	#[test]
2414	fn test_calculate_amount_to_forward() {
2415		let htlcs = vec![
2416			InterceptedHTLC {
2417				intercept_id: InterceptId([0; 32]),
2418				expected_outbound_amount_msat: 2,
2419				payment_hash: PaymentHash([0; 32]),
2420			},
2421			InterceptedHTLC {
2422				intercept_id: InterceptId([1; 32]),
2423				expected_outbound_amount_msat: 6,
2424				payment_hash: PaymentHash([0; 32]),
2425			},
2426			InterceptedHTLC {
2427				intercept_id: InterceptId([2; 32]),
2428				expected_outbound_amount_msat: 2,
2429				payment_hash: PaymentHash([0; 32]),
2430			},
2431		];
2432		let result = calculate_amount_to_forward_per_htlc(&htlcs, 5);
2433		assert_eq!(
2434			result,
2435			vec![
2436				(htlcs[0].intercept_id, 1),
2437				(htlcs[1].intercept_id, 3),
2438				(htlcs[2].intercept_id, 1),
2439			]
2440		);
2441	}
2442
2443	#[test]
2444	fn test_jit_channel_state_mpp() {
2445		let payment_size_msat = Some(500_000_000);
2446		let opening_fee_params = LSPS2OpeningFeeParams {
2447			min_fee_msat: 10_000_000,
2448			proportional: 10_000,
2449			valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(),
2450			min_lifetime: 4032,
2451			max_client_to_self_delay: 2016,
2452			min_payment_size_msat: 10_000_000,
2453			max_payment_size_msat: 1_000_000_000,
2454			promise: "ignore".to_string(),
2455		};
2456		let mut state = OutboundJITChannelState::new();
2457		// Intercepts the first HTLC of a multipart payment A.
2458		{
2459			let action = state
2460				.htlc_intercepted(
2461					&opening_fee_params,
2462					&payment_size_msat,
2463					InterceptedHTLC {
2464						intercept_id: InterceptId([0; 32]),
2465						expected_outbound_amount_msat: 200_000_000,
2466						payment_hash: PaymentHash([100; 32]),
2467					},
2468				)
2469				.unwrap();
2470			assert!(matches!(state, OutboundJITChannelState::PendingInitialPayment { .. }));
2471			assert!(action.is_none());
2472		}
2473		// Intercepts the first HTLC of a different multipart payment B.
2474		{
2475			let action = state
2476				.htlc_intercepted(
2477					&opening_fee_params,
2478					&payment_size_msat,
2479					InterceptedHTLC {
2480						intercept_id: InterceptId([1; 32]),
2481						expected_outbound_amount_msat: 1_000_000,
2482						payment_hash: PaymentHash([101; 32]),
2483					},
2484				)
2485				.unwrap();
2486			assert!(matches!(state, OutboundJITChannelState::PendingInitialPayment { .. }));
2487			assert!(action.is_none());
2488		}
2489		// Intercepts the second HTLC of multipart payment A, completing the expected payment and
2490		// opening the channel.
2491		{
2492			let action = state
2493				.htlc_intercepted(
2494					&opening_fee_params,
2495					&payment_size_msat,
2496					InterceptedHTLC {
2497						intercept_id: InterceptId([2; 32]),
2498						expected_outbound_amount_msat: 300_000_000,
2499						payment_hash: PaymentHash([100; 32]),
2500					},
2501				)
2502				.unwrap();
2503			assert!(matches!(state, OutboundJITChannelState::PendingChannelOpen { .. }));
2504			assert!(matches!(action, Some(HTLCInterceptedAction::OpenChannel(_))));
2505		}
2506		// Channel opens, becomes ready, and multipart payment A gets forwarded.
2507		{
2508			let ForwardPaymentAction(channel_id, payment) =
2509				state.channel_ready(ChannelId([200; 32])).unwrap();
2510			assert_eq!(channel_id, ChannelId([200; 32]));
2511			assert_eq!(payment.opening_fee_msat, 10_000_000);
2512			assert_eq!(
2513				payment.htlcs,
2514				vec![
2515					InterceptedHTLC {
2516						intercept_id: InterceptId([0; 32]),
2517						expected_outbound_amount_msat: 200_000_000,
2518						payment_hash: PaymentHash([100; 32]),
2519					},
2520					InterceptedHTLC {
2521						intercept_id: InterceptId([2; 32]),
2522						expected_outbound_amount_msat: 300_000_000,
2523						payment_hash: PaymentHash([100; 32]),
2524					},
2525				]
2526			);
2527		}
2528		// Intercepts the first HTLC of a different payment C.
2529		{
2530			let action = state
2531				.htlc_intercepted(
2532					&opening_fee_params,
2533					&payment_size_msat,
2534					InterceptedHTLC {
2535						intercept_id: InterceptId([3; 32]),
2536						expected_outbound_amount_msat: 2_000_000,
2537						payment_hash: PaymentHash([102; 32]),
2538					},
2539				)
2540				.unwrap();
2541			assert!(matches!(state, OutboundJITChannelState::PendingPaymentForward { .. }));
2542			assert!(action.is_none());
2543		}
2544		// Payment A fails.
2545		{
2546			let action = state.htlc_handling_failed().unwrap();
2547			assert!(matches!(state, OutboundJITChannelState::PendingPayment { .. }));
2548			// No payments have received sufficient HTLCs yet.
2549			assert!(action.is_none());
2550		}
2551		// Additional HTLC of payment B arrives, completing the expectd payment.
2552		{
2553			let action = state
2554				.htlc_intercepted(
2555					&opening_fee_params,
2556					&payment_size_msat,
2557					InterceptedHTLC {
2558						intercept_id: InterceptId([4; 32]),
2559						expected_outbound_amount_msat: 500_000_000,
2560						payment_hash: PaymentHash([101; 32]),
2561					},
2562				)
2563				.unwrap();
2564			assert!(matches!(state, OutboundJITChannelState::PendingPaymentForward { .. }));
2565			match action {
2566				Some(HTLCInterceptedAction::ForwardPayment(channel_id, payment)) => {
2567					assert_eq!(channel_id, ChannelId([200; 32]));
2568					assert_eq!(payment.opening_fee_msat, 10_000_000);
2569					assert_eq!(
2570						payment.htlcs,
2571						vec![
2572							InterceptedHTLC {
2573								intercept_id: InterceptId([1; 32]),
2574								expected_outbound_amount_msat: 1_000_000,
2575								payment_hash: PaymentHash([101; 32]),
2576							},
2577							InterceptedHTLC {
2578								intercept_id: InterceptId([4; 32]),
2579								expected_outbound_amount_msat: 500_000_000,
2580								payment_hash: PaymentHash([101; 32]),
2581							},
2582						]
2583					);
2584				},
2585				_ => panic!("Unexpected action when intercepted HTLC."),
2586			}
2587		}
2588		// Payment completes, queued payments get forwarded.
2589		{
2590			let action = state.payment_forwarded(100000000000).unwrap();
2591			assert!(matches!(state, OutboundJITChannelState::PaymentForwarded { .. }));
2592			match action {
2593				Some(ForwardHTLCsAction(channel_id, htlcs)) => {
2594					assert_eq!(channel_id, ChannelId([200; 32]));
2595					assert_eq!(
2596						htlcs,
2597						vec![InterceptedHTLC {
2598							intercept_id: InterceptId([3; 32]),
2599							expected_outbound_amount_msat: 2_000_000,
2600							payment_hash: PaymentHash([102; 32]),
2601						}]
2602					);
2603				},
2604				_ => panic!("Unexpected action when forwarded payment."),
2605			}
2606		}
2607		// Any new HTLC gets automatically forwarded.
2608		{
2609			let action = state
2610				.htlc_intercepted(
2611					&opening_fee_params,
2612					&payment_size_msat,
2613					InterceptedHTLC {
2614						intercept_id: InterceptId([5; 32]),
2615						expected_outbound_amount_msat: 200_000_000,
2616						payment_hash: PaymentHash([103; 32]),
2617					},
2618				)
2619				.unwrap();
2620			assert!(matches!(state, OutboundJITChannelState::PaymentForwarded { .. }));
2621			assert!(
2622				matches!(action, Some(HTLCInterceptedAction::ForwardHTLC(channel_id)) if channel_id == ChannelId([200; 32]))
2623			);
2624		}
2625	}
2626
2627	#[test]
2628	fn test_jit_channel_state_no_mpp() {
2629		let payment_size_msat = None;
2630		let opening_fee_params = LSPS2OpeningFeeParams {
2631			min_fee_msat: 10_000_000,
2632			proportional: 10_000,
2633			valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(),
2634			min_lifetime: 4032,
2635			max_client_to_self_delay: 2016,
2636			min_payment_size_msat: 10_000_000,
2637			max_payment_size_msat: 1_000_000_000,
2638			promise: "ignore".to_string(),
2639		};
2640		let mut state = OutboundJITChannelState::new();
2641		// Intercepts payment A, opening the channel.
2642		{
2643			let action = state
2644				.htlc_intercepted(
2645					&opening_fee_params,
2646					&payment_size_msat,
2647					InterceptedHTLC {
2648						intercept_id: InterceptId([0; 32]),
2649						expected_outbound_amount_msat: 500_000_000,
2650						payment_hash: PaymentHash([100; 32]),
2651					},
2652				)
2653				.unwrap();
2654			assert!(matches!(state, OutboundJITChannelState::PendingChannelOpen { .. }));
2655			assert!(matches!(action, Some(HTLCInterceptedAction::OpenChannel(_))));
2656		}
2657		// Intercepts payment B.
2658		{
2659			let action = state
2660				.htlc_intercepted(
2661					&opening_fee_params,
2662					&payment_size_msat,
2663					InterceptedHTLC {
2664						intercept_id: InterceptId([1; 32]),
2665						expected_outbound_amount_msat: 600_000_000,
2666						payment_hash: PaymentHash([101; 32]),
2667					},
2668				)
2669				.unwrap();
2670			assert!(matches!(state, OutboundJITChannelState::PendingChannelOpen { .. }));
2671			assert!(action.is_none());
2672		}
2673		// Channel opens, becomes ready, and payment A gets forwarded.
2674		{
2675			let ForwardPaymentAction(channel_id, payment) =
2676				state.channel_ready(ChannelId([200; 32])).unwrap();
2677			assert_eq!(channel_id, ChannelId([200; 32]));
2678			assert_eq!(payment.opening_fee_msat, 10_000_000);
2679			assert_eq!(
2680				payment.htlcs,
2681				vec![InterceptedHTLC {
2682					intercept_id: InterceptId([0; 32]),
2683					expected_outbound_amount_msat: 500_000_000,
2684					payment_hash: PaymentHash([100; 32]),
2685				},]
2686			);
2687		}
2688		// Intercepts payment C.
2689		{
2690			let action = state
2691				.htlc_intercepted(
2692					&opening_fee_params,
2693					&payment_size_msat,
2694					InterceptedHTLC {
2695						intercept_id: InterceptId([2; 32]),
2696						expected_outbound_amount_msat: 500_000_000,
2697						payment_hash: PaymentHash([102; 32]),
2698					},
2699				)
2700				.unwrap();
2701			assert!(matches!(state, OutboundJITChannelState::PendingPaymentForward { .. }));
2702			assert!(action.is_none());
2703		}
2704		// Payment A fails, and payment B is forwarded.
2705		{
2706			let action = state.htlc_handling_failed().unwrap();
2707			assert!(matches!(state, OutboundJITChannelState::PendingPaymentForward { .. }));
2708			match action {
2709				Some(ForwardPaymentAction(channel_id, payment)) => {
2710					assert_eq!(channel_id, ChannelId([200; 32]));
2711					assert_eq!(
2712						payment.htlcs,
2713						vec![InterceptedHTLC {
2714							intercept_id: InterceptId([1; 32]),
2715							expected_outbound_amount_msat: 600_000_000,
2716							payment_hash: PaymentHash([101; 32]),
2717						},]
2718					);
2719				},
2720				_ => panic!("Unexpected action when HTLC handling failed."),
2721			}
2722		}
2723		// Payment completes, queued payments get forwarded.
2724		{
2725			let action = state.payment_forwarded(10000000000).unwrap();
2726			assert!(matches!(state, OutboundJITChannelState::PaymentForwarded { .. }));
2727			match action {
2728				Some(ForwardHTLCsAction(channel_id, htlcs)) => {
2729					assert_eq!(channel_id, ChannelId([200; 32]));
2730					assert_eq!(
2731						htlcs,
2732						vec![InterceptedHTLC {
2733							intercept_id: InterceptId([2; 32]),
2734							expected_outbound_amount_msat: 500_000_000,
2735							payment_hash: PaymentHash([102; 32]),
2736						}]
2737					);
2738				},
2739				_ => panic!("Unexpected action when forwarded payment."),
2740			}
2741		}
2742		// Any new HTLC gets automatically forwarded.
2743		{
2744			let action = state
2745				.htlc_intercepted(
2746					&opening_fee_params,
2747					&payment_size_msat,
2748					InterceptedHTLC {
2749						intercept_id: InterceptId([3; 32]),
2750						expected_outbound_amount_msat: 200_000_000,
2751						payment_hash: PaymentHash([103; 32]),
2752					},
2753				)
2754				.unwrap();
2755			assert!(matches!(state, OutboundJITChannelState::PaymentForwarded { .. }));
2756			assert!(
2757				matches!(action, Some(HTLCInterceptedAction::ForwardHTLC(channel_id)) if channel_id == ChannelId([200; 32]))
2758			);
2759		}
2760	}
2761
2762	#[test]
2763	fn broadcast_not_allowed_after_non_paying_fee_payment_claimed() {
2764		let min_fee_msat: u64 = 12345;
2765		let opening_fee_params = LSPS2OpeningFeeParams {
2766			min_fee_msat,
2767			proportional: 0,
2768			valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(),
2769			min_lifetime: 144,
2770			max_client_to_self_delay: 128,
2771			min_payment_size_msat: 1,
2772			max_payment_size_msat: 10_000_000_000,
2773			promise: "ignore".to_string(),
2774		};
2775
2776		let payment_size_msat = Some(1_000_000);
2777		let user_channel_id = 4242u128;
2778		let mut jit_channel = OutboundJITChannel::new(
2779			payment_size_msat,
2780			opening_fee_params.clone(),
2781			user_channel_id,
2782			true,
2783		);
2784
2785		let opening_payment_hash = PaymentHash([42; 32]);
2786		let htlcs_for_opening = [
2787			InterceptedHTLC {
2788				intercept_id: InterceptId([0; 32]),
2789				expected_outbound_amount_msat: 400_000,
2790				payment_hash: opening_payment_hash,
2791			},
2792			InterceptedHTLC {
2793				intercept_id: InterceptId([1; 32]),
2794				expected_outbound_amount_msat: 600_000,
2795				payment_hash: opening_payment_hash,
2796			},
2797		];
2798
2799		assert!(jit_channel.htlc_intercepted(htlcs_for_opening[0].clone()).unwrap().is_none());
2800		let action = jit_channel.htlc_intercepted(htlcs_for_opening[1].clone()).unwrap();
2801		match action {
2802			Some(HTLCInterceptedAction::OpenChannel(_)) => {},
2803			other => panic!("Expected OpenChannel action, got {:?}", other),
2804		}
2805
2806		let channel_id = ChannelId([7; 32]);
2807		let ForwardPaymentAction(_, fee_payment) = jit_channel.channel_ready(channel_id).unwrap();
2808		assert_eq!(fee_payment.opening_fee_msat, min_fee_msat);
2809
2810		let followup = jit_channel.htlc_handling_failed().unwrap();
2811		assert!(followup.is_none());
2812
2813		let dummy_tx = Transaction {
2814			version: Version(2),
2815			lock_time: LockTime::ZERO,
2816			input: vec![],
2817			output: vec![],
2818		};
2819		jit_channel.set_funding_tx(dummy_tx);
2820		jit_channel.set_funding_tx_broadcast_safe(true);
2821		assert!(
2822			!jit_channel.should_broadcast_funding_transaction(),
2823			"Should not broadcast before any successful payment is claimed"
2824		);
2825
2826		let second_payment_hash = PaymentHash([99; 32]);
2827		let second_htlc = InterceptedHTLC {
2828			intercept_id: InterceptId([2; 32]),
2829			expected_outbound_amount_msat: min_fee_msat,
2830			payment_hash: second_payment_hash,
2831		};
2832		let action2 = jit_channel.htlc_intercepted(second_htlc).unwrap();
2833		let (forwarded_channel_id, fee_payment2) = match action2 {
2834			Some(HTLCInterceptedAction::ForwardPayment(cid, fp)) => (cid, fp),
2835			other => panic!("Expected ForwardPayment for second HTLC, got {:?}", other),
2836		};
2837		assert_eq!(forwarded_channel_id, channel_id);
2838		assert_eq!(fee_payment2.opening_fee_msat, min_fee_msat);
2839
2840		assert!(
2841			!jit_channel.should_broadcast_funding_transaction(),
2842			"Should not broadcast before any successful payment is claimed"
2843		);
2844
2845		// Forward a payment that is not enough to cover the fees
2846		let _ = jit_channel.payment_forwarded(min_fee_msat - 1).unwrap();
2847
2848		assert!(
2849			!jit_channel.should_broadcast_funding_transaction(),
2850			"Should not broadcast before all the fees are collected"
2851		);
2852
2853		let _ = jit_channel.payment_forwarded(min_fee_msat).unwrap();
2854
2855		let broadcast_allowed = jit_channel.should_broadcast_funding_transaction();
2856
2857		assert!(
2858			broadcast_allowed,
2859			"Broadcast was not allowed even though all the skimmed fees were collected"
2860		);
2861	}
2862}