1use alloc::boxed::Box;
13use alloc::string::{String, ToString};
14use alloc::vec::Vec;
15use lightning::util::persist::KVStore;
16
17use core::cmp::Ordering as CmpOrdering;
18use core::future::Future as StdFuture;
19use core::ops::Deref;
20use core::sync::atomic::{AtomicUsize, Ordering};
21use core::task;
22
23use crate::events::EventQueue;
24use crate::lsps0::ser::{
25 LSPSMessage, LSPSProtocolMessageHandler, LSPSRequestId, LSPSResponseError,
26 JSONRPC_INTERNAL_ERROR_ERROR_CODE, JSONRPC_INTERNAL_ERROR_ERROR_MESSAGE,
27 LSPS0_CLIENT_REJECTED_ERROR_CODE,
28};
29use crate::lsps2::event::LSPS2ServiceEvent;
30use crate::lsps2::payment_queue::{InterceptedHTLC, PaymentQueue};
31use crate::lsps2::utils::{
32 compute_opening_fee, is_expired_opening_fee_params, is_valid_opening_fee_params,
33};
34use crate::message_queue::{MessageQueue, MessageQueueNotifierGuard};
35use crate::persist::{
36 LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
37};
38use crate::prelude::hash_map::Entry;
39use crate::prelude::{new_hash_map, HashMap};
40use crate::sync::{Arc, Mutex, MutexGuard, RwLock};
41use crate::utils::async_poll::dummy_waker;
42
43use lightning::chain::chaininterface::BroadcasterInterface;
44use lightning::events::HTLCHandlingFailureType;
45use lightning::ln::channelmanager::{AChannelManager, FailureCode, InterceptId};
46use lightning::ln::msgs::{ErrorAction, LightningError};
47use lightning::ln::types::ChannelId;
48use lightning::util::errors::APIError;
49use lightning::util::logger::Level;
50use lightning::util::ser::Writeable;
51use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum};
52
53use lightning_types::payment::PaymentHash;
54
55use bitcoin::secp256k1::PublicKey;
56use bitcoin::Transaction;
57
58use crate::lsps2::msgs::{
59 LSPS2BuyRequest, LSPS2BuyResponse, LSPS2GetInfoRequest, LSPS2GetInfoResponse, LSPS2Message,
60 LSPS2OpeningFeeParams, LSPS2RawOpeningFeeParams, LSPS2Request, LSPS2Response,
61 LSPS2_BUY_REQUEST_INVALID_OPENING_FEE_PARAMS_ERROR_CODE,
62 LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_LARGE_ERROR_CODE,
63 LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_SMALL_ERROR_CODE,
64 LSPS2_GET_INFO_REQUEST_UNRECOGNIZED_OR_STALE_TOKEN_ERROR_CODE,
65};
66
// NOTE(review): not referenced in the visible part of this file; presumably the cap on requests
// and pending JIT channels tracked per counterparty — confirm against the request handlers.
const MAX_PENDING_REQUESTS_PER_PEER: usize = 10;
// NOTE(review): not referenced in the visible part of this file; presumably the global cap
// checked against `total_pending_requests` — confirm against the request handlers.
const MAX_TOTAL_PENDING_REQUESTS: usize = 1000;
// Upper bound on the number of entries in the per-peer state map. Once it is reached,
// `get_or_insert_peer_state_entry!` rejects requests from peers that have no entry yet.
const MAX_TOTAL_PEERS: usize = 100000;
70
/// Configuration of the LSPS2 service handler.
#[derive(Clone, Debug)]
pub struct LSPS2ServiceConfig {
	/// 32-byte secret handed to `LSPS2RawOpeningFeeParams::into_opening_fee_params` whenever an
	/// opening fee menu is built for a client.
	pub promise_secret: [u8; 32],
}
79
/// Amounts carried by an `HTLCInterceptedAction::OpenChannel` action.
#[derive(Clone, Debug, PartialEq)]
struct OpenChannelParams {
	/// Opening fee obtained from `compute_opening_fee` for the expected payment size.
	opening_fee_msat: u64,
	/// Expected payment size minus `opening_fee_msat` (saturating at zero).
	amt_to_forward_msat: u64,
}
87
/// A payment popped from the payment queue together with the opening fee it has to cover.
#[derive(Clone, Debug, PartialEq)]
struct FeePayment {
	/// The intercepted HTLCs making up the payment.
	htlcs: Vec<InterceptedHTLC>,
	/// Opening fee passed, together with `htlcs`, to `calculate_amount_to_forward_per_htlc` when
	/// the payment is forwarded.
	opening_fee_msat: u64,
}
94
/// Error returned by the `OutboundJITChannelState` transitions; wraps a human-readable message.
#[derive(Debug)]
struct ChannelStateError(String);
97
98impl From<ChannelStateError> for LightningError {
99 fn from(value: ChannelStateError) -> Self {
100 LightningError { err: value.0, action: ErrorAction::IgnoreAndLog(Level::Info) }
101 }
102}
103
/// What the service handler has to do after an HTLC was fed into the JIT channel state machine.
#[derive(Debug, PartialEq)]
enum HTLCInterceptedAction {
	/// Enough value was intercepted while waiting for the initial payment: the handler enqueues
	/// an `LSPS2ServiceEvent::OpenChannel` built from these parameters.
	OpenChannel(OpenChannelParams),
	/// The opening fee was already collected: forward the just-intercepted HTLC over this channel.
	ForwardHTLC(ChannelId),
	/// A queued payment became forwardable: forward its HTLCs over this channel, using the
	/// per-HTLC amounts computed by `calculate_amount_to_forward_per_htlc`.
	ForwardPayment(ChannelId, FeePayment),
}
113
/// Instruction to forward the given fee-bearing payment over the given channel; produced by the
/// `channel_ready` and `htlc_handling_failed` transitions.
#[derive(Debug, PartialEq)]
struct ForwardPaymentAction(ChannelId, FeePayment);
117
/// Instruction to forward the HTLCs drained from the payment queue over the given channel;
/// produced by the `payment_forwarded` transition.
#[derive(Debug, PartialEq)]
struct ForwardHTLCsAction(ChannelId, Vec<InterceptedHTLC>);
121
/// Trust model selected for a JIT channel (see `TrustModel::new`).
#[derive(Debug, Clone)]
enum TrustModel {
	/// Selected when `client_trusts_lsp` is `true`. Tracks whether the funding transaction was
	/// flagged safe to broadcast and the funding transaction itself, once it was provided.
	ClientTrustsLsp { funding_tx_broadcast_safe: bool, funding_tx: Option<Transaction> },
	/// Selected when `client_trusts_lsp` is `false`. Carries no data and never requires a manual
	/// broadcast.
	LspTrustsClient,
}
127
128impl TrustModel {
129 fn should_manually_broadcast(&self, state_is_payment_forward: bool) -> bool {
130 match self {
131 TrustModel::ClientTrustsLsp { funding_tx_broadcast_safe, funding_tx } => {
132 *funding_tx_broadcast_safe && state_is_payment_forward && funding_tx.is_some()
133 },
134 TrustModel::LspTrustsClient => false,
136 }
137 }
138
139 fn new(client_trusts_lsp: bool) -> Self {
140 if client_trusts_lsp {
141 TrustModel::ClientTrustsLsp { funding_tx_broadcast_safe: false, funding_tx: None }
142 } else {
143 TrustModel::LspTrustsClient
144 }
145 }
146
147 fn set_funding_tx(&mut self, funding_tx: Transaction) {
148 match self {
149 TrustModel::ClientTrustsLsp { funding_tx: tx, .. } => {
150 *tx = Some(funding_tx);
151 },
152 TrustModel::LspTrustsClient => {
153 },
155 }
156 }
157
158 fn set_funding_tx_broadcast_safe(&mut self, funding_tx_broadcast_safe: bool) {
159 match self {
160 TrustModel::ClientTrustsLsp { funding_tx_broadcast_safe: safe, .. } => {
161 *safe = funding_tx_broadcast_safe;
162 },
163 TrustModel::LspTrustsClient => {
164 },
166 }
167 }
168
169 fn get_funding_tx(&self) -> Option<&Transaction> {
170 match self {
171 TrustModel::ClientTrustsLsp { funding_tx, .. } => funding_tx.as_ref(),
172 _ => None,
173 }
174 }
175
176 fn is_client_trusts_lsp(&self) -> bool {
177 match self {
178 TrustModel::ClientTrustsLsp { .. } => true,
179 TrustModel::LspTrustsClient => false,
180 }
181 }
182}
183
// TLV-based serialization of `TrustModel`: variant ids 0 and 2, with both `ClientTrustsLsp`
// fields written as TLV records (`funding_tx` only when present).
impl_writeable_tlv_based_enum!(TrustModel,
	(0, ClientTrustsLsp) => {
		(0, funding_tx_broadcast_safe, required),
		(2, funding_tx, option),
	},
	(2, LspTrustsClient) => {
	},
);
192
/// State machine of an outbound JIT channel.
#[derive(Debug)]
enum OutboundJITChannelState {
	/// Initial state (see `new`): intercepted HTLCs are collected in `payment_queue` until enough
	/// value arrived to request the channel open.
	PendingInitialPayment { payment_queue: PaymentQueue },
	/// An `OpenChannel` action was emitted; further HTLCs are queued until `channel_ready` is
	/// called.
	PendingChannelOpen { payment_queue: PaymentQueue, opening_fee_msat: u64 },
	/// A payment popped from the queue was handed out for forwarding over `channel_id`; left via
	/// `payment_forwarded` (fee collected) or `htlc_handling_failed`.
	PendingPaymentForward {
		payment_queue: PaymentQueue,
		opening_fee_msat: u64,
		channel_id: ChannelId,
	},
	/// Entered when HTLC handling failed and no other queued payment could be popped; waits for
	/// further HTLCs until the queue yields a forwardable payment again.
	PendingPayment { payment_queue: PaymentQueue, opening_fee_msat: u64, channel_id: ChannelId },
	/// Entered once `payment_forwarded` reported a skimmed fee of at least the opening fee; from
	/// here on intercepted HTLCs are forwarded directly over `channel_id`.
	PaymentForwarded { channel_id: ChannelId },
}
219
220impl OutboundJITChannelState {
221 fn new() -> Self {
222 OutboundJITChannelState::PendingInitialPayment { payment_queue: PaymentQueue::new() }
223 }
224
225 fn htlc_intercepted(
226 &mut self, opening_fee_params: &LSPS2OpeningFeeParams, payment_size_msat: &Option<u64>,
227 htlc: InterceptedHTLC,
228 ) -> Result<Option<HTLCInterceptedAction>, ChannelStateError> {
229 match self {
230 OutboundJITChannelState::PendingInitialPayment { payment_queue } => {
231 let (total_expected_outbound_amount_msat, num_htlcs) = payment_queue.add_htlc(htlc);
232
233 let (expected_payment_size_msat, mpp_mode) =
234 if let Some(payment_size_msat) = payment_size_msat {
235 (*payment_size_msat, true)
236 } else {
237 debug_assert_eq!(num_htlcs, 1);
238 if num_htlcs != 1 {
239 return Err(ChannelStateError(
240 "Paying via multiple HTLCs is disallowed in \"no-MPP+var-invoice\" mode.".to_string()
241 ));
242 }
243 (total_expected_outbound_amount_msat, false)
244 };
245
246 if expected_payment_size_msat < opening_fee_params.min_payment_size_msat
247 || expected_payment_size_msat > opening_fee_params.max_payment_size_msat
248 {
249 return Err(ChannelStateError(
250 format!("Payment size violates our limits: expected_payment_size_msat = {}, min_payment_size_msat = {}, max_payment_size_msat = {}",
251 expected_payment_size_msat,
252 opening_fee_params.min_payment_size_msat,
253 opening_fee_params.max_payment_size_msat
254 )));
255 }
256
257 let opening_fee_msat = compute_opening_fee(
258 expected_payment_size_msat,
259 opening_fee_params.min_fee_msat,
260 opening_fee_params.proportional.into(),
261 ).ok_or(ChannelStateError(
262 format!("Could not compute valid opening fee with min_fee_msat = {}, proportional = {}, and expected_payment_size_msat = {}",
263 opening_fee_params.min_fee_msat,
264 opening_fee_params.proportional,
265 expected_payment_size_msat
266 ))
267 )?;
268
269 let amt_to_forward_msat =
270 expected_payment_size_msat.saturating_sub(opening_fee_msat);
271
272 if total_expected_outbound_amount_msat >= expected_payment_size_msat
274 && amt_to_forward_msat > 0
275 {
276 *self = OutboundJITChannelState::PendingChannelOpen {
277 payment_queue: core::mem::take(payment_queue),
278 opening_fee_msat,
279 };
280 let open_channel = HTLCInterceptedAction::OpenChannel(OpenChannelParams {
281 opening_fee_msat,
282 amt_to_forward_msat,
283 });
284 Ok(Some(open_channel))
285 } else {
286 if mpp_mode {
287 *self = OutboundJITChannelState::PendingInitialPayment {
288 payment_queue: core::mem::take(payment_queue),
289 };
290 Ok(None)
291 } else {
292 Err(ChannelStateError(
293 "Intercepted HTLC is too small to pay opening fee".to_string(),
294 ))
295 }
296 }
297 },
298 OutboundJITChannelState::PendingChannelOpen { payment_queue, opening_fee_msat } => {
299 let mut payment_queue = core::mem::take(payment_queue);
300 payment_queue.add_htlc(htlc);
301 *self = OutboundJITChannelState::PendingChannelOpen {
302 payment_queue,
303 opening_fee_msat: *opening_fee_msat,
304 };
305 Ok(None)
306 },
307 OutboundJITChannelState::PendingPaymentForward {
308 payment_queue,
309 opening_fee_msat,
310 channel_id,
311 } => {
312 let mut payment_queue = core::mem::take(payment_queue);
313 payment_queue.add_htlc(htlc);
314 *self = OutboundJITChannelState::PendingPaymentForward {
315 payment_queue,
316 opening_fee_msat: *opening_fee_msat,
317 channel_id: *channel_id,
318 };
319 Ok(None)
320 },
321 OutboundJITChannelState::PendingPayment {
322 payment_queue,
323 opening_fee_msat,
324 channel_id,
325 } => {
326 let mut payment_queue = core::mem::take(payment_queue);
327 payment_queue.add_htlc(htlc);
328 if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) {
329 let forward_payment = HTLCInterceptedAction::ForwardPayment(
330 *channel_id,
331 FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat },
332 );
333 *self = OutboundJITChannelState::PendingPaymentForward {
334 payment_queue,
335 opening_fee_msat: *opening_fee_msat,
336 channel_id: *channel_id,
337 };
338 Ok(Some(forward_payment))
339 } else {
340 *self = OutboundJITChannelState::PendingPayment {
341 payment_queue,
342 opening_fee_msat: *opening_fee_msat,
343 channel_id: *channel_id,
344 };
345 Ok(None)
346 }
347 },
348 OutboundJITChannelState::PaymentForwarded { channel_id } => {
349 let forward = HTLCInterceptedAction::ForwardHTLC(*channel_id);
350 *self = OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
351 Ok(Some(forward))
352 },
353 }
354 }
355
356 fn channel_ready(
357 &mut self, channel_id: ChannelId,
358 ) -> Result<ForwardPaymentAction, ChannelStateError> {
359 match self {
360 OutboundJITChannelState::PendingChannelOpen { payment_queue, opening_fee_msat } => {
361 if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) {
362 let forward_payment = ForwardPaymentAction(
363 channel_id,
364 FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat },
365 );
366 *self = OutboundJITChannelState::PendingPaymentForward {
367 payment_queue: core::mem::take(payment_queue),
368 opening_fee_msat: *opening_fee_msat,
369 channel_id,
370 };
371 Ok(forward_payment)
372 } else {
373 return Err(ChannelStateError(
374 "No forwardable payment available when moving to channel ready."
375 .to_string(),
376 ));
377 }
378 },
379 state => Err(ChannelStateError(format!(
380 "Channel ready received when JIT Channel was in state: {:?}",
381 state
382 ))),
383 }
384 }
385
386 fn htlc_handling_failed(&mut self) -> Result<Option<ForwardPaymentAction>, ChannelStateError> {
387 match self {
388 OutboundJITChannelState::PendingPaymentForward {
389 payment_queue,
390 opening_fee_msat,
391 channel_id,
392 } => {
393 if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) {
394 let forward_payment = ForwardPaymentAction(
395 *channel_id,
396 FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat },
397 );
398 *self = OutboundJITChannelState::PendingPaymentForward {
399 payment_queue: core::mem::take(payment_queue),
400 opening_fee_msat: *opening_fee_msat,
401 channel_id: *channel_id,
402 };
403 Ok(Some(forward_payment))
404 } else {
405 *self = OutboundJITChannelState::PendingPayment {
406 payment_queue: core::mem::take(payment_queue),
407 opening_fee_msat: *opening_fee_msat,
408 channel_id: *channel_id,
409 };
410 Ok(None)
411 }
412 },
413 OutboundJITChannelState::PendingPayment {
414 payment_queue,
415 opening_fee_msat,
416 channel_id,
417 } => {
418 *self = OutboundJITChannelState::PendingPayment {
419 payment_queue: core::mem::take(payment_queue),
420 opening_fee_msat: *opening_fee_msat,
421 channel_id: *channel_id,
422 };
423 Ok(None)
424 },
425 OutboundJITChannelState::PaymentForwarded { channel_id } => {
426 *self = OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
427 Ok(None)
428 },
429 state => Err(ChannelStateError(format!(
430 "HTLC handling failed when JIT Channel was in state: {:?}",
431 state
432 ))),
433 }
434 }
435
436 fn payment_forwarded(
437 &mut self, skimmed_fee_msat: u64,
438 ) -> Result<Option<ForwardHTLCsAction>, ChannelStateError> {
439 match self {
440 OutboundJITChannelState::PendingPaymentForward {
441 payment_queue,
442 channel_id,
443 opening_fee_msat,
444 } => {
445 if skimmed_fee_msat >= *opening_fee_msat {
446 let mut pq = core::mem::take(payment_queue);
447 let forward_htlcs = ForwardHTLCsAction(*channel_id, pq.clear());
448 *self = OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
449 Ok(Some(forward_htlcs))
450 } else {
451 *self = OutboundJITChannelState::PendingPaymentForward {
452 payment_queue: core::mem::take(payment_queue),
453 opening_fee_msat: *opening_fee_msat,
454 channel_id: *channel_id,
455 };
456 Ok(None)
457 }
458 },
459 OutboundJITChannelState::PaymentForwarded { channel_id } => {
460 *self = OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
461 Ok(None)
462 },
463 state => Err(ChannelStateError(format!(
464 "Payment forwarded when JIT Channel was in state: {:?}",
465 state
466 ))),
467 }
468 }
469}
470
// TLV-based serialization of `OutboundJITChannelState`: one even variant id per state, with every
// field of each state written as a required TLV record.
impl_writeable_tlv_based_enum!(OutboundJITChannelState,
	(0, PendingInitialPayment) => {
		(0, payment_queue, required),
	},
	(2, PendingChannelOpen) => {
		(0, payment_queue, required),
		(2, opening_fee_msat, required),
	},
	(4, PendingPaymentForward) => {
		(0, payment_queue, required),
		(2, opening_fee_msat, required),
		(4, channel_id, required),
	},
	(6, PendingPayment) => {
		(0, payment_queue, required),
		(2, opening_fee_msat, required),
		(4, channel_id, required),
	},
	(8, PaymentForwarded) => {
		(0, channel_id, required),
	},
);
493
/// A JIT channel tracked for a client, created when a buy request is answered.
struct OutboundJITChannel {
	/// Current position in the JIT channel state machine.
	state: OutboundJITChannelState,
	/// Identifier supplied on creation; reported in the `LSPS2ServiceEvent::OpenChannel` event.
	user_channel_id: u128,
	/// Fee parameters taken from the buy request; used for fee computation and expiry checks.
	opening_fee_params: LSPS2OpeningFeeParams,
	/// Payment size taken from the buy request; `Some` selects MPP mode in the state machine.
	payment_size_msat: Option<u64>,
	/// Trust model selected on creation, including funding transaction bookkeeping.
	trust_model: TrustModel,
}
501
// TLV-based serialization of `OutboundJITChannel`; only `payment_size_msat` is optional.
impl_writeable_tlv_based!(OutboundJITChannel, {
	(0, state, required),
	(2, user_channel_id, required),
	(4, opening_fee_params, required),
	(6, payment_size_msat, option),
	(8, trust_model, required),
});
509
510impl OutboundJITChannel {
511 fn new(
512 payment_size_msat: Option<u64>, opening_fee_params: LSPS2OpeningFeeParams,
513 user_channel_id: u128, client_trusts_lsp: bool,
514 ) -> Self {
515 Self {
516 user_channel_id,
517 state: OutboundJITChannelState::new(),
518 opening_fee_params,
519 payment_size_msat,
520 trust_model: TrustModel::new(client_trusts_lsp),
521 }
522 }
523
524 fn htlc_intercepted(
525 &mut self, htlc: InterceptedHTLC,
526 ) -> Result<Option<HTLCInterceptedAction>, LightningError> {
527 let action =
528 self.state.htlc_intercepted(&self.opening_fee_params, &self.payment_size_msat, htlc)?;
529 Ok(action)
530 }
531
532 fn htlc_handling_failed(&mut self) -> Result<Option<ForwardPaymentAction>, LightningError> {
533 let action = self.state.htlc_handling_failed()?;
534 Ok(action)
535 }
536
537 fn channel_ready(
538 &mut self, channel_id: ChannelId,
539 ) -> Result<ForwardPaymentAction, LightningError> {
540 let action = self.state.channel_ready(channel_id)?;
541 Ok(action)
542 }
543
544 fn payment_forwarded(
545 &mut self, skimmed_fee_msat: u64,
546 ) -> Result<Option<ForwardHTLCsAction>, LightningError> {
547 let action = self.state.payment_forwarded(skimmed_fee_msat)?;
548 Ok(action)
549 }
550
551 fn is_pending_initial_payment(&self) -> bool {
552 matches!(self.state, OutboundJITChannelState::PendingInitialPayment { .. })
553 }
554
555 fn is_prunable(&self) -> bool {
556 let is_expired = is_expired_opening_fee_params(&self.opening_fee_params);
559 self.is_pending_initial_payment() && is_expired
560 }
561
562 fn set_funding_tx(&mut self, funding_tx: Transaction) {
563 self.trust_model.set_funding_tx(funding_tx);
564 }
565
566 fn set_funding_tx_broadcast_safe(&mut self, funding_tx_broadcast_safe: bool) {
567 self.trust_model.set_funding_tx_broadcast_safe(funding_tx_broadcast_safe);
568 }
569
570 fn should_broadcast_funding_transaction(&self) -> bool {
571 self.trust_model.should_manually_broadcast(matches!(
572 self.state,
573 OutboundJITChannelState::PaymentForwarded { .. }
574 ))
575 }
576
577 fn get_channel_id(&self) -> Option<ChannelId> {
578 match self.state {
579 OutboundJITChannelState::PaymentForwarded { channel_id } => Some(channel_id),
580 _ => None,
581 }
582 }
583
584 fn get_funding_tx(&self) -> Option<&Transaction> {
585 self.trust_model.get_funding_tx()
586 }
587
588 fn is_client_trusts_lsp(&self) -> bool {
589 self.trust_model.is_client_trusts_lsp()
590 }
591}
592
/// Everything the LSPS2 service tracks for a single counterparty.
pub(crate) struct PeerState {
	/// The peer's JIT channels, keyed by intercept SCID.
	outbound_channels_by_intercept_scid: HashMap<u64, OutboundJITChannel>,
	/// Lookup from user channel id to intercept SCID.
	intercept_scid_by_user_channel_id: HashMap<u128, u64>,
	/// Lookup from channel id to intercept SCID.
	intercept_scid_by_channel_id: HashMap<ChannelId, u64>,
	/// Requests awaiting a response; not serialized (reset to an empty map on read).
	pending_requests: HashMap<LSPSRequestId, LSPS2Request>,
	/// Set whenever the serialized part of the state changed; not serialized itself.
	needs_persist: bool,
}
600
601impl PeerState {
602 fn new() -> Self {
603 let outbound_channels_by_intercept_scid = new_hash_map();
604 let pending_requests = new_hash_map();
605 let intercept_scid_by_user_channel_id = new_hash_map();
606 let intercept_scid_by_channel_id = new_hash_map();
607 let needs_persist = false;
608 Self {
609 outbound_channels_by_intercept_scid,
610 pending_requests,
611 intercept_scid_by_user_channel_id,
612 intercept_scid_by_channel_id,
613 needs_persist,
614 }
615 }
616
617 fn insert_outbound_channel(&mut self, intercept_scid: u64, channel: OutboundJITChannel) {
618 self.outbound_channels_by_intercept_scid.insert(intercept_scid, channel);
619 self.needs_persist |= true;
620 }
621
622 fn prune_expired_request_state(&mut self) {
623 self.pending_requests.retain(|_, entry| {
624 match entry {
625 LSPS2Request::GetInfo(_) => false,
626 LSPS2Request::Buy(request) => {
627 !is_expired_opening_fee_params(&request.opening_fee_params)
629 },
630 }
631 });
632
633 self.outbound_channels_by_intercept_scid.retain(|intercept_scid, entry| {
634 if entry.is_prunable() {
635 self.intercept_scid_by_channel_id.retain(|_, iscid| intercept_scid != iscid);
637 self.intercept_scid_by_user_channel_id.retain(|_, iscid| intercept_scid != iscid);
638 self.needs_persist |= true;
639 return false;
640 }
641 true
642 });
643 }
644
645 fn pending_requests_and_channels(&self) -> usize {
646 let pending_requests = self.pending_requests.len();
647 let pending_outbound_channels = self
648 .outbound_channels_by_intercept_scid
649 .iter()
650 .filter(|(_, v)| v.is_pending_initial_payment())
651 .count();
652 pending_requests + pending_outbound_channels
653 }
654
655 fn is_prunable(&self) -> bool {
656 self.pending_requests.is_empty() && self.outbound_channels_by_intercept_scid.is_empty()
658 }
659}
660
// TLV-based serialization of `PeerState`. Only the three maps are written; `pending_requests` and
// `needs_persist` are initialized to an empty map and `false` respectively.
impl_writeable_tlv_based!(PeerState, {
	(0, outbound_channels_by_intercept_scid, required),
	(2, intercept_scid_by_user_channel_id, required),
	(4, intercept_scid_by_channel_id, required),
	(_unused, pending_requests, (static_value, new_hash_map())),
	(_unused, needs_persist, (static_value, false)),
});
668
// Looks up the `Mutex<PeerState>` for `$counterparty_node_id` in `$outer_state_lock`, inserting a
// fresh one if the peer is not yet known.
//
// If the peer is unknown and the map already holds `MAX_TOTAL_PEERS` entries, an
// `LSPSMessage::Invalid` internal-error response is enqueued via `$message_queue_notifier` and
// the macro `return`s a `LightningError` from the *calling* function, which therefore has to
// return `Result<_, LightningError>`.
macro_rules! get_or_insert_peer_state_entry {
	($self: ident, $outer_state_lock: expr, $message_queue_notifier: expr, $counterparty_node_id: expr) => {{
		// Evaluated up-front because `entry()` below borrows the map mutably.
		let is_limited_by_max_total_peers = $outer_state_lock.len() >= MAX_TOTAL_PEERS;
		match $outer_state_lock.entry(*$counterparty_node_id) {
			Entry::Vacant(e) => {
				if is_limited_by_max_total_peers {
					let error_response = LSPSResponseError {
						code: JSONRPC_INTERNAL_ERROR_ERROR_CODE,
						message: JSONRPC_INTERNAL_ERROR_ERROR_MESSAGE.to_string(), data: None,
					};

					let msg = LSPSMessage::Invalid(error_response);
					$message_queue_notifier.enqueue($counterparty_node_id, msg);

					let err = format!(
						"Dropping request from peer {} due to reaching maximally allowed number of total peers: {}",
						$counterparty_node_id, MAX_TOTAL_PEERS
					);

					return Err(LightningError { err, action: ErrorAction::IgnoreAndLog(Level::Error) });
				} else {
					e.insert(Mutex::new(PeerState::new()))
				}
			}
			Entry::Occupied(e) => {
				// Known peers are always served, even when the peer limit is reached.
				e.into_mut()
			}
		}

	}}
}
701
/// Service-side handler for LSPS2 requests and JIT channel bookkeeping.
pub struct LSPS2ServiceHandler<CM: Deref, K: Deref + Clone, T: Deref>
where
	CM::Target: AChannelManager,
	K::Target: KVStore,
	T::Target: BroadcasterInterface,
{
	/// Used to forward or fail intercepted HTLCs.
	channel_manager: CM,
	// NOTE(review): not used in the visible part of this file; presumably the backing store for
	// persisted peer state — confirm against `persist_peer_state`.
	kv_store: K,
	// NOTE(review): not used in the visible part of this file; presumably used to broadcast
	// funding transactions manually — confirm.
	tx_broadcaster: T,
	/// Queue of outgoing LSPS messages; responses are enqueued through its notifier.
	pending_messages: Arc<MessageQueue>,
	/// Queue of events for the user, e.g. `LSPS2ServiceEvent::OpenChannel`.
	pending_events: Arc<EventQueue<K>>,
	/// State of every known counterparty.
	per_peer_state: RwLock<HashMap<PublicKey, Mutex<PeerState>>>,
	/// Lookup from intercept SCID to the owning counterparty.
	peer_by_intercept_scid: RwLock<HashMap<u64, PublicKey>>,
	/// Lookup from channel id to the owning counterparty.
	peer_by_channel_id: RwLock<HashMap<ChannelId, PublicKey>>,
	// Starts at zero in `new`. NOTE(review): updates happen outside the visible part of this file.
	total_pending_requests: AtomicUsize,
	/// Service configuration, exposed through `config()`.
	config: LSPS2ServiceConfig,
	// Starts at zero in `new`. NOTE(review): updates happen outside the visible part of this file.
	persistence_in_flight: AtomicUsize,
}
721
722impl<CM: Deref, K: Deref + Clone, T: Deref + Clone> LSPS2ServiceHandler<CM, K, T>
723where
724 CM::Target: AChannelManager,
725 K::Target: KVStore,
726 T::Target: BroadcasterInterface,
727{
728 pub(crate) fn new(
730 per_peer_state: HashMap<PublicKey, Mutex<PeerState>>, pending_messages: Arc<MessageQueue>,
731 pending_events: Arc<EventQueue<K>>, channel_manager: CM, kv_store: K, tx_broadcaster: T,
732 config: LSPS2ServiceConfig,
733 ) -> Result<Self, lightning::io::Error> {
734 let mut peer_by_intercept_scid = new_hash_map();
735 let mut peer_by_channel_id = new_hash_map();
736 for (node_id, peer_state) in per_peer_state.iter() {
737 let peer_state_lock = peer_state.lock().unwrap();
738 for (intercept_scid, _) in peer_state_lock.outbound_channels_by_intercept_scid.iter() {
739 let res = peer_by_intercept_scid.insert(*intercept_scid, *node_id);
740 debug_assert!(res.is_none(), "Intercept SCIDs should never collide");
741 if res.is_some() {
742 return Err(lightning::io::Error::new(
743 lightning::io::ErrorKind::InvalidData,
744 "Failed to read LSPS2 peer state due to data inconsistencies: Intercept SCIDs should never collide",
745 ));
746 }
747 }
748
749 for (channel_id, _) in peer_state_lock.intercept_scid_by_channel_id.iter() {
750 let res = peer_by_channel_id.insert(*channel_id, *node_id);
751 debug_assert!(res.is_none(), "Channel IDs should never collide");
752 if res.is_some() {
753 return Err(lightning::io::Error::new(
754 lightning::io::ErrorKind::InvalidData,
755 "Failed to read LSPS2 peer state due to data inconsistencies: Channel IDs should never collide",
756 ));
757 }
758 }
759 }
760
761 Ok(Self {
762 pending_messages,
763 pending_events,
764 per_peer_state: RwLock::new(per_peer_state),
765 peer_by_intercept_scid: RwLock::new(peer_by_intercept_scid),
766 peer_by_channel_id: RwLock::new(peer_by_channel_id),
767 total_pending_requests: AtomicUsize::new(0),
768 persistence_in_flight: AtomicUsize::new(0),
769 channel_manager,
770 kv_store,
771 tx_broadcaster,
772 config,
773 })
774 }
775
	/// Returns the configuration this handler was created with.
	pub fn config(&self) -> &LSPS2ServiceConfig {
		&self.config
	}
780
781 pub(crate) fn has_active_requests(&self, counterparty_node_id: &PublicKey) -> bool {
783 let outer_state_lock = self.per_peer_state.read().unwrap();
784 outer_state_lock.get(counterparty_node_id).map_or(false, |inner| {
785 let peer_state = inner.lock().unwrap();
786 !peer_state.outbound_channels_by_intercept_scid.is_empty()
787 })
788 }
789
790 pub fn invalid_token_provided(
796 &self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
797 ) -> Result<(), APIError> {
798 let mut message_queue_notifier = self.pending_messages.notifier();
799
800 let outer_state_lock = self.per_peer_state.read().unwrap();
801
802 match outer_state_lock.get(counterparty_node_id) {
803 Some(inner_state_lock) => {
804 let mut peer_state_lock = inner_state_lock.lock().unwrap();
805
806 match self.remove_pending_request(&mut peer_state_lock, &request_id) {
807 Some(LSPS2Request::GetInfo(_)) => {
808 let response = LSPS2Response::GetInfoError(LSPSResponseError {
809 code: LSPS2_GET_INFO_REQUEST_UNRECOGNIZED_OR_STALE_TOKEN_ERROR_CODE,
810 message: "an unrecognized or stale token was provided".to_string(),
811 data: None,
812 });
813 let msg = LSPS2Message::Response(request_id, response).into();
814 message_queue_notifier.enqueue(counterparty_node_id, msg);
815 Ok(())
816 },
817 _ => Err(APIError::APIMisuseError {
818 err: format!(
819 "No pending get_info request for request_id: {:?}",
820 request_id
821 ),
822 }),
823 }
824 },
825 None => Err(APIError::APIMisuseError {
826 err: format!("No state for the counterparty exists: {}", counterparty_node_id),
827 }),
828 }
829 }
830
831 pub fn opening_fee_params_generated(
837 &self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
838 opening_fee_params_menu: Vec<LSPS2RawOpeningFeeParams>,
839 ) -> Result<(), APIError> {
840 let mut message_queue_notifier = self.pending_messages.notifier();
841
842 let outer_state_lock = self.per_peer_state.read().unwrap();
843
844 match outer_state_lock.get(counterparty_node_id) {
845 Some(inner_state_lock) => {
846 let mut peer_state_lock = inner_state_lock.lock().unwrap();
847
848 match self.remove_pending_request(&mut peer_state_lock, &request_id) {
849 Some(LSPS2Request::GetInfo(_)) => {
850 let mut opening_fee_params_menu: Vec<LSPS2OpeningFeeParams> =
851 opening_fee_params_menu
852 .into_iter()
853 .map(|param| {
854 param.into_opening_fee_params(
855 &self.config.promise_secret,
856 counterparty_node_id,
857 )
858 })
859 .collect();
860 opening_fee_params_menu.sort_by(|a, b| {
861 match a.min_fee_msat.cmp(&b.min_fee_msat) {
862 CmpOrdering::Equal => a.proportional.cmp(&b.proportional),
863 other => other,
864 }
865 });
866 let response = LSPS2Response::GetInfo(LSPS2GetInfoResponse {
867 opening_fee_params_menu,
868 });
869 let msg = LSPS2Message::Response(request_id, response).into();
870 message_queue_notifier.enqueue(counterparty_node_id, msg);
871 Ok(())
872 },
873 _ => Err(APIError::APIMisuseError {
874 err: format!(
875 "No pending get_info request for request_id: {:?}",
876 request_id
877 ),
878 }),
879 }
880 },
881 None => Err(APIError::APIMisuseError {
882 err: format!("No state for the counterparty exists: {}", counterparty_node_id),
883 }),
884 }
885 }
886
887 pub async fn invoice_parameters_generated(
908 &self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId, intercept_scid: u64,
909 cltv_expiry_delta: u32, client_trusts_lsp: bool, user_channel_id: u128,
910 ) -> Result<(), APIError> {
911 let mut message_queue_notifier = self.pending_messages.notifier();
912 let mut should_persist = false;
913
914 match self.per_peer_state.read().unwrap().get(counterparty_node_id) {
915 Some(inner_state_lock) => {
916 let mut peer_state_lock = inner_state_lock.lock().unwrap();
917
918 match self.remove_pending_request(&mut peer_state_lock, &request_id) {
919 Some(LSPS2Request::Buy(buy_request)) => {
920 {
921 let mut peer_by_intercept_scid =
922 self.peer_by_intercept_scid.write().unwrap();
923 peer_by_intercept_scid.insert(intercept_scid, *counterparty_node_id);
924 }
925
926 let outbound_jit_channel = OutboundJITChannel::new(
927 buy_request.payment_size_msat,
928 buy_request.opening_fee_params,
929 user_channel_id,
930 client_trusts_lsp,
931 );
932
933 peer_state_lock
934 .intercept_scid_by_user_channel_id
935 .insert(user_channel_id, intercept_scid);
936 peer_state_lock
937 .insert_outbound_channel(intercept_scid, outbound_jit_channel);
938 should_persist |= peer_state_lock.needs_persist;
939
940 let response = LSPS2Response::Buy(LSPS2BuyResponse {
941 jit_channel_scid: intercept_scid.into(),
942 lsp_cltv_expiry_delta: cltv_expiry_delta,
943 client_trusts_lsp,
944 });
945 let msg = LSPS2Message::Response(request_id, response).into();
946 message_queue_notifier.enqueue(counterparty_node_id, msg);
947 },
948 _ => {
949 return Err(APIError::APIMisuseError {
950 err: format!("No pending buy request for request_id: {:?}", request_id),
951 })
952 },
953 }
954 },
955 None => {
956 return Err(APIError::APIMisuseError {
957 err: format!("No state for the counterparty exists: {}", counterparty_node_id),
958 })
959 },
960 };
961
962 if should_persist {
963 self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
964 APIError::APIMisuseError {
965 err: format!(
966 "Failed to persist peer state for {}: {}",
967 counterparty_node_id, e
968 ),
969 }
970 })?;
971 }
972
973 Ok(())
974 }
975
976 pub async fn htlc_intercepted(
989 &self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64,
990 payment_hash: PaymentHash,
991 ) -> Result<(), APIError> {
992 let event_queue_notifier = self.pending_events.notifier();
993 let mut should_persist = None;
994
995 if let Some(counterparty_node_id) =
996 self.peer_by_intercept_scid.read().unwrap().get(&intercept_scid)
997 {
998 let outer_state_lock = self.per_peer_state.read().unwrap();
999 match outer_state_lock.get(counterparty_node_id) {
1000 Some(inner_state_lock) => {
1001 let mut peer_state = inner_state_lock.lock().unwrap();
1002 if let Some(jit_channel) =
1003 peer_state.outbound_channels_by_intercept_scid.get_mut(&intercept_scid)
1004 {
1005 should_persist = Some(*counterparty_node_id);
1006 let htlc = InterceptedHTLC {
1007 intercept_id,
1008 expected_outbound_amount_msat,
1009 payment_hash,
1010 };
1011 match jit_channel.htlc_intercepted(htlc) {
1012 Ok(Some(HTLCInterceptedAction::OpenChannel(open_channel_params))) => {
1013 let event = LSPS2ServiceEvent::OpenChannel {
1014 their_network_key: counterparty_node_id.clone(),
1015 amt_to_forward_msat: open_channel_params.amt_to_forward_msat,
1016 opening_fee_msat: open_channel_params.opening_fee_msat,
1017 user_channel_id: jit_channel.user_channel_id,
1018 intercept_scid,
1019 };
1020 event_queue_notifier.enqueue(event);
1021 },
1022 Ok(Some(HTLCInterceptedAction::ForwardHTLC(channel_id))) => {
1023 self.channel_manager.get_cm().forward_intercepted_htlc(
1024 intercept_id,
1025 &channel_id,
1026 *counterparty_node_id,
1027 expected_outbound_amount_msat,
1028 )?;
1029 },
1030 Ok(Some(HTLCInterceptedAction::ForwardPayment(
1031 channel_id,
1032 FeePayment { opening_fee_msat, htlcs },
1033 ))) => {
1034 let amounts_to_forward_msat =
1035 calculate_amount_to_forward_per_htlc(&htlcs, opening_fee_msat);
1036
1037 for (intercept_id, amount_to_forward_msat) in
1038 amounts_to_forward_msat
1039 {
1040 self.channel_manager.get_cm().forward_intercepted_htlc(
1041 intercept_id,
1042 &channel_id,
1043 *counterparty_node_id,
1044 amount_to_forward_msat,
1045 )?;
1046 }
1047 },
1048 Ok(None) => {},
1049 Err(e) => {
1050 self.channel_manager
1051 .get_cm()
1052 .fail_intercepted_htlc(intercept_id)?;
1053 peer_state
1054 .outbound_channels_by_intercept_scid
1055 .remove(&intercept_scid);
1056 return Err(APIError::APIMisuseError { err: e.err });
1058 },
1059 }
1060 }
1061
1062 peer_state.needs_persist |= should_persist.is_some();
1063 },
1064 None => {
1065 return Err(APIError::APIMisuseError {
1066 err: format!("No counterparty found for scid: {}", intercept_scid),
1067 });
1068 },
1069 }
1070 }
1071
1072 if let Some(counterparty_node_id) = should_persist {
1073 self.persist_peer_state(counterparty_node_id).await.map_err(|e| {
1074 APIError::APIMisuseError {
1075 err: format!(
1076 "Failed to persist peer state for {}: {}",
1077 counterparty_node_id, e
1078 ),
1079 }
1080 })?;
1081 }
1082
1083 Ok(())
1084 }
1085
	/// Handles a failed HTLC forward.
	///
	/// Only `HTLCHandlingFailureType::Forward` failures on a channel known to this handler are
	/// considered; everything else is ignored. The failure is passed to the JIT channel's state
	/// machine, and if that yields another queued payment, its HTLCs are forwarded over the same
	/// channel. Afterwards the peer state is persisted.
	///
	/// # Errors
	///
	/// Returns an `APIError` if the state machine rejects the failure in its current state, if
	/// the channel manager fails to forward an HTLC, or if persisting the peer state fails.
	pub async fn htlc_handling_failed(
		&self, failure_type: HTLCHandlingFailureType,
	) -> Result<(), APIError> {
		let mut should_persist = None;
		if let HTLCHandlingFailureType::Forward { channel_id, .. } = failure_type {
			let peer_by_channel_id = self.peer_by_channel_id.read().unwrap();
			if let Some(counterparty_node_id) = peer_by_channel_id.get(&channel_id) {
				let outer_state_lock = self.per_peer_state.read().unwrap();
				match outer_state_lock.get(counterparty_node_id) {
					Some(inner_state_lock) => {
						let mut peer_state = inner_state_lock.lock().unwrap();
						// Copy the SCID out so the map borrow ends before the channel map is
						// borrowed mutably below.
						if let Some(intercept_scid) =
							peer_state.intercept_scid_by_channel_id.get(&channel_id).copied()
						{
							// Persistence is requested as soon as the channel id resolves to an
							// intercept SCID, even if no JIT channel is found for it below.
							should_persist = Some(*counterparty_node_id);

							if let Some(jit_channel) = peer_state
								.outbound_channels_by_intercept_scid
								.get_mut(&intercept_scid)
							{
								match jit_channel.htlc_handling_failed() {
									Ok(Some(ForwardPaymentAction(
										channel_id,
										FeePayment { opening_fee_msat, htlcs },
									))) => {
										let amounts_to_forward_msat =
											calculate_amount_to_forward_per_htlc(
												&htlcs,
												opening_fee_msat,
											);

										for (intercept_id, amount_to_forward_msat) in
											amounts_to_forward_msat
										{
											// NOTE(review): an error here returns early, before
											// `needs_persist` is set further down.
											self.channel_manager
												.get_cm()
												.forward_intercepted_htlc(
													intercept_id,
													&channel_id,
													*counterparty_node_id,
													amount_to_forward_msat,
												)?;
										}
									},
									Ok(None) => {},
									Err(e) => {
										return Err(APIError::APIMisuseError {
											err: format!("Unable to fail HTLC: {}.", e.err),
										});
									},
								}
							}
						}
						peer_state.needs_persist |= should_persist.is_some();
					},
					None => {},
				}
			}
		}

		// All lock guards were released at the end of the block above, before awaiting.
		if let Some(counterparty_node_id) = should_persist {
			self.persist_peer_state(counterparty_node_id).await.map_err(|e| {
				APIError::APIMisuseError {
					err: format!(
						"Failed to persist peer state for {}: {}",
						counterparty_node_id, e
					),
				}
			})?;
		}

		Ok(())
	}
1166
1167 pub async fn payment_forwarded(
1182 &self, next_channel_id: ChannelId, skimmed_fee_msat: u64,
1183 ) -> Result<(), APIError> {
1184 let mut should_persist = None;
1185 if let Some(counterparty_node_id) =
1186 self.peer_by_channel_id.read().unwrap().get(&next_channel_id)
1187 {
1188 let outer_state_lock = self.per_peer_state.read().unwrap();
1189 match outer_state_lock.get(counterparty_node_id) {
1190 Some(inner_state_lock) => {
1191 let mut peer_state = inner_state_lock.lock().unwrap();
1192 if let Some(intercept_scid) =
1193 peer_state.intercept_scid_by_channel_id.get(&next_channel_id).copied()
1194 {
1195 should_persist = Some(*counterparty_node_id);
1196
1197 if let Some(jit_channel) =
1198 peer_state.outbound_channels_by_intercept_scid.get_mut(&intercept_scid)
1199 {
1200 match jit_channel.payment_forwarded(skimmed_fee_msat) {
1201 Ok(Some(ForwardHTLCsAction(channel_id, htlcs))) => {
1202 for htlc in htlcs {
1203 self.channel_manager.get_cm().forward_intercepted_htlc(
1204 htlc.intercept_id,
1205 &channel_id,
1206 *counterparty_node_id,
1207 htlc.expected_outbound_amount_msat,
1208 )?;
1209 }
1210 },
1211 Ok(None) => {},
1212 Err(e) => {
1213 return Err(APIError::APIMisuseError {
1214 err: format!(
1215 "Forwarded payment was not applicable for JIT channel: {}",
1216 e.err
1217 ),
1218 })
1219 },
1220 }
1221
1222 self.broadcast_funding_transaction_if_applies(jit_channel);
1223 }
1224 } else {
1225 return Err(APIError::APIMisuseError {
1226 err: format!("No state for for channel id: {}", next_channel_id),
1227 });
1228 }
1229 peer_state.needs_persist |= should_persist.is_some();
1230 },
1231 None => {
1232 return Err(APIError::APIMisuseError {
1233 err: format!("No counterparty state for: {}", counterparty_node_id),
1234 });
1235 },
1236 }
1237 }
1238
1239 if let Some(counterparty_node_id) = should_persist {
1240 self.persist_peer_state(counterparty_node_id).await.map_err(|e| {
1241 APIError::APIMisuseError {
1242 err: format!(
1243 "Failed to persist peer state for {}: {}",
1244 counterparty_node_id, e
1245 ),
1246 }
1247 })?;
1248 }
1249
1250 Ok(())
1251 }
1252
1253 pub async fn channel_open_abandoned(
1269 &self, counterparty_node_id: &PublicKey, user_channel_id: u128,
1270 ) -> Result<(), APIError> {
1271 {
1272 let outer_state_lock = self.per_peer_state.read().unwrap();
1273 let inner_state_lock = outer_state_lock.get(counterparty_node_id).ok_or_else(|| {
1274 APIError::APIMisuseError {
1275 err: format!("No counterparty state for: {}", counterparty_node_id),
1276 }
1277 })?;
1278 let mut peer_state = inner_state_lock.lock().unwrap();
1279
1280 let intercept_scid = peer_state
1281 .intercept_scid_by_user_channel_id
1282 .get(&user_channel_id)
1283 .copied()
1284 .ok_or_else(|| APIError::APIMisuseError {
1285 err: format!(
1286 "Could not find a channel with user_channel_id {}",
1287 user_channel_id
1288 ),
1289 })?;
1290
1291 let jit_channel = peer_state
1292 .outbound_channels_by_intercept_scid
1293 .get(&intercept_scid)
1294 .ok_or_else(|| APIError::APIMisuseError {
1295 err: format!(
1296 "Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1297 intercept_scid, user_channel_id,
1298 ),
1299 })?;
1300
1301 let is_pending = matches!(
1302 jit_channel.state,
1303 OutboundJITChannelState::PendingInitialPayment { .. }
1304 | OutboundJITChannelState::PendingChannelOpen { .. }
1305 );
1306
1307 if !is_pending {
1308 return Err(APIError::APIMisuseError {
1309 err: "Cannot abandon channel open after channel creation or payment forwarding"
1310 .to_string(),
1311 });
1312 }
1313
1314 peer_state.intercept_scid_by_user_channel_id.remove(&user_channel_id);
1315 peer_state.outbound_channels_by_intercept_scid.remove(&intercept_scid);
1316 peer_state.intercept_scid_by_channel_id.retain(|_, &mut scid| scid != intercept_scid);
1317 peer_state.needs_persist |= true;
1318 }
1319
1320 self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
1321 APIError::APIMisuseError {
1322 err: format!("Failed to persist peer state for {}: {}", counterparty_node_id, e),
1323 }
1324 })?;
1325
1326 Ok(())
1327 }
1328
1329 pub async fn channel_open_failed(
1337 &self, counterparty_node_id: &PublicKey, user_channel_id: u128,
1338 ) -> Result<(), APIError> {
1339 {
1340 let outer_state_lock = self.per_peer_state.read().unwrap();
1341
1342 let inner_state_lock = outer_state_lock.get(counterparty_node_id).ok_or_else(|| {
1343 APIError::APIMisuseError {
1344 err: format!("No counterparty state for: {}", counterparty_node_id),
1345 }
1346 })?;
1347
1348 let mut peer_state = inner_state_lock.lock().unwrap();
1349
1350 let intercept_scid = peer_state
1351 .intercept_scid_by_user_channel_id
1352 .get(&user_channel_id)
1353 .copied()
1354 .ok_or_else(|| APIError::APIMisuseError {
1355 err: format!(
1356 "Could not find a channel with user_channel_id {}",
1357 user_channel_id
1358 ),
1359 })?;
1360
1361 let jit_channel = peer_state
1362 .outbound_channels_by_intercept_scid
1363 .get_mut(&intercept_scid)
1364 .ok_or_else(|| APIError::APIMisuseError {
1365 err: format!(
1366 "Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1367 intercept_scid, user_channel_id,
1368 ),
1369 })?;
1370
1371 if let OutboundJITChannelState::PendingChannelOpen { payment_queue, .. } =
1372 &mut jit_channel.state
1373 {
1374 let intercepted_htlcs = payment_queue.clear();
1375 for htlc in intercepted_htlcs {
1376 self.channel_manager.get_cm().fail_htlc_backwards_with_reason(
1377 &htlc.payment_hash,
1378 FailureCode::TemporaryNodeFailure,
1379 );
1380 }
1381
1382 jit_channel.state = OutboundJITChannelState::PendingInitialPayment {
1383 payment_queue: PaymentQueue::new(),
1384 };
1385 } else {
1386 return Err(APIError::APIMisuseError {
1387 err: "Channel is not in the PendingChannelOpen state.".to_string(),
1388 });
1389 }
1390
1391 peer_state.needs_persist |= true;
1392 }
1393 self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
1394 APIError::APIMisuseError {
1395 err: format!("Failed to persist peer state for {}: {}", counterparty_node_id, e),
1396 }
1397 })?;
1398
1399 Ok(())
1400 }
1401
1402 pub async fn channel_ready(
1409 &self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey,
1410 ) -> Result<(), APIError> {
1411 let mut should_persist = false;
1412 {
1413 let mut peer_by_channel_id = self.peer_by_channel_id.write().unwrap();
1414 peer_by_channel_id.insert(*channel_id, *counterparty_node_id);
1415 }
1416 match self.per_peer_state.read().unwrap().get(counterparty_node_id) {
1417 Some(inner_state_lock) => {
1418 let mut peer_state = inner_state_lock.lock().unwrap();
1419 if let Some(intercept_scid) =
1420 peer_state.intercept_scid_by_user_channel_id.get(&user_channel_id).copied()
1421 {
1422 should_persist |= true;
1423 peer_state.intercept_scid_by_channel_id.insert(*channel_id, intercept_scid);
1424 if let Some(jit_channel) =
1425 peer_state.outbound_channels_by_intercept_scid.get_mut(&intercept_scid)
1426 {
1427 match jit_channel.channel_ready(*channel_id) {
1428 Ok(ForwardPaymentAction(
1429 channel_id,
1430 FeePayment { opening_fee_msat, htlcs },
1431 )) => {
1432 let amounts_to_forward_msat =
1433 calculate_amount_to_forward_per_htlc(&htlcs, opening_fee_msat);
1434
1435 for (intercept_id, amount_to_forward_msat) in
1436 amounts_to_forward_msat
1437 {
1438 self.channel_manager.get_cm().forward_intercepted_htlc(
1439 intercept_id,
1440 &channel_id,
1441 *counterparty_node_id,
1442 amount_to_forward_msat,
1443 )?;
1444 }
1445 },
1446 Err(e) => {
1447 return Err(APIError::APIMisuseError {
1448 err: format!(
1449 "Failed to transition to channel ready: {}",
1450 e.err
1451 ),
1452 })
1453 },
1454 }
1455 } else {
1456 return Err(APIError::APIMisuseError {
1457 err: format!(
1458 "Could not find a channel with user_channel_id {}",
1459 user_channel_id
1460 ),
1461 });
1462 }
1463 } else {
1464 return Err(APIError::APIMisuseError {
1465 err: format!(
1466 "Could not find a channel with that user_channel_id {}",
1467 user_channel_id
1468 ),
1469 });
1470 }
1471 peer_state.needs_persist |= should_persist;
1472 },
1473 None => {
1474 return Err(APIError::APIMisuseError {
1475 err: format!("No counterparty state for: {}", counterparty_node_id),
1476 });
1477 },
1478 }
1479
1480 if should_persist {
1481 self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
1482 APIError::APIMisuseError {
1483 err: format!(
1484 "Failed to persist peer state for {}: {}",
1485 counterparty_node_id, e
1486 ),
1487 }
1488 })?;
1489 }
1490
1491 Ok(())
1492 }
1493
1494 fn handle_get_info_request(
1495 &self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
1496 params: LSPS2GetInfoRequest,
1497 ) -> Result<(), LightningError> {
1498 let mut message_queue_notifier = self.pending_messages.notifier();
1499 let event_queue_notifier = self.pending_events.notifier();
1500
1501 let mut outer_state_lock = self.per_peer_state.write().unwrap();
1502 let inner_state_lock = get_or_insert_peer_state_entry!(
1503 self,
1504 outer_state_lock,
1505 message_queue_notifier,
1506 counterparty_node_id
1507 );
1508 let mut peer_state_lock = inner_state_lock.lock().unwrap();
1509 let request = LSPS2Request::GetInfo(params.clone());
1510 self.insert_pending_request(
1511 &mut peer_state_lock,
1512 &mut message_queue_notifier,
1513 request_id.clone(),
1514 *counterparty_node_id,
1515 request,
1516 )?;
1517
1518 let event = LSPS2ServiceEvent::GetInfo {
1519 request_id,
1520 counterparty_node_id: *counterparty_node_id,
1521 token: params.token,
1522 };
1523 event_queue_notifier.enqueue(event);
1524
1525 Ok(())
1526 }
1527
1528 fn handle_buy_request(
1529 &self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey, params: LSPS2BuyRequest,
1530 ) -> Result<(), LightningError> {
1531 let mut message_queue_notifier = self.pending_messages.notifier();
1532 let event_queue_notifier = self.pending_events.notifier();
1533 if let Some(payment_size_msat) = params.payment_size_msat {
1534 if payment_size_msat < params.opening_fee_params.min_payment_size_msat {
1535 let response = LSPS2Response::BuyError(LSPSResponseError {
1536 code: LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_SMALL_ERROR_CODE,
1537 message: "payment size is below our minimum supported payment size".to_string(),
1538 data: None,
1539 });
1540 let msg = LSPS2Message::Response(request_id, response).into();
1541 message_queue_notifier.enqueue(counterparty_node_id, msg);
1542
1543 return Err(LightningError {
1544 err: "payment size is below our minimum supported payment size".to_string(),
1545 action: ErrorAction::IgnoreAndLog(Level::Info),
1546 });
1547 }
1548
1549 if payment_size_msat > params.opening_fee_params.max_payment_size_msat {
1550 let response = LSPS2Response::BuyError(LSPSResponseError {
1551 code: LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_LARGE_ERROR_CODE,
1552 message: "payment size is above our maximum supported payment size".to_string(),
1553 data: None,
1554 });
1555 let msg = LSPS2Message::Response(request_id, response).into();
1556 message_queue_notifier.enqueue(counterparty_node_id, msg);
1557 return Err(LightningError {
1558 err: "payment size is above our maximum supported payment size".to_string(),
1559 action: ErrorAction::IgnoreAndLog(Level::Info),
1560 });
1561 }
1562
1563 match compute_opening_fee(
1564 payment_size_msat,
1565 params.opening_fee_params.min_fee_msat,
1566 params.opening_fee_params.proportional.into(),
1567 ) {
1568 Some(opening_fee) => {
1569 if opening_fee >= payment_size_msat {
1570 let response = LSPS2Response::BuyError(LSPSResponseError {
1571 code: LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_SMALL_ERROR_CODE,
1572 message: "payment size is too small to cover the opening fee"
1573 .to_string(),
1574 data: None,
1575 });
1576 let msg = LSPS2Message::Response(request_id, response).into();
1577 message_queue_notifier.enqueue(counterparty_node_id, msg);
1578 return Err(LightningError {
1579 err: "payment size is too small to cover the opening fee".to_string(),
1580 action: ErrorAction::IgnoreAndLog(Level::Info),
1581 });
1582 }
1583 },
1584 None => {
1585 let response = LSPS2Response::BuyError(LSPSResponseError {
1586 code: LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_LARGE_ERROR_CODE,
1587 message: "overflow error when calculating opening_fee".to_string(),
1588 data: None,
1589 });
1590 let msg = LSPS2Message::Response(request_id, response).into();
1591 message_queue_notifier.enqueue(counterparty_node_id, msg);
1592 return Err(LightningError {
1593 err: "overflow error when calculating opening_fee".to_string(),
1594 action: ErrorAction::IgnoreAndLog(Level::Info),
1595 });
1596 },
1597 }
1598 }
1599
1600 if !is_valid_opening_fee_params(
1602 ¶ms.opening_fee_params,
1603 &self.config.promise_secret,
1604 counterparty_node_id,
1605 ) {
1606 let response = LSPS2Response::BuyError(LSPSResponseError {
1607 code: LSPS2_BUY_REQUEST_INVALID_OPENING_FEE_PARAMS_ERROR_CODE,
1608 message: "valid_until is already past OR the promise did not match the provided parameters".to_string(),
1609 data: None,
1610 });
1611 let msg = LSPS2Message::Response(request_id, response).into();
1612 message_queue_notifier.enqueue(counterparty_node_id, msg);
1613 return Err(LightningError {
1614 err: "invalid opening fee parameters were supplied by client".to_string(),
1615 action: ErrorAction::IgnoreAndLog(Level::Info),
1616 });
1617 }
1618
1619 let mut outer_state_lock = self.per_peer_state.write().unwrap();
1620 let inner_state_lock = get_or_insert_peer_state_entry!(
1621 self,
1622 outer_state_lock,
1623 message_queue_notifier,
1624 counterparty_node_id
1625 );
1626 let mut peer_state_lock = inner_state_lock.lock().unwrap();
1627
1628 let request = LSPS2Request::Buy(params.clone());
1629
1630 self.insert_pending_request(
1631 &mut peer_state_lock,
1632 &mut message_queue_notifier,
1633 request_id.clone(),
1634 *counterparty_node_id,
1635 request,
1636 )?;
1637
1638 let event = LSPS2ServiceEvent::BuyRequest {
1639 request_id,
1640 counterparty_node_id: *counterparty_node_id,
1641 opening_fee_params: params.opening_fee_params,
1642 payment_size_msat: params.payment_size_msat,
1643 };
1644 event_queue_notifier.enqueue(event);
1645
1646 Ok(())
1647 }
1648
1649 fn insert_pending_request<'a>(
1650 &self, peer_state_lock: &mut MutexGuard<'a, PeerState>,
1651 message_queue_notifier: &mut MessageQueueNotifierGuard, request_id: LSPSRequestId,
1652 counterparty_node_id: PublicKey, request: LSPS2Request,
1653 ) -> Result<(), LightningError> {
1654 let create_pending_request_limit_exceeded_response =
1655 |message_queue_notifier: &mut MessageQueueNotifierGuard, error_message: String| {
1656 let error_details = LSPSResponseError {
1657 code: LSPS0_CLIENT_REJECTED_ERROR_CODE,
1658 message: "Reached maximum number of pending requests. Please try again later."
1659 .to_string(),
1660 data: None,
1661 };
1662 let response = match &request {
1663 LSPS2Request::GetInfo(_) => LSPS2Response::GetInfoError(error_details),
1664 LSPS2Request::Buy(_) => LSPS2Response::BuyError(error_details),
1665 };
1666 let msg = LSPS2Message::Response(request_id.clone(), response).into();
1667 message_queue_notifier.enqueue(&counterparty_node_id, msg);
1668
1669 Err(LightningError {
1670 err: error_message,
1671 action: ErrorAction::IgnoreAndLog(Level::Debug),
1672 })
1673 };
1674
1675 if self.total_pending_requests.load(Ordering::Relaxed) >= MAX_TOTAL_PENDING_REQUESTS {
1676 let error_message = format!(
1677 "Reached maximum number of total pending requests: {}",
1678 MAX_TOTAL_PENDING_REQUESTS
1679 );
1680 return create_pending_request_limit_exceeded_response(
1681 message_queue_notifier,
1682 error_message,
1683 );
1684 }
1685
1686 if peer_state_lock.pending_requests_and_channels() < MAX_PENDING_REQUESTS_PER_PEER {
1687 peer_state_lock.pending_requests.insert(request_id, request);
1688 self.total_pending_requests.fetch_add(1, Ordering::Relaxed);
1689 Ok(())
1690 } else {
1691 let error_message = format!(
1692 "Peer {} reached maximum number of pending requests: {}",
1693 counterparty_node_id, MAX_PENDING_REQUESTS_PER_PEER
1694 );
1695 create_pending_request_limit_exceeded_response(message_queue_notifier, error_message)
1696 }
1697 }
1698
1699 fn remove_pending_request<'a>(
1700 &self, peer_state_lock: &mut MutexGuard<'a, PeerState>, request_id: &LSPSRequestId,
1701 ) -> Option<LSPS2Request> {
1702 match peer_state_lock.pending_requests.remove(request_id) {
1703 Some(req) => {
1704 let res = self.total_pending_requests.fetch_update(
1705 Ordering::Relaxed,
1706 Ordering::Relaxed,
1707 |x| Some(x.saturating_sub(1)),
1708 );
1709 match res {
1710 Ok(previous_value) if previous_value == 0 => debug_assert!(
1711 false,
1712 "total_pending_requests counter out-of-sync! This should never happen!"
1713 ),
1714 Err(previous_value) if previous_value == 0 => debug_assert!(
1715 false,
1716 "total_pending_requests counter out-of-sync! This should never happen!"
1717 ),
1718 _ => {},
1719 }
1720 Some(req)
1721 },
1722 res => res,
1723 }
1724 }
1725
1726 #[cfg(debug_assertions)]
1727 fn verify_pending_request_counter(&self) {
1728 let mut num_requests = 0;
1729 let outer_state_lock = self.per_peer_state.read().unwrap();
1730 for (_, inner) in outer_state_lock.iter() {
1731 let inner_state_lock = inner.lock().unwrap();
1732 num_requests += inner_state_lock.pending_requests.len();
1733 }
1734 debug_assert_eq!(
1735 num_requests,
1736 self.total_pending_requests.load(Ordering::Relaxed),
1737 "total_pending_requests counter out-of-sync! This should never happen!"
1738 );
1739 }
1740
1741 async fn persist_peer_state(
1742 &self, counterparty_node_id: PublicKey,
1743 ) -> Result<(), lightning::io::Error> {
1744 let fut = {
1745 let outer_state_lock = self.per_peer_state.read().unwrap();
1746 match outer_state_lock.get(&counterparty_node_id) {
1747 None => {
1748 return Ok(());
1750 },
1751 Some(entry) => {
1752 let mut peer_state_lock = entry.lock().unwrap();
1753 if !peer_state_lock.needs_persist {
1754 return Ok(());
1756 } else {
1757 peer_state_lock.needs_persist = false;
1758 let key = counterparty_node_id.to_string();
1759 let encoded = peer_state_lock.encode();
1760 self.kv_store.write(
1763 LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1764 LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
1765 &key,
1766 encoded,
1767 )
1768 }
1769 },
1770 }
1771 };
1772
1773 fut.await.map_err(|e| {
1774 self.per_peer_state
1775 .read()
1776 .unwrap()
1777 .get(&counterparty_node_id)
1778 .map(|p| p.lock().unwrap().needs_persist = true);
1779 e
1780 })
1781 }
1782
1783 pub(crate) async fn persist(&self) -> Result<(), lightning::io::Error> {
1784 if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
1789 return Ok(());
1792 }
1793
1794 loop {
1795 let mut need_remove = Vec::new();
1796 let mut need_persist = Vec::new();
1797
1798 {
1799 let outer_state_lock = self.per_peer_state.read().unwrap();
1802 for (counterparty_node_id, inner_state_lock) in outer_state_lock.iter() {
1803 let mut peer_state_lock = inner_state_lock.lock().unwrap();
1804 peer_state_lock.prune_expired_request_state();
1805 let is_prunable = peer_state_lock.is_prunable();
1806 if is_prunable {
1807 need_remove.push(*counterparty_node_id);
1808 } else if peer_state_lock.needs_persist {
1809 need_persist.push(*counterparty_node_id);
1810 }
1811 }
1812 }
1813
1814 for counterparty_node_id in need_persist.into_iter() {
1815 debug_assert!(!need_remove.contains(&counterparty_node_id));
1816 self.persist_peer_state(counterparty_node_id).await?;
1817 }
1818
1819 for counterparty_node_id in need_remove {
1820 let mut future_opt = None;
1821 {
1822 let mut per_peer_state = self.per_peer_state.write().unwrap();
1828 if let Entry::Occupied(mut entry) = per_peer_state.entry(counterparty_node_id) {
1829 let state = entry.get_mut().get_mut().unwrap();
1830 if state.is_prunable() {
1831 entry.remove();
1832 let key = counterparty_node_id.to_string();
1833 future_opt = Some(self.kv_store.remove(
1834 LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1835 LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
1836 &key,
1837 ));
1838 } else {
1839 state.needs_persist = true;
1841 }
1842 } else {
1843 debug_assert!(false);
1846 }
1847 }
1848 if let Some(future) = future_opt {
1849 future.await?;
1850 } else {
1851 self.persist_peer_state(counterparty_node_id).await?;
1852 }
1853 }
1854
1855 if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
1856 self.persistence_in_flight.store(1, Ordering::Release);
1859 continue;
1860 }
1861 break;
1862 }
1863
1864 Ok(())
1865 }
1866
1867 pub(crate) fn peer_disconnected(&self, counterparty_node_id: PublicKey) {
1868 let outer_state_lock = self.per_peer_state.write().unwrap();
1869 if let Some(inner_state_lock) = outer_state_lock.get(&counterparty_node_id) {
1870 let mut peer_state_lock = inner_state_lock.lock().unwrap();
1871 peer_state_lock.prune_expired_request_state();
1874 }
1875 }
1876
1877 pub fn channel_needs_manual_broadcast(
1881 &self, user_channel_id: u128, counterparty_node_id: &PublicKey,
1882 ) -> Result<bool, APIError> {
1883 let outer_state_lock = self.per_peer_state.read().unwrap();
1884 let inner_state_lock =
1885 outer_state_lock.get(counterparty_node_id).ok_or_else(|| APIError::APIMisuseError {
1886 err: format!("No counterparty state for: {}", counterparty_node_id),
1887 })?;
1888 let peer_state = inner_state_lock.lock().unwrap();
1889
1890 let intercept_scid = peer_state
1891 .intercept_scid_by_user_channel_id
1892 .get(&user_channel_id)
1893 .copied()
1894 .ok_or_else(|| APIError::APIMisuseError {
1895 err: format!("Could not find a channel with user_channel_id {}", user_channel_id),
1896 })?;
1897
1898 let jit_channel = peer_state
1899 .outbound_channels_by_intercept_scid
1900 .get(&intercept_scid)
1901 .ok_or_else(|| APIError::APIMisuseError {
1902 err: format!(
1903 "Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1904 intercept_scid, user_channel_id,
1905 ),
1906 })?;
1907
1908 Ok(jit_channel.is_client_trusts_lsp())
1909 }
1910
1911 pub fn store_funding_transaction(
1922 &self, user_channel_id: u128, counterparty_node_id: &PublicKey, funding_tx: Transaction,
1923 ) -> Result<(), APIError> {
1924 let outer_state_lock = self.per_peer_state.read().unwrap();
1925 let inner_state_lock =
1926 outer_state_lock.get(counterparty_node_id).ok_or_else(|| APIError::APIMisuseError {
1927 err: format!("No counterparty state for: {}", counterparty_node_id),
1928 })?;
1929 let mut peer_state = inner_state_lock.lock().unwrap();
1930
1931 let intercept_scid = peer_state
1932 .intercept_scid_by_user_channel_id
1933 .get(&user_channel_id)
1934 .copied()
1935 .ok_or_else(|| APIError::APIMisuseError {
1936 err: format!("Could not find a channel with user_channel_id {}", user_channel_id),
1937 })?;
1938
1939 let jit_channel = peer_state
1940 .outbound_channels_by_intercept_scid
1941 .get_mut(&intercept_scid)
1942 .ok_or_else(|| APIError::APIMisuseError {
1943 err: format!(
1944 "Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1945 intercept_scid, user_channel_id,
1946 ),
1947 })?;
1948
1949 jit_channel.set_funding_tx(funding_tx);
1950
1951 self.broadcast_funding_transaction_if_applies(jit_channel);
1952 Ok(())
1953 }
1954
1955 pub fn set_funding_tx_broadcast_safe(
1971 &self, user_channel_id: u128, counterparty_node_id: &PublicKey,
1972 ) -> Result<(), APIError> {
1973 let outer_state_lock = self.per_peer_state.read().unwrap();
1974 let inner_state_lock =
1975 outer_state_lock.get(counterparty_node_id).ok_or_else(|| APIError::APIMisuseError {
1976 err: format!("No counterparty state for: {}", counterparty_node_id),
1977 })?;
1978 let mut peer_state = inner_state_lock.lock().unwrap();
1979
1980 let intercept_scid = peer_state
1981 .intercept_scid_by_user_channel_id
1982 .get(&user_channel_id)
1983 .copied()
1984 .ok_or_else(|| APIError::APIMisuseError {
1985 err: format!("Could not find a channel with user_channel_id {}", user_channel_id),
1986 })?;
1987
1988 let jit_channel = peer_state
1989 .outbound_channels_by_intercept_scid
1990 .get_mut(&intercept_scid)
1991 .ok_or_else(|| APIError::APIMisuseError {
1992 err: format!(
1993 "Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1994 intercept_scid, user_channel_id,
1995 ),
1996 })?;
1997
1998 jit_channel.set_funding_tx_broadcast_safe(true);
1999
2000 self.broadcast_funding_transaction_if_applies(jit_channel);
2001 Ok(())
2002 }
2003
2004 fn broadcast_funding_transaction_if_applies(&self, jit_channel: &OutboundJITChannel) {
2005 if !jit_channel.should_broadcast_funding_transaction() {
2006 return;
2007 }
2008
2009 let channel_id_opt = jit_channel.get_channel_id();
2016 if let Some(ch_id) = channel_id_opt {
2017 let is_channel_ready = self
2018 .channel_manager
2019 .get_cm()
2020 .list_channels()
2021 .into_iter()
2022 .any(|cd| cd.channel_id == ch_id && cd.is_channel_ready);
2023 if !is_channel_ready {
2024 return;
2025 }
2026 } else {
2027 return;
2028 }
2029
2030 if let Some(funding_tx) = jit_channel.get_funding_tx() {
2031 self.tx_broadcaster.broadcast_transactions(&[funding_tx]);
2032 }
2033 }
2034}
2035
2036impl<CM: Deref, K: Deref + Clone, T: Deref + Clone> LSPSProtocolMessageHandler
2037 for LSPS2ServiceHandler<CM, K, T>
2038where
2039 CM::Target: AChannelManager,
2040 K::Target: KVStore,
2041 T::Target: BroadcasterInterface,
2042{
2043 type ProtocolMessage = LSPS2Message;
2044 const PROTOCOL_NUMBER: Option<u16> = Some(2);
2045
2046 fn handle_message(
2047 &self, message: Self::ProtocolMessage, counterparty_node_id: &PublicKey,
2048 ) -> Result<(), LightningError> {
2049 match message {
2050 LSPS2Message::Request(request_id, request) => {
2051 let res = match request {
2052 LSPS2Request::GetInfo(params) => {
2053 self.handle_get_info_request(request_id, counterparty_node_id, params)
2054 },
2055 LSPS2Request::Buy(params) => {
2056 self.handle_buy_request(request_id, counterparty_node_id, params)
2057 },
2058 };
2059 #[cfg(debug_assertions)]
2060 self.verify_pending_request_counter();
2061 res
2062 },
2063 _ => {
2064 debug_assert!(
2065 false,
2066 "Service handler received LSPS2 response message. This should never happen."
2067 );
2068 Err(LightningError { err: format!("Service handler received LSPS2 response message from node {}. This should never happen.", counterparty_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)})
2069 },
2070 }
2071 }
2072}
2073
2074fn calculate_amount_to_forward_per_htlc(
2075 htlcs: &[InterceptedHTLC], total_fee_msat: u64,
2076) -> Vec<(InterceptId, u64)> {
2077 let total_expected_outbound_msat: u64 =
2079 htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum();
2080 if total_fee_msat > total_expected_outbound_msat {
2081 debug_assert!(false, "Fee is larger than the total expected outbound amount.");
2082 return Vec::new();
2083 }
2084
2085 let mut fee_remaining_msat = total_fee_msat;
2086 let mut per_htlc_forwards = vec![];
2087 for (index, htlc) in htlcs.iter().enumerate() {
2088 let proportional_fee_amt_msat = (total_fee_msat as u128
2089 * htlc.expected_outbound_amount_msat as u128
2090 / total_expected_outbound_msat as u128) as u64;
2091
2092 let mut actual_fee_amt_msat = core::cmp::min(fee_remaining_msat, proportional_fee_amt_msat);
2093 actual_fee_amt_msat =
2094 core::cmp::min(actual_fee_amt_msat, htlc.expected_outbound_amount_msat);
2095 fee_remaining_msat -= actual_fee_amt_msat;
2096
2097 if index == htlcs.len() - 1 {
2098 actual_fee_amt_msat += fee_remaining_msat;
2099 }
2100
2101 let amount_to_forward_msat =
2102 htlc.expected_outbound_amount_msat.saturating_sub(actual_fee_amt_msat);
2103
2104 per_htlc_forwards.push((htlc.intercept_id, amount_to_forward_msat))
2105 }
2106 per_htlc_forwards
2107}
2108
/// A wrapper around a borrowed [`LSPS2ServiceHandler`] that exposes its operations as plain
/// (non-`async`) methods.
pub struct LSPS2ServiceHandlerSync<'a, CM: Deref, K: Deref + Clone, T: Deref + Clone>
where
	CM::Target: AChannelManager,
	K::Target: KVStore,
	T::Target: BroadcasterInterface,
{
	// The wrapped handler; every method delegates to it.
	inner: &'a LSPS2ServiceHandler<CM, K, T>,
}
2119
2120impl<'a, CM: Deref, K: Deref + Clone, T: Deref + Clone> LSPS2ServiceHandlerSync<'a, CM, K, T>
2121where
2122 CM::Target: AChannelManager,
2123 K::Target: KVStore,
2124 T::Target: BroadcasterInterface,
2125{
/// Creates a sync wrapper that borrows the given handler.
pub(crate) fn from_inner(inner: &'a LSPS2ServiceHandler<CM, K, T>) -> Self {
	Self { inner }
}
2129
/// Returns a reference to the wrapped handler's [`LSPS2ServiceConfig`].
pub fn config(&self) -> &LSPS2ServiceConfig {
	&self.inner.config
}
2136
/// Forwards to [`LSPS2ServiceHandler::invalid_token_provided`] on the wrapped handler and
/// returns its result unchanged.
pub fn invalid_token_provided(
	&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
) -> Result<(), APIError> {
	self.inner.invalid_token_provided(counterparty_node_id, request_id)
}
2145
/// Forwards to [`LSPS2ServiceHandler::opening_fee_params_generated`] on the wrapped handler
/// and returns its result unchanged.
pub fn opening_fee_params_generated(
	&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
	opening_fee_params_menu: Vec<LSPS2RawOpeningFeeParams>,
) -> Result<(), APIError> {
	self.inner.opening_fee_params_generated(
		counterparty_node_id,
		request_id,
		opening_fee_params_menu,
	)
}
2159
2160 pub fn invoice_parameters_generated(
2165 &self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId, intercept_scid: u64,
2166 cltv_expiry_delta: u32, client_trusts_lsp: bool, user_channel_id: u128,
2167 ) -> Result<(), APIError> {
2168 let mut fut = Box::pin(self.inner.invoice_parameters_generated(
2169 counterparty_node_id,
2170 request_id,
2171 intercept_scid,
2172 cltv_expiry_delta,
2173 client_trusts_lsp,
2174 user_channel_id,
2175 ));
2176
2177 let mut waker = dummy_waker();
2178 let mut ctx = task::Context::from_waker(&mut waker);
2179 match fut.as_mut().poll(&mut ctx) {
2180 task::Poll::Ready(result) => result,
2181 task::Poll::Pending => {
2182 unreachable!("Should not be pending in a sync context");
2184 },
2185 }
2186 }
2187
2188 pub fn htlc_intercepted(
2194 &self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64,
2195 payment_hash: PaymentHash,
2196 ) -> Result<(), APIError> {
2197 let mut fut = Box::pin(self.inner.htlc_intercepted(
2198 intercept_scid,
2199 intercept_id,
2200 expected_outbound_amount_msat,
2201 payment_hash,
2202 ));
2203
2204 let mut waker = dummy_waker();
2205 let mut ctx = task::Context::from_waker(&mut waker);
2206 match fut.as_mut().poll(&mut ctx) {
2207 task::Poll::Ready(result) => result,
2208 task::Poll::Pending => {
2209 unreachable!("Should not be pending in a sync context");
2211 },
2212 }
2213 }
2214
2215 pub fn htlc_handling_failed(
2221 &self, failure_type: HTLCHandlingFailureType,
2222 ) -> Result<(), APIError> {
2223 let mut fut = Box::pin(self.inner.htlc_handling_failed(failure_type));
2224
2225 let mut waker = dummy_waker();
2226 let mut ctx = task::Context::from_waker(&mut waker);
2227 match fut.as_mut().poll(&mut ctx) {
2228 task::Poll::Ready(result) => result,
2229 task::Poll::Pending => {
2230 unreachable!("Should not be pending in a sync context");
2232 },
2233 }
2234 }
2235
2236 pub fn payment_forwarded(
2242 &self, next_channel_id: ChannelId, skimmed_fee_msat: u64,
2243 ) -> Result<(), APIError> {
2244 let mut fut = Box::pin(self.inner.payment_forwarded(next_channel_id, skimmed_fee_msat));
2245
2246 let mut waker = dummy_waker();
2247 let mut ctx = task::Context::from_waker(&mut waker);
2248 match fut.as_mut().poll(&mut ctx) {
2249 task::Poll::Ready(result) => result,
2250 task::Poll::Pending => {
2251 unreachable!("Should not be pending in a sync context");
2253 },
2254 }
2255 }
2256
2257 pub fn channel_needs_manual_broadcast(
2259 &self, user_channel_id: u128, counterparty_node_id: &PublicKey,
2260 ) -> Result<bool, APIError> {
2261 self.inner.channel_needs_manual_broadcast(user_channel_id, counterparty_node_id)
2262 }
2263
2264 pub fn store_funding_transaction(
2266 &self, user_channel_id: u128, counterparty_node_id: &PublicKey, funding_tx: Transaction,
2267 ) -> Result<(), APIError> {
2268 self.inner.store_funding_transaction(user_channel_id, counterparty_node_id, funding_tx)
2269 }
2270
2271 pub fn set_funding_tx_broadcast_safe(
2273 &self, user_channel_id: u128, counterparty_node_id: &PublicKey,
2274 ) -> Result<(), APIError> {
2275 self.inner.set_funding_tx_broadcast_safe(user_channel_id, counterparty_node_id)
2276 }
2277
2278 pub fn channel_open_abandoned(
2282 &self, counterparty_node_id: &PublicKey, user_channel_id: u128,
2283 ) -> Result<(), APIError> {
2284 let mut fut =
2285 Box::pin(self.inner.channel_open_abandoned(counterparty_node_id, user_channel_id));
2286
2287 let mut waker = dummy_waker();
2288 let mut ctx = task::Context::from_waker(&mut waker);
2289 match fut.as_mut().poll(&mut ctx) {
2290 task::Poll::Ready(result) => result,
2291 task::Poll::Pending => {
2292 unreachable!("Should not be pending in a sync context");
2294 },
2295 }
2296 }
2297
2298 pub fn channel_open_failed(
2302 &self, counterparty_node_id: &PublicKey, user_channel_id: u128,
2303 ) -> Result<(), APIError> {
2304 let mut fut =
2305 Box::pin(self.inner.channel_open_failed(counterparty_node_id, user_channel_id));
2306
2307 let mut waker = dummy_waker();
2308 let mut ctx = task::Context::from_waker(&mut waker);
2309 match fut.as_mut().poll(&mut ctx) {
2310 task::Poll::Ready(result) => result,
2311 task::Poll::Pending => {
2312 unreachable!("Should not be pending in a sync context");
2314 },
2315 }
2316 }
2317
2318 pub fn channel_ready(
2324 &self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey,
2325 ) -> Result<(), APIError> {
2326 let mut fut =
2327 Box::pin(self.inner.channel_ready(user_channel_id, channel_id, counterparty_node_id));
2328
2329 let mut waker = dummy_waker();
2330 let mut ctx = task::Context::from_waker(&mut waker);
2331 match fut.as_mut().poll(&mut ctx) {
2332 task::Poll::Ready(result) => result,
2333 task::Poll::Pending => {
2334 unreachable!("Should not be pending in a sync context");
2336 },
2337 }
2338 }
2339}
2340
2341#[cfg(test)]
2342mod tests {
2343 use super::*;
2344
2345 use crate::lsps0::ser::LSPSDateTime;
2346
2347 use proptest::prelude::*;
2348
2349 use bitcoin::{absolute::LockTime, transaction::Version};
2350 use core::str::FromStr;
2351
/// Exclusive upper bound for the amounts generated by the property test:
/// 21 million x 10^8 x 10^3 (presumably the total bitcoin supply expressed in msat).
const MAX_VALUE_MSAT: u64 = 21_000_000 * 100_000_000 * 1_000;
2353
2354 fn arb_forward_amounts() -> impl Strategy<Value = (u64, u64, u64, u64)> {
2355 (1u64..MAX_VALUE_MSAT, 1u64..MAX_VALUE_MSAT, 1u64..MAX_VALUE_MSAT, 1u64..MAX_VALUE_MSAT)
2356 .prop_map(|(a, b, c, d)| {
2357 (a, b, c, core::cmp::min(d, a.saturating_add(b).saturating_add(c)))
2358 })
2359 }
2360
2361 proptest! {
2362 #[test]
2363 fn proptest_calculate_amount_to_forward((o_0, o_1, o_2, total_fee_msat) in arb_forward_amounts()) {
2364 let htlcs = vec![
2365 InterceptedHTLC {
2366 intercept_id: InterceptId([0; 32]),
2367 expected_outbound_amount_msat: o_0,
2368 payment_hash: PaymentHash([0; 32]),
2369 },
2370 InterceptedHTLC {
2371 intercept_id: InterceptId([1; 32]),
2372 expected_outbound_amount_msat: o_1,
2373 payment_hash: PaymentHash([0; 32]),
2374 },
2375 InterceptedHTLC {
2376 intercept_id: InterceptId([2; 32]),
2377 expected_outbound_amount_msat: o_2,
2378 payment_hash: PaymentHash([0; 32]),
2379 },
2380 ];
2381
2382 let result = calculate_amount_to_forward_per_htlc(&htlcs, total_fee_msat);
2383 let total_received_msat = o_0 + o_1 + o_2;
2384
2385 if total_received_msat < total_fee_msat {
2386 assert_eq!(result.len(), 0);
2387 } else {
2388 assert_ne!(result.len(), 0);
2389 assert_eq!(result[0].0, htlcs[0].intercept_id);
2390 assert_eq!(result[1].0, htlcs[1].intercept_id);
2391 assert_eq!(result[2].0, htlcs[2].intercept_id);
2392 assert!(result[0].1 <= o_0);
2393 assert!(result[1].1 <= o_1);
2394 assert!(result[2].1 <= o_2);
2395
2396 let result_sum = result.iter().map(|(_, f)| f).sum::<u64>();
2397 assert_eq!(total_received_msat - result_sum, total_fee_msat);
2398 let five_pct = result_sum as f32 * 0.05;
2399 let fair_share_0 = (o_0 as f32 / total_received_msat as f32) * result_sum as f32;
2400 assert!(result[0].1 as f32 <= fair_share_0 + five_pct);
2401 let fair_share_1 = (o_1 as f32 / total_received_msat as f32) * result_sum as f32;
2402 assert!(result[1].1 as f32 <= fair_share_1 + five_pct);
2403 let fair_share_2 = (o_2 as f32 / total_received_msat as f32) * result_sum as f32;
2404 assert!(result[2].1 as f32 <= fair_share_2 + five_pct);
2405 }
2406 }
2407 }
2408
/// Fixed example: HTLCs of 2, 6 and 2 msat with a total fee of 5 msat must be forwarded as
/// 1, 3 and 1 msat, in the same order.
#[test]
fn test_calculate_amount_to_forward() {
    let received_msat = [2u64, 6, 2];
    let htlcs: Vec<InterceptedHTLC> = (0u8..)
        .zip(received_msat)
        .map(|(id_byte, amount_msat)| InterceptedHTLC {
            intercept_id: InterceptId([id_byte; 32]),
            expected_outbound_amount_msat: amount_msat,
            payment_hash: PaymentHash([0; 32]),
        })
        .collect();

    let result = calculate_amount_to_forward_per_htlc(&htlcs, 5);

    let expected_forward_msat = [1u64, 3, 1];
    let expected: Vec<(InterceptId, u64)> = htlcs
        .iter()
        .zip(expected_forward_msat)
        .map(|(htlc, forward_msat)| (htlc.intercept_id, forward_msat))
        .collect();
    assert_eq!(result, expected);
}
2438
/// Walks `OutboundJITChannelState` through a run with a fixed payment size of 500M msat,
/// checking the resulting state and action after every step.
#[test]
fn test_jit_channel_state_mpp() {
    // Shorthand: intercepted HTLC from an id byte, an amount and a payment-hash byte.
    let htlc = |id_byte: u8, amount_msat: u64, hash_byte: u8| InterceptedHTLC {
        intercept_id: InterceptId([id_byte; 32]),
        expected_outbound_amount_msat: amount_msat,
        payment_hash: PaymentHash([hash_byte; 32]),
    };
    let jit_channel_id = ChannelId([200; 32]);

    let payment_size_msat = Some(500_000_000);
    let opening_fee_params = LSPS2OpeningFeeParams {
        min_fee_msat: 10_000_000,
        proportional: 10_000,
        valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(),
        min_lifetime: 4032,
        max_client_to_self_delay: 2016,
        min_payment_size_msat: 10_000_000,
        max_payment_size_msat: 1_000_000_000,
        promise: "ignore".to_string(),
    };
    let mut state = OutboundJITChannelState::new();

    // HTLC 0 (hash 100) brings 200M of the 500M payment size: no action yet.
    let first_part = state
        .htlc_intercepted(&opening_fee_params, &payment_size_msat, htlc(0, 200_000_000, 100))
        .unwrap();
    assert!(matches!(state, OutboundJITChannelState::PendingInitialPayment { .. }));
    assert!(first_part.is_none());

    // HTLC 1 uses a different hash (101) and only 1M: still no action.
    let other_payment = state
        .htlc_intercepted(&opening_fee_params, &payment_size_msat, htlc(1, 1_000_000, 101))
        .unwrap();
    assert!(matches!(state, OutboundJITChannelState::PendingInitialPayment { .. }));
    assert!(other_payment.is_none());

    // HTLC 2 (hash 100, 300M) brings hash 100 up to 500M: a channel open is requested.
    let completing_part = state
        .htlc_intercepted(&opening_fee_params, &payment_size_msat, htlc(2, 300_000_000, 100))
        .unwrap();
    assert!(matches!(state, OutboundJITChannelState::PendingChannelOpen { .. }));
    assert!(matches!(completing_part, Some(HTLCInterceptedAction::OpenChannel(_))));

    // Channel ready: both hash-100 HTLCs are handed out for forwarding with a 10M opening fee.
    let ForwardPaymentAction(ready_channel_id, ready_payment) =
        state.channel_ready(jit_channel_id).unwrap();
    assert_eq!(ready_channel_id, jit_channel_id);
    assert_eq!(ready_payment.opening_fee_msat, 10_000_000);
    assert_eq!(ready_payment.htlcs, vec![htlc(0, 200_000_000, 100), htlc(2, 300_000_000, 100)]);

    // HTLC 3 (hash 102, 2M) arrives while the forward is outstanding: no action.
    let during_forward = state
        .htlc_intercepted(&opening_fee_params, &payment_size_msat, htlc(3, 2_000_000, 102))
        .unwrap();
    assert!(matches!(state, OutboundJITChannelState::PendingPaymentForward { .. }));
    assert!(during_forward.is_none());

    // A handling failure falls back to PendingPayment without a follow-up action.
    let after_failure = state.htlc_handling_failed().unwrap();
    assert!(matches!(state, OutboundJITChannelState::PendingPayment { .. }));
    assert!(after_failure.is_none());

    // HTLC 4 (hash 101, 500M) joins HTLC 1: the hash-101 payment is handed out for forwarding.
    let retry = state
        .htlc_intercepted(&opening_fee_params, &payment_size_msat, htlc(4, 500_000_000, 101))
        .unwrap();
    assert!(matches!(state, OutboundJITChannelState::PendingPaymentForward { .. }));
    if let Some(HTLCInterceptedAction::ForwardPayment(channel_id, payment)) = retry {
        assert_eq!(channel_id, jit_channel_id);
        assert_eq!(payment.opening_fee_msat, 10_000_000);
        assert_eq!(payment.htlcs, vec![htlc(1, 1_000_000, 101), htlc(4, 500_000_000, 101)]);
    } else {
        panic!("Unexpected action when intercepted HTLC.");
    }

    // Once the payment is reported as forwarded, the queued HTLC 3 is released.
    let released = state.payment_forwarded(100_000_000_000).unwrap();
    assert!(matches!(state, OutboundJITChannelState::PaymentForwarded { .. }));
    if let Some(ForwardHTLCsAction(channel_id, htlcs)) = released {
        assert_eq!(channel_id, jit_channel_id);
        assert_eq!(htlcs, vec![htlc(3, 2_000_000, 102)]);
    } else {
        panic!("Unexpected action when forwarded payment.");
    }

    // Any HTLC intercepted afterwards is forwarded straight over the channel.
    let late_htlc = state
        .htlc_intercepted(&opening_fee_params, &payment_size_msat, htlc(5, 200_000_000, 103))
        .unwrap();
    assert!(matches!(state, OutboundJITChannelState::PaymentForwarded { .. }));
    assert!(
        matches!(late_htlc, Some(HTLCInterceptedAction::ForwardHTLC(channel_id)) if channel_id == jit_channel_id)
    );
}
2622
/// Walks `OutboundJITChannelState` through a run without a fixed payment size
/// (`payment_size_msat` is `None`), checking the resulting state and action after every step.
#[test]
fn test_jit_channel_state_no_mpp() {
    // Shorthand: intercepted HTLC from an id byte, an amount and a payment-hash byte.
    let htlc = |id_byte: u8, amount_msat: u64, hash_byte: u8| InterceptedHTLC {
        intercept_id: InterceptId([id_byte; 32]),
        expected_outbound_amount_msat: amount_msat,
        payment_hash: PaymentHash([hash_byte; 32]),
    };
    let jit_channel_id = ChannelId([200; 32]);

    let payment_size_msat = None;
    let opening_fee_params = LSPS2OpeningFeeParams {
        min_fee_msat: 10_000_000,
        proportional: 10_000,
        valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(),
        min_lifetime: 4032,
        max_client_to_self_delay: 2016,
        min_payment_size_msat: 10_000_000,
        max_payment_size_msat: 1_000_000_000,
        promise: "ignore".to_string(),
    };
    let mut state = OutboundJITChannelState::new();

    // HTLC 0 (hash 100, 500M) immediately requests a channel open.
    let first_htlc = state
        .htlc_intercepted(&opening_fee_params, &payment_size_msat, htlc(0, 500_000_000, 100))
        .unwrap();
    assert!(matches!(state, OutboundJITChannelState::PendingChannelOpen { .. }));
    assert!(matches!(first_htlc, Some(HTLCInterceptedAction::OpenChannel(_))));

    // HTLC 1 (hash 101, 600M) arrives while the channel is opening: no action.
    let while_opening = state
        .htlc_intercepted(&opening_fee_params, &payment_size_msat, htlc(1, 600_000_000, 101))
        .unwrap();
    assert!(matches!(state, OutboundJITChannelState::PendingChannelOpen { .. }));
    assert!(while_opening.is_none());

    // Channel ready: only HTLC 0 is handed out for forwarding, with a 10M opening fee.
    let ForwardPaymentAction(ready_channel_id, ready_payment) =
        state.channel_ready(jit_channel_id).unwrap();
    assert_eq!(ready_channel_id, jit_channel_id);
    assert_eq!(ready_payment.opening_fee_msat, 10_000_000);
    assert_eq!(ready_payment.htlcs, vec![htlc(0, 500_000_000, 100)]);

    // HTLC 2 (hash 102, 500M) arrives while the forward is outstanding: no action.
    let during_forward = state
        .htlc_intercepted(&opening_fee_params, &payment_size_msat, htlc(2, 500_000_000, 102))
        .unwrap();
    assert!(matches!(state, OutboundJITChannelState::PendingPaymentForward { .. }));
    assert!(during_forward.is_none());

    // A handling failure hands out the next queued payment (HTLC 1) right away.
    let after_failure = state.htlc_handling_failed().unwrap();
    assert!(matches!(state, OutboundJITChannelState::PendingPaymentForward { .. }));
    if let Some(ForwardPaymentAction(channel_id, payment)) = after_failure {
        assert_eq!(channel_id, jit_channel_id);
        assert_eq!(payment.htlcs, vec![htlc(1, 600_000_000, 101)]);
    } else {
        panic!("Unexpected action when HTLC handling failed.");
    }

    // Once the payment is reported as forwarded, the queued HTLC 2 is released.
    let released = state.payment_forwarded(10_000_000_000).unwrap();
    assert!(matches!(state, OutboundJITChannelState::PaymentForwarded { .. }));
    if let Some(ForwardHTLCsAction(channel_id, htlcs)) = released {
        assert_eq!(channel_id, jit_channel_id);
        assert_eq!(htlcs, vec![htlc(2, 500_000_000, 102)]);
    } else {
        panic!("Unexpected action when forwarded payment.");
    }

    // Any HTLC intercepted afterwards is forwarded straight over the channel.
    let late_htlc = state
        .htlc_intercepted(&opening_fee_params, &payment_size_msat, htlc(3, 200_000_000, 103))
        .unwrap();
    assert!(matches!(state, OutboundJITChannelState::PaymentForwarded { .. }));
    assert!(
        matches!(late_htlc, Some(HTLCInterceptedAction::ForwardHTLC(channel_id)) if channel_id == jit_channel_id)
    );
}
2757
/// Checks that `should_broadcast_funding_transaction` stays false until a forwarded payment
/// reports a skimmed fee of at least `min_fee_msat`: it is false after a failed opening
/// payment, false after a forward skimming `min_fee_msat - 1`, and true after one skimming
/// `min_fee_msat`.
#[test]
fn broadcast_not_allowed_after_non_paying_fee_payment_claimed() {
    // Shorthand: intercepted HTLC from an id byte, an amount and a payment hash.
    let htlc = |id_byte: u8, amount_msat: u64, payment_hash: PaymentHash| InterceptedHTLC {
        intercept_id: InterceptId([id_byte; 32]),
        expected_outbound_amount_msat: amount_msat,
        payment_hash,
    };

    let min_fee_msat: u64 = 12345;
    let opening_fee_params = LSPS2OpeningFeeParams {
        min_fee_msat,
        proportional: 0,
        valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(),
        min_lifetime: 144,
        max_client_to_self_delay: 128,
        min_payment_size_msat: 1,
        max_payment_size_msat: 10_000_000_000,
        promise: "ignore".to_string(),
    };

    let payment_size_msat = Some(1_000_000);
    let user_channel_id = 4242u128;
    let mut jit_channel =
        OutboundJITChannel::new(payment_size_msat, opening_fee_params, user_channel_id, true);

    // Two parts (400k + 600k) under one hash add up to the 1M payment size.
    let opening_payment_hash = PaymentHash([42; 32]);
    let first_part = htlc(0, 400_000, opening_payment_hash);
    let second_part = htlc(1, 600_000, opening_payment_hash);

    assert!(jit_channel.htlc_intercepted(first_part).unwrap().is_none());
    let action = jit_channel.htlc_intercepted(second_part).unwrap();
    if !matches!(action, Some(HTLCInterceptedAction::OpenChannel(_))) {
        panic!("Expected OpenChannel action, got {:?}", action);
    }

    let channel_id = ChannelId([7; 32]);
    let ForwardPaymentAction(_, fee_payment) = jit_channel.channel_ready(channel_id).unwrap();
    assert_eq!(fee_payment.opening_fee_msat, min_fee_msat);

    // The opening payment fails and nothing else is queued behind it.
    assert!(jit_channel.htlc_handling_failed().unwrap().is_none());

    // Store an empty funding transaction and flag it broadcast-safe; that alone must not
    // permit broadcasting.
    jit_channel.set_funding_tx(Transaction {
        version: Version(2),
        lock_time: LockTime::ZERO,
        input: vec![],
        output: vec![],
    });
    jit_channel.set_funding_tx_broadcast_safe(true);
    assert!(
        !jit_channel.should_broadcast_funding_transaction(),
        "Should not broadcast before any successful payment is claimed"
    );

    // A second payment of exactly `min_fee_msat` is handed out for forwarding.
    let second_payment_hash = PaymentHash([99; 32]);
    let action2 = jit_channel.htlc_intercepted(htlc(2, min_fee_msat, second_payment_hash)).unwrap();
    match action2 {
        Some(HTLCInterceptedAction::ForwardPayment(forwarded_channel_id, fee_payment2)) => {
            assert_eq!(forwarded_channel_id, channel_id);
            assert_eq!(fee_payment2.opening_fee_msat, min_fee_msat);
        },
        other => panic!("Expected ForwardPayment for second HTLC, got {:?}", other),
    }

    assert!(
        !jit_channel.should_broadcast_funding_transaction(),
        "Should not broadcast before any successful payment is claimed"
    );

    // One msat short of the fee: still not allowed.
    let _ = jit_channel.payment_forwarded(min_fee_msat - 1).unwrap();
    assert!(
        !jit_channel.should_broadcast_funding_transaction(),
        "Should not broadcast before all the fees are collected"
    );

    // The full fee was skimmed: broadcasting is now allowed.
    let _ = jit_channel.payment_forwarded(min_fee_msat).unwrap();
    assert!(
        jit_channel.should_broadcast_funding_transaction(),
        "Broadcast was not allowed even though all the skimmed fees were collected"
    );
}
2858}