1use alloc::boxed::Box;
13use alloc::string::{String, ToString};
14use alloc::vec::Vec;
15use lightning::util::persist::KVStore;
16
17use core::cmp::Ordering as CmpOrdering;
18use core::future::Future as StdFuture;
19use core::ops::Deref;
20use core::sync::atomic::{AtomicUsize, Ordering};
21use core::task;
22
23use crate::events::EventQueue;
24use crate::lsps0::ser::{
25 LSPSMessage, LSPSProtocolMessageHandler, LSPSRequestId, LSPSResponseError,
26 JSONRPC_INTERNAL_ERROR_ERROR_CODE, JSONRPC_INTERNAL_ERROR_ERROR_MESSAGE,
27 LSPS0_CLIENT_REJECTED_ERROR_CODE,
28};
29use crate::lsps2::event::LSPS2ServiceEvent;
30use crate::lsps2::payment_queue::{InterceptedHTLC, PaymentQueue};
31use crate::lsps2::utils::{
32 compute_opening_fee, is_expired_opening_fee_params, is_valid_opening_fee_params,
33};
34use crate::message_queue::{MessageQueue, MessageQueueNotifierGuard};
35use crate::persist::{
36 LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
37};
38use crate::prelude::hash_map::Entry;
39use crate::prelude::{new_hash_map, HashMap};
40use crate::sync::{Arc, Mutex, MutexGuard, RwLock};
41use crate::utils::async_poll::dummy_waker;
42
43use lightning::chain::chaininterface::BroadcasterInterface;
44use lightning::events::HTLCHandlingFailureType;
45use lightning::ln::channelmanager::{AChannelManager, FailureCode, InterceptId};
46use lightning::ln::msgs::{ErrorAction, LightningError};
47use lightning::ln::types::ChannelId;
48use lightning::util::errors::APIError;
49use lightning::util::logger::Level;
50use lightning::util::ser::Writeable;
51use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum};
52
53use lightning_types::payment::PaymentHash;
54
55use bitcoin::secp256k1::PublicKey;
56use bitcoin::Transaction;
57
58use crate::lsps2::msgs::{
59 LSPS2BuyRequest, LSPS2BuyResponse, LSPS2GetInfoRequest, LSPS2GetInfoResponse, LSPS2Message,
60 LSPS2OpeningFeeParams, LSPS2RawOpeningFeeParams, LSPS2Request, LSPS2Response,
61 LSPS2_BUY_REQUEST_INVALID_OPENING_FEE_PARAMS_ERROR_CODE,
62 LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_LARGE_ERROR_CODE,
63 LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_SMALL_ERROR_CODE,
64 LSPS2_GET_INFO_REQUEST_UNRECOGNIZED_OR_STALE_TOKEN_ERROR_CODE,
65};
66
/// Cap on the number of pending requests kept for a single peer.
// NOTE(review): the place enforcing this limit is not visible in this part of the file — confirm.
const MAX_PENDING_REQUESTS_PER_PEER: usize = 10;
/// Cap on the number of pending requests kept across all peers.
// NOTE(review): the place enforcing this limit is not visible in this part of the file — confirm.
const MAX_TOTAL_PENDING_REQUESTS: usize = 1_000;
/// Cap on the number of entries in the per-peer state map. Once it is reached,
/// `get_or_insert_peer_state_entry!` rejects requests from peers that have no state yet.
const MAX_TOTAL_PEERS: usize = 100_000;
70
/// Configuration of the LSPS2 service handler.
#[derive(Debug, Clone)]
pub struct LSPS2ServiceConfig {
	/// Secret passed to `LSPS2RawOpeningFeeParams::into_opening_fee_params` when the fee menu
	/// of a `get_info` response is assembled.
	pub promise_secret: [u8; 32],
}
79
/// Payload of `HTLCInterceptedAction::OpenChannel`.
#[derive(Debug, Clone, PartialEq)]
struct OpenChannelParams {
	/// Opening fee computed from the expected payment size and the agreed fee parameters.
	opening_fee_msat: u64,
	/// Expected payment size minus the opening fee (saturating at zero).
	amt_to_forward_msat: u64,
}
87
88#[derive(Clone, Debug, PartialEq)]
90struct FeePayment {
91 htlcs: Vec<InterceptedHTLC>,
92 opening_fee_msat: u64,
93}
94
95#[derive(Debug)]
96struct ChannelStateError(String);
97
98impl From<ChannelStateError> for LightningError {
99 fn from(value: ChannelStateError) -> Self {
100 LightningError { err: value.0, action: ErrorAction::IgnoreAndLog(Level::Info) }
101 }
102}
103
104#[derive(Debug, PartialEq)]
106enum HTLCInterceptedAction {
107 OpenChannel(OpenChannelParams),
109 ForwardHTLC(ChannelId),
111 ForwardPayment(ChannelId, FeePayment),
112}
113
114#[derive(Debug, PartialEq)]
116struct ForwardPaymentAction(ChannelId, FeePayment);
117
118#[derive(Debug, PartialEq)]
120struct ForwardHTLCsAction(ChannelId, Vec<InterceptedHTLC>);
121
122#[derive(Debug, Clone)]
123enum TrustModel {
124 ClientTrustsLsp { funding_tx_broadcast_safe: bool, funding_tx: Option<Transaction> },
125 LspTrustsClient,
126}
127
128impl TrustModel {
129 fn should_manually_broadcast(&self, state_is_payment_forward: bool) -> bool {
130 match self {
131 TrustModel::ClientTrustsLsp { funding_tx_broadcast_safe, funding_tx } => {
132 *funding_tx_broadcast_safe && state_is_payment_forward && funding_tx.is_some()
133 },
134 TrustModel::LspTrustsClient => false,
136 }
137 }
138
139 fn new(client_trusts_lsp: bool) -> Self {
140 if client_trusts_lsp {
141 TrustModel::ClientTrustsLsp { funding_tx_broadcast_safe: false, funding_tx: None }
142 } else {
143 TrustModel::LspTrustsClient
144 }
145 }
146
147 fn set_funding_tx(&mut self, funding_tx: Transaction) {
148 match self {
149 TrustModel::ClientTrustsLsp { funding_tx: tx, .. } => {
150 *tx = Some(funding_tx);
151 },
152 TrustModel::LspTrustsClient => {
153 },
155 }
156 }
157
158 fn set_funding_tx_broadcast_safe(&mut self, funding_tx_broadcast_safe: bool) {
159 match self {
160 TrustModel::ClientTrustsLsp { funding_tx_broadcast_safe: safe, .. } => {
161 *safe = funding_tx_broadcast_safe;
162 },
163 TrustModel::LspTrustsClient => {
164 },
166 }
167 }
168
169 fn get_funding_tx(&self) -> Option<&Transaction> {
170 match self {
171 TrustModel::ClientTrustsLsp { funding_tx, .. } => funding_tx.as_ref(),
172 _ => None,
173 }
174 }
175
176 fn is_client_trusts_lsp(&self) -> bool {
177 match self {
178 TrustModel::ClientTrustsLsp { .. } => true,
179 TrustModel::LspTrustsClient => false,
180 }
181 }
182}
183
184impl_writeable_tlv_based_enum!(TrustModel,
185 (0, ClientTrustsLsp) => {
186 (0, funding_tx_broadcast_safe, required),
187 (2, funding_tx, option),
188 },
189 (2, LspTrustsClient) => {
190 },
191);
192
193#[derive(Debug)]
195enum OutboundJITChannelState {
196 PendingInitialPayment { payment_queue: PaymentQueue },
199 PendingChannelOpen { payment_queue: PaymentQueue, opening_fee_msat: u64 },
202 PendingPaymentForward {
206 payment_queue: PaymentQueue,
207 opening_fee_msat: u64,
208 channel_id: ChannelId,
209 },
210 PendingPayment { payment_queue: PaymentQueue, opening_fee_msat: u64, channel_id: ChannelId },
215 PaymentForwarded { channel_id: ChannelId },
218}
219
220impl OutboundJITChannelState {
221 fn new() -> Self {
222 OutboundJITChannelState::PendingInitialPayment { payment_queue: PaymentQueue::new() }
223 }
224
225 fn htlc_intercepted(
226 &mut self, opening_fee_params: &LSPS2OpeningFeeParams, payment_size_msat: &Option<u64>,
227 htlc: InterceptedHTLC,
228 ) -> Result<Option<HTLCInterceptedAction>, ChannelStateError> {
229 match self {
230 OutboundJITChannelState::PendingInitialPayment { payment_queue } => {
231 let (total_expected_outbound_amount_msat, num_htlcs) = payment_queue.add_htlc(htlc);
232
233 let (expected_payment_size_msat, mpp_mode) =
234 if let Some(payment_size_msat) = payment_size_msat {
235 (*payment_size_msat, true)
236 } else {
237 debug_assert_eq!(num_htlcs, 1);
238 if num_htlcs != 1 {
239 return Err(ChannelStateError(
240 "Paying via multiple HTLCs is disallowed in \"no-MPP+var-invoice\" mode.".to_string()
241 ));
242 }
243 (total_expected_outbound_amount_msat, false)
244 };
245
246 if expected_payment_size_msat < opening_fee_params.min_payment_size_msat
247 || expected_payment_size_msat > opening_fee_params.max_payment_size_msat
248 {
249 return Err(ChannelStateError(
250 format!("Payment size violates our limits: expected_payment_size_msat = {}, min_payment_size_msat = {}, max_payment_size_msat = {}",
251 expected_payment_size_msat,
252 opening_fee_params.min_payment_size_msat,
253 opening_fee_params.max_payment_size_msat
254 )));
255 }
256
257 let opening_fee_msat = compute_opening_fee(
258 expected_payment_size_msat,
259 opening_fee_params.min_fee_msat,
260 opening_fee_params.proportional.into(),
261 ).ok_or(ChannelStateError(
262 format!("Could not compute valid opening fee with min_fee_msat = {}, proportional = {}, and expected_payment_size_msat = {}",
263 opening_fee_params.min_fee_msat,
264 opening_fee_params.proportional,
265 expected_payment_size_msat
266 ))
267 )?;
268
269 let amt_to_forward_msat =
270 expected_payment_size_msat.saturating_sub(opening_fee_msat);
271
272 if total_expected_outbound_amount_msat >= expected_payment_size_msat
274 && amt_to_forward_msat > 0
275 {
276 *self = OutboundJITChannelState::PendingChannelOpen {
277 payment_queue: core::mem::take(payment_queue),
278 opening_fee_msat,
279 };
280 let open_channel = HTLCInterceptedAction::OpenChannel(OpenChannelParams {
281 opening_fee_msat,
282 amt_to_forward_msat,
283 });
284 Ok(Some(open_channel))
285 } else {
286 if mpp_mode {
287 *self = OutboundJITChannelState::PendingInitialPayment {
288 payment_queue: core::mem::take(payment_queue),
289 };
290 Ok(None)
291 } else {
292 Err(ChannelStateError(
293 "Intercepted HTLC is too small to pay opening fee".to_string(),
294 ))
295 }
296 }
297 },
298 OutboundJITChannelState::PendingChannelOpen { payment_queue, opening_fee_msat } => {
299 let mut payment_queue = core::mem::take(payment_queue);
300 payment_queue.add_htlc(htlc);
301 *self = OutboundJITChannelState::PendingChannelOpen {
302 payment_queue,
303 opening_fee_msat: *opening_fee_msat,
304 };
305 Ok(None)
306 },
307 OutboundJITChannelState::PendingPaymentForward {
308 payment_queue,
309 opening_fee_msat,
310 channel_id,
311 } => {
312 let mut payment_queue = core::mem::take(payment_queue);
313 payment_queue.add_htlc(htlc);
314 *self = OutboundJITChannelState::PendingPaymentForward {
315 payment_queue,
316 opening_fee_msat: *opening_fee_msat,
317 channel_id: *channel_id,
318 };
319 Ok(None)
320 },
321 OutboundJITChannelState::PendingPayment {
322 payment_queue,
323 opening_fee_msat,
324 channel_id,
325 } => {
326 let mut payment_queue = core::mem::take(payment_queue);
327 payment_queue.add_htlc(htlc);
328 if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) {
329 let forward_payment = HTLCInterceptedAction::ForwardPayment(
330 *channel_id,
331 FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat },
332 );
333 *self = OutboundJITChannelState::PendingPaymentForward {
334 payment_queue,
335 opening_fee_msat: *opening_fee_msat,
336 channel_id: *channel_id,
337 };
338 Ok(Some(forward_payment))
339 } else {
340 *self = OutboundJITChannelState::PendingPayment {
341 payment_queue,
342 opening_fee_msat: *opening_fee_msat,
343 channel_id: *channel_id,
344 };
345 Ok(None)
346 }
347 },
348 OutboundJITChannelState::PaymentForwarded { channel_id } => {
349 let forward = HTLCInterceptedAction::ForwardHTLC(*channel_id);
350 *self = OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
351 Ok(Some(forward))
352 },
353 }
354 }
355
356 fn channel_ready(
357 &mut self, channel_id: ChannelId,
358 ) -> Result<ForwardPaymentAction, ChannelStateError> {
359 match self {
360 OutboundJITChannelState::PendingChannelOpen { payment_queue, opening_fee_msat } => {
361 if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) {
362 let forward_payment = ForwardPaymentAction(
363 channel_id,
364 FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat },
365 );
366 *self = OutboundJITChannelState::PendingPaymentForward {
367 payment_queue: core::mem::take(payment_queue),
368 opening_fee_msat: *opening_fee_msat,
369 channel_id,
370 };
371 Ok(forward_payment)
372 } else {
373 return Err(ChannelStateError(
374 "No forwardable payment available when moving to channel ready."
375 .to_string(),
376 ));
377 }
378 },
379 state => Err(ChannelStateError(format!(
380 "Channel ready received when JIT Channel was in state: {:?}",
381 state
382 ))),
383 }
384 }
385
386 fn htlc_handling_failed(&mut self) -> Result<Option<ForwardPaymentAction>, ChannelStateError> {
387 match self {
388 OutboundJITChannelState::PendingPaymentForward {
389 payment_queue,
390 opening_fee_msat,
391 channel_id,
392 } => {
393 if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) {
394 let forward_payment = ForwardPaymentAction(
395 *channel_id,
396 FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat },
397 );
398 *self = OutboundJITChannelState::PendingPaymentForward {
399 payment_queue: core::mem::take(payment_queue),
400 opening_fee_msat: *opening_fee_msat,
401 channel_id: *channel_id,
402 };
403 Ok(Some(forward_payment))
404 } else {
405 *self = OutboundJITChannelState::PendingPayment {
406 payment_queue: core::mem::take(payment_queue),
407 opening_fee_msat: *opening_fee_msat,
408 channel_id: *channel_id,
409 };
410 Ok(None)
411 }
412 },
413 OutboundJITChannelState::PendingPayment {
414 payment_queue,
415 opening_fee_msat,
416 channel_id,
417 } => {
418 *self = OutboundJITChannelState::PendingPayment {
419 payment_queue: core::mem::take(payment_queue),
420 opening_fee_msat: *opening_fee_msat,
421 channel_id: *channel_id,
422 };
423 Ok(None)
424 },
425 OutboundJITChannelState::PaymentForwarded { channel_id } => {
426 *self = OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
427 Ok(None)
428 },
429 state => Err(ChannelStateError(format!(
430 "HTLC handling failed when JIT Channel was in state: {:?}",
431 state
432 ))),
433 }
434 }
435
436 fn payment_forwarded(
437 &mut self, skimmed_fee_msat: u64,
438 ) -> Result<Option<ForwardHTLCsAction>, ChannelStateError> {
439 match self {
440 OutboundJITChannelState::PendingPaymentForward {
441 payment_queue,
442 channel_id,
443 opening_fee_msat,
444 } => {
445 if skimmed_fee_msat >= *opening_fee_msat {
446 let mut pq = core::mem::take(payment_queue);
447 let forward_htlcs = ForwardHTLCsAction(*channel_id, pq.clear());
448 *self = OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
449 Ok(Some(forward_htlcs))
450 } else {
451 *self = OutboundJITChannelState::PendingPaymentForward {
452 payment_queue: core::mem::take(payment_queue),
453 opening_fee_msat: *opening_fee_msat,
454 channel_id: *channel_id,
455 };
456 Ok(None)
457 }
458 },
459 OutboundJITChannelState::PaymentForwarded { channel_id } => {
460 *self = OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
461 Ok(None)
462 },
463 state => Err(ChannelStateError(format!(
464 "Payment forwarded when JIT Channel was in state: {:?}",
465 state
466 ))),
467 }
468 }
469}
470
471impl_writeable_tlv_based_enum!(OutboundJITChannelState,
472 (0, PendingInitialPayment) => {
473 (0, payment_queue, required),
474 },
475 (2, PendingChannelOpen) => {
476 (0, payment_queue, required),
477 (2, opening_fee_msat, required),
478 },
479 (4, PendingPaymentForward) => {
480 (0, payment_queue, required),
481 (2, opening_fee_msat, required),
482 (4, channel_id, required),
483 },
484 (6, PendingPayment) => {
485 (0, payment_queue, required),
486 (2, opening_fee_msat, required),
487 (4, channel_id, required),
488 },
489 (8, PaymentForwarded) => {
490 (0, channel_id, required),
491 },
492);
493
494struct OutboundJITChannel {
495 state: OutboundJITChannelState,
496 user_channel_id: u128,
497 opening_fee_params: LSPS2OpeningFeeParams,
498 payment_size_msat: Option<u64>,
499 trust_model: TrustModel,
500}
501
502impl_writeable_tlv_based!(OutboundJITChannel, {
503 (0, state, required),
504 (2, user_channel_id, required),
505 (4, opening_fee_params, required),
506 (6, payment_size_msat, option),
507 (8, trust_model, required),
508});
509
510impl OutboundJITChannel {
511 fn new(
512 payment_size_msat: Option<u64>, opening_fee_params: LSPS2OpeningFeeParams,
513 user_channel_id: u128, client_trusts_lsp: bool,
514 ) -> Self {
515 Self {
516 user_channel_id,
517 state: OutboundJITChannelState::new(),
518 opening_fee_params,
519 payment_size_msat,
520 trust_model: TrustModel::new(client_trusts_lsp),
521 }
522 }
523
524 fn htlc_intercepted(
525 &mut self, htlc: InterceptedHTLC,
526 ) -> Result<Option<HTLCInterceptedAction>, LightningError> {
527 let action =
528 self.state.htlc_intercepted(&self.opening_fee_params, &self.payment_size_msat, htlc)?;
529 Ok(action)
530 }
531
532 fn htlc_handling_failed(&mut self) -> Result<Option<ForwardPaymentAction>, LightningError> {
533 let action = self.state.htlc_handling_failed()?;
534 Ok(action)
535 }
536
537 fn channel_ready(
538 &mut self, channel_id: ChannelId,
539 ) -> Result<ForwardPaymentAction, LightningError> {
540 let action = self.state.channel_ready(channel_id)?;
541 Ok(action)
542 }
543
544 fn payment_forwarded(
545 &mut self, skimmed_fee_msat: u64,
546 ) -> Result<Option<ForwardHTLCsAction>, LightningError> {
547 let action = self.state.payment_forwarded(skimmed_fee_msat)?;
548 Ok(action)
549 }
550
551 fn is_pending_initial_payment(&self) -> bool {
552 matches!(self.state, OutboundJITChannelState::PendingInitialPayment { .. })
553 }
554
555 fn is_prunable(&self) -> bool {
556 let is_expired = is_expired_opening_fee_params(&self.opening_fee_params);
559 self.is_pending_initial_payment() && is_expired
560 }
561
562 fn set_funding_tx(&mut self, funding_tx: Transaction) {
563 self.trust_model.set_funding_tx(funding_tx);
564 }
565
566 fn set_funding_tx_broadcast_safe(&mut self, funding_tx_broadcast_safe: bool) {
567 self.trust_model.set_funding_tx_broadcast_safe(funding_tx_broadcast_safe);
568 }
569
570 fn should_broadcast_funding_transaction(&self) -> bool {
571 self.trust_model.should_manually_broadcast(matches!(
572 self.state,
573 OutboundJITChannelState::PaymentForwarded { .. }
574 ))
575 }
576
577 fn get_channel_id(&self) -> Option<ChannelId> {
578 match self.state {
579 OutboundJITChannelState::PaymentForwarded { channel_id } => Some(channel_id),
580 _ => None,
581 }
582 }
583
584 fn get_funding_tx(&self) -> Option<&Transaction> {
585 self.trust_model.get_funding_tx()
586 }
587
588 fn is_client_trusts_lsp(&self) -> bool {
589 self.trust_model.is_client_trusts_lsp()
590 }
591}
592
593pub(crate) struct PeerState {
594 outbound_channels_by_intercept_scid: HashMap<u64, OutboundJITChannel>,
595 intercept_scid_by_user_channel_id: HashMap<u128, u64>,
596 intercept_scid_by_channel_id: HashMap<ChannelId, u64>,
597 pending_requests: HashMap<LSPSRequestId, LSPS2Request>,
598 needs_persist: bool,
599}
600
601impl PeerState {
602 fn new() -> Self {
603 let outbound_channels_by_intercept_scid = new_hash_map();
604 let pending_requests = new_hash_map();
605 let intercept_scid_by_user_channel_id = new_hash_map();
606 let intercept_scid_by_channel_id = new_hash_map();
607 let needs_persist = false;
608 Self {
609 outbound_channels_by_intercept_scid,
610 pending_requests,
611 intercept_scid_by_user_channel_id,
612 intercept_scid_by_channel_id,
613 needs_persist,
614 }
615 }
616
617 fn insert_outbound_channel(&mut self, intercept_scid: u64, channel: OutboundJITChannel) {
618 self.outbound_channels_by_intercept_scid.insert(intercept_scid, channel);
619 self.needs_persist |= true;
620 }
621
622 fn prune_pending_requests(&mut self) {
623 self.pending_requests.retain(|_, entry| {
624 match entry {
625 LSPS2Request::GetInfo(_) => false,
626 LSPS2Request::Buy(request) => {
627 !is_expired_opening_fee_params(&request.opening_fee_params)
629 },
630 }
631 });
632 }
633
634 fn prune_expired_request_state(&mut self) {
635 self.outbound_channels_by_intercept_scid.retain(|intercept_scid, entry| {
636 if entry.is_prunable() {
637 self.intercept_scid_by_channel_id.retain(|_, iscid| intercept_scid != iscid);
639 self.intercept_scid_by_user_channel_id.retain(|_, iscid| intercept_scid != iscid);
640 self.needs_persist |= true;
641 return false;
642 }
643 true
644 });
645 }
646
647 fn pending_requests_and_channels(&self) -> usize {
648 let pending_requests = self.pending_requests.len();
649 let pending_outbound_channels = self
650 .outbound_channels_by_intercept_scid
651 .iter()
652 .filter(|(_, v)| v.is_pending_initial_payment())
653 .count();
654 pending_requests + pending_outbound_channels
655 }
656
657 fn is_prunable(&self) -> bool {
658 self.pending_requests.is_empty() && self.outbound_channels_by_intercept_scid.is_empty()
660 }
661}
662
663impl_writeable_tlv_based!(PeerState, {
664 (0, outbound_channels_by_intercept_scid, required),
665 (2, intercept_scid_by_user_channel_id, required),
666 (4, intercept_scid_by_channel_id, required),
667 (_unused, pending_requests, (static_value, new_hash_map())),
668 (_unused, needs_persist, (static_value, false)),
669});
670
// Evaluates to the peer's state entry in `$outer_state_lock`, creating an empty one if the
// peer is unknown. If that would exceed `MAX_TOTAL_PEERS`, an internal-error message is
// enqueued for the peer and the *calling function* returns a `LightningError`.
macro_rules! get_or_insert_peer_state_entry {
	($self: ident, $outer_state_lock: expr, $message_queue_notifier: expr, $counterparty_node_id: expr) => {{
		// Read the map size up front, before `entry` starts using the map.
		let peer_limit_reached = $outer_state_lock.len() >= MAX_TOTAL_PEERS;
		match $outer_state_lock.entry(*$counterparty_node_id) {
			Entry::Occupied(occupied) => occupied.into_mut(),
			Entry::Vacant(vacant) => {
				if peer_limit_reached {
					let msg = LSPSMessage::Invalid(LSPSResponseError {
						code: JSONRPC_INTERNAL_ERROR_ERROR_CODE,
						message: JSONRPC_INTERNAL_ERROR_ERROR_MESSAGE.to_string(),
						data: None,
					});
					$message_queue_notifier.enqueue($counterparty_node_id, msg);

					let err = format!(
						"Dropping request from peer {} due to reaching maximally allowed number of total peers: {}",
						$counterparty_node_id, MAX_TOTAL_PEERS
					);

					return Err(LightningError { err, action: ErrorAction::IgnoreAndLog(Level::Error) });
				}
				vacant.insert(Mutex::new(PeerState::new()))
			},
		}
	}}
}
703
704pub struct LSPS2ServiceHandler<CM: Deref, K: Deref + Clone, T: Deref>
706where
707 CM::Target: AChannelManager,
708 K::Target: KVStore,
709 T::Target: BroadcasterInterface,
710{
711 channel_manager: CM,
712 kv_store: K,
713 tx_broadcaster: T,
714 pending_messages: Arc<MessageQueue>,
715 pending_events: Arc<EventQueue<K>>,
716 per_peer_state: RwLock<HashMap<PublicKey, Mutex<PeerState>>>,
717 peer_by_intercept_scid: RwLock<HashMap<u64, PublicKey>>,
718 peer_by_channel_id: RwLock<HashMap<ChannelId, PublicKey>>,
719 total_pending_requests: AtomicUsize,
720 config: LSPS2ServiceConfig,
721 persistence_in_flight: AtomicUsize,
722}
723
724impl<CM: Deref, K: Deref + Clone, T: Deref + Clone> LSPS2ServiceHandler<CM, K, T>
725where
726 CM::Target: AChannelManager,
727 K::Target: KVStore,
728 T::Target: BroadcasterInterface,
729{
730 pub(crate) fn new(
732 per_peer_state: HashMap<PublicKey, Mutex<PeerState>>, pending_messages: Arc<MessageQueue>,
733 pending_events: Arc<EventQueue<K>>, channel_manager: CM, kv_store: K, tx_broadcaster: T,
734 config: LSPS2ServiceConfig,
735 ) -> Result<Self, lightning::io::Error> {
736 let mut peer_by_intercept_scid = new_hash_map();
737 let mut peer_by_channel_id = new_hash_map();
738 for (node_id, peer_state) in per_peer_state.iter() {
739 let peer_state_lock = peer_state.lock().unwrap();
740 for (intercept_scid, _) in peer_state_lock.outbound_channels_by_intercept_scid.iter() {
741 let res = peer_by_intercept_scid.insert(*intercept_scid, *node_id);
742 debug_assert!(res.is_none(), "Intercept SCIDs should never collide");
743 if res.is_some() {
744 return Err(lightning::io::Error::new(
745 lightning::io::ErrorKind::InvalidData,
746 "Failed to read LSPS2 peer state due to data inconsistencies: Intercept SCIDs should never collide",
747 ));
748 }
749 }
750
751 for (channel_id, _) in peer_state_lock.intercept_scid_by_channel_id.iter() {
752 let res = peer_by_channel_id.insert(*channel_id, *node_id);
753 debug_assert!(res.is_none(), "Channel IDs should never collide");
754 if res.is_some() {
755 return Err(lightning::io::Error::new(
756 lightning::io::ErrorKind::InvalidData,
757 "Failed to read LSPS2 peer state due to data inconsistencies: Channel IDs should never collide",
758 ));
759 }
760 }
761 }
762
763 Ok(Self {
764 pending_messages,
765 pending_events,
766 per_peer_state: RwLock::new(per_peer_state),
767 peer_by_intercept_scid: RwLock::new(peer_by_intercept_scid),
768 peer_by_channel_id: RwLock::new(peer_by_channel_id),
769 total_pending_requests: AtomicUsize::new(0),
770 persistence_in_flight: AtomicUsize::new(0),
771 channel_manager,
772 kv_store,
773 tx_broadcaster,
774 config,
775 })
776 }
777
778 pub fn config(&self) -> &LSPS2ServiceConfig {
780 &self.config
781 }
782
783 pub(crate) fn has_active_requests(&self, counterparty_node_id: &PublicKey) -> bool {
785 let outer_state_lock = self.per_peer_state.read().unwrap();
786 outer_state_lock.get(counterparty_node_id).map_or(false, |inner| {
787 let peer_state = inner.lock().unwrap();
788 !peer_state.outbound_channels_by_intercept_scid.is_empty()
789 })
790 }
791
792 pub fn invalid_token_provided(
798 &self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
799 ) -> Result<(), APIError> {
800 let mut message_queue_notifier = self.pending_messages.notifier();
801
802 let outer_state_lock = self.per_peer_state.read().unwrap();
803
804 match outer_state_lock.get(counterparty_node_id) {
805 Some(inner_state_lock) => {
806 let mut peer_state_lock = inner_state_lock.lock().unwrap();
807
808 match self.remove_pending_request(&mut peer_state_lock, &request_id) {
809 Some(LSPS2Request::GetInfo(_)) => {
810 let response = LSPS2Response::GetInfoError(LSPSResponseError {
811 code: LSPS2_GET_INFO_REQUEST_UNRECOGNIZED_OR_STALE_TOKEN_ERROR_CODE,
812 message: "an unrecognized or stale token was provided".to_string(),
813 data: None,
814 });
815 let msg = LSPS2Message::Response(request_id, response).into();
816 message_queue_notifier.enqueue(counterparty_node_id, msg);
817 Ok(())
818 },
819 _ => Err(APIError::APIMisuseError {
820 err: format!(
821 "No pending get_info request for request_id: {:?}",
822 request_id
823 ),
824 }),
825 }
826 },
827 None => Err(APIError::APIMisuseError {
828 err: format!("No state for the counterparty exists: {}", counterparty_node_id),
829 }),
830 }
831 }
832
833 pub fn opening_fee_params_generated(
839 &self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
840 opening_fee_params_menu: Vec<LSPS2RawOpeningFeeParams>,
841 ) -> Result<(), APIError> {
842 let mut message_queue_notifier = self.pending_messages.notifier();
843
844 let outer_state_lock = self.per_peer_state.read().unwrap();
845
846 match outer_state_lock.get(counterparty_node_id) {
847 Some(inner_state_lock) => {
848 let mut peer_state_lock = inner_state_lock.lock().unwrap();
849
850 match self.remove_pending_request(&mut peer_state_lock, &request_id) {
851 Some(LSPS2Request::GetInfo(_)) => {
852 let mut opening_fee_params_menu: Vec<LSPS2OpeningFeeParams> =
853 opening_fee_params_menu
854 .into_iter()
855 .map(|param| {
856 param.into_opening_fee_params(
857 &self.config.promise_secret,
858 counterparty_node_id,
859 )
860 })
861 .collect();
862 opening_fee_params_menu.sort_by(|a, b| {
863 match a.min_fee_msat.cmp(&b.min_fee_msat) {
864 CmpOrdering::Equal => a.proportional.cmp(&b.proportional),
865 other => other,
866 }
867 });
868 let response = LSPS2Response::GetInfo(LSPS2GetInfoResponse {
869 opening_fee_params_menu,
870 });
871 let msg = LSPS2Message::Response(request_id, response).into();
872 message_queue_notifier.enqueue(counterparty_node_id, msg);
873 Ok(())
874 },
875 _ => Err(APIError::APIMisuseError {
876 err: format!(
877 "No pending get_info request for request_id: {:?}",
878 request_id
879 ),
880 }),
881 }
882 },
883 None => Err(APIError::APIMisuseError {
884 err: format!("No state for the counterparty exists: {}", counterparty_node_id),
885 }),
886 }
887 }
888
889 pub async fn invoice_parameters_generated(
910 &self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId, intercept_scid: u64,
911 cltv_expiry_delta: u32, client_trusts_lsp: bool, user_channel_id: u128,
912 ) -> Result<(), APIError> {
913 let mut message_queue_notifier = self.pending_messages.notifier();
914 let mut should_persist = false;
915
916 match self.per_peer_state.read().unwrap().get(counterparty_node_id) {
917 Some(inner_state_lock) => {
918 let mut peer_state_lock = inner_state_lock.lock().unwrap();
919
920 match self.remove_pending_request(&mut peer_state_lock, &request_id) {
921 Some(LSPS2Request::Buy(buy_request)) => {
922 {
923 let mut peer_by_intercept_scid =
924 self.peer_by_intercept_scid.write().unwrap();
925 peer_by_intercept_scid.insert(intercept_scid, *counterparty_node_id);
926 }
927
928 let outbound_jit_channel = OutboundJITChannel::new(
929 buy_request.payment_size_msat,
930 buy_request.opening_fee_params,
931 user_channel_id,
932 client_trusts_lsp,
933 );
934
935 peer_state_lock
936 .intercept_scid_by_user_channel_id
937 .insert(user_channel_id, intercept_scid);
938 peer_state_lock
939 .insert_outbound_channel(intercept_scid, outbound_jit_channel);
940 should_persist |= peer_state_lock.needs_persist;
941
942 let response = LSPS2Response::Buy(LSPS2BuyResponse {
943 jit_channel_scid: intercept_scid.into(),
944 lsp_cltv_expiry_delta: cltv_expiry_delta,
945 client_trusts_lsp,
946 });
947 let msg = LSPS2Message::Response(request_id, response).into();
948 message_queue_notifier.enqueue(counterparty_node_id, msg);
949 },
950 _ => {
951 return Err(APIError::APIMisuseError {
952 err: format!("No pending buy request for request_id: {:?}", request_id),
953 })
954 },
955 }
956 },
957 None => {
958 return Err(APIError::APIMisuseError {
959 err: format!("No state for the counterparty exists: {}", counterparty_node_id),
960 })
961 },
962 };
963
964 if should_persist {
965 self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
966 APIError::APIMisuseError {
967 err: format!(
968 "Failed to persist peer state for {}: {}",
969 counterparty_node_id, e
970 ),
971 }
972 })?;
973 }
974
975 Ok(())
976 }
977
978 pub async fn htlc_intercepted(
991 &self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64,
992 payment_hash: PaymentHash,
993 ) -> Result<(), APIError> {
994 let event_queue_notifier = self.pending_events.notifier();
995 let mut should_persist = None;
996
997 if let Some(counterparty_node_id) =
998 self.peer_by_intercept_scid.read().unwrap().get(&intercept_scid)
999 {
1000 let outer_state_lock = self.per_peer_state.read().unwrap();
1001 match outer_state_lock.get(counterparty_node_id) {
1002 Some(inner_state_lock) => {
1003 let mut peer_state = inner_state_lock.lock().unwrap();
1004 if let Some(jit_channel) =
1005 peer_state.outbound_channels_by_intercept_scid.get_mut(&intercept_scid)
1006 {
1007 should_persist = Some(*counterparty_node_id);
1008 let htlc = InterceptedHTLC {
1009 intercept_id,
1010 expected_outbound_amount_msat,
1011 payment_hash,
1012 };
1013 match jit_channel.htlc_intercepted(htlc) {
1014 Ok(Some(HTLCInterceptedAction::OpenChannel(open_channel_params))) => {
1015 let event = LSPS2ServiceEvent::OpenChannel {
1016 their_network_key: counterparty_node_id.clone(),
1017 amt_to_forward_msat: open_channel_params.amt_to_forward_msat,
1018 opening_fee_msat: open_channel_params.opening_fee_msat,
1019 user_channel_id: jit_channel.user_channel_id,
1020 intercept_scid,
1021 };
1022 event_queue_notifier.enqueue(event);
1023 },
1024 Ok(Some(HTLCInterceptedAction::ForwardHTLC(channel_id))) => {
1025 self.channel_manager.get_cm().forward_intercepted_htlc(
1026 intercept_id,
1027 &channel_id,
1028 *counterparty_node_id,
1029 expected_outbound_amount_msat,
1030 )?;
1031 },
1032 Ok(Some(HTLCInterceptedAction::ForwardPayment(
1033 channel_id,
1034 FeePayment { opening_fee_msat, htlcs },
1035 ))) => {
1036 let amounts_to_forward_msat =
1037 calculate_amount_to_forward_per_htlc(&htlcs, opening_fee_msat);
1038
1039 for (intercept_id, amount_to_forward_msat) in
1040 amounts_to_forward_msat
1041 {
1042 self.channel_manager.get_cm().forward_intercepted_htlc(
1043 intercept_id,
1044 &channel_id,
1045 *counterparty_node_id,
1046 amount_to_forward_msat,
1047 )?;
1048 }
1049 },
1050 Ok(None) => {},
1051 Err(e) => {
1052 self.channel_manager
1053 .get_cm()
1054 .fail_intercepted_htlc(intercept_id)?;
1055 peer_state
1056 .outbound_channels_by_intercept_scid
1057 .remove(&intercept_scid);
1058 return Err(APIError::APIMisuseError { err: e.err });
1060 },
1061 }
1062 }
1063
1064 peer_state.needs_persist |= should_persist.is_some();
1065 },
1066 None => {
1067 return Err(APIError::APIMisuseError {
1068 err: format!("No counterparty found for scid: {}", intercept_scid),
1069 });
1070 },
1071 }
1072 }
1073
1074 if let Some(counterparty_node_id) = should_persist {
1075 self.persist_peer_state(counterparty_node_id).await.map_err(|e| {
1076 APIError::APIMisuseError {
1077 err: format!(
1078 "Failed to persist peer state for {}: {}",
1079 counterparty_node_id, e
1080 ),
1081 }
1082 })?;
1083 }
1084
1085 Ok(())
1086 }
1087
1088 pub async fn htlc_handling_failed(
1096 &self, failure_type: HTLCHandlingFailureType,
1097 ) -> Result<(), APIError> {
1098 let mut should_persist = None;
1099 if let HTLCHandlingFailureType::Forward { channel_id, .. } = failure_type {
1100 let peer_by_channel_id = self.peer_by_channel_id.read().unwrap();
1101 if let Some(counterparty_node_id) = peer_by_channel_id.get(&channel_id) {
1102 let outer_state_lock = self.per_peer_state.read().unwrap();
1103 match outer_state_lock.get(counterparty_node_id) {
1104 Some(inner_state_lock) => {
1105 let mut peer_state = inner_state_lock.lock().unwrap();
1106 if let Some(intercept_scid) =
1107 peer_state.intercept_scid_by_channel_id.get(&channel_id).copied()
1108 {
1109 should_persist = Some(*counterparty_node_id);
1110
1111 if let Some(jit_channel) = peer_state
1112 .outbound_channels_by_intercept_scid
1113 .get_mut(&intercept_scid)
1114 {
1115 match jit_channel.htlc_handling_failed() {
1116 Ok(Some(ForwardPaymentAction(
1117 channel_id,
1118 FeePayment { opening_fee_msat, htlcs },
1119 ))) => {
1120 let amounts_to_forward_msat =
1121 calculate_amount_to_forward_per_htlc(
1122 &htlcs,
1123 opening_fee_msat,
1124 );
1125
1126 for (intercept_id, amount_to_forward_msat) in
1127 amounts_to_forward_msat
1128 {
1129 self.channel_manager
1130 .get_cm()
1131 .forward_intercepted_htlc(
1132 intercept_id,
1133 &channel_id,
1134 *counterparty_node_id,
1135 amount_to_forward_msat,
1136 )?;
1137 }
1138 },
1139 Ok(None) => {},
1140 Err(e) => {
1141 return Err(APIError::APIMisuseError {
1142 err: format!("Unable to fail HTLC: {}.", e.err),
1143 });
1144 },
1145 }
1146 }
1147 }
1148 peer_state.needs_persist |= should_persist.is_some();
1149 },
1150 None => {},
1151 }
1152 }
1153 }
1154
1155 if let Some(counterparty_node_id) = should_persist {
1156 self.persist_peer_state(counterparty_node_id).await.map_err(|e| {
1157 APIError::APIMisuseError {
1158 err: format!(
1159 "Failed to persist peer state for {}: {}",
1160 counterparty_node_id, e
1161 ),
1162 }
1163 })?;
1164 }
1165
1166 Ok(())
1167 }
1168
1169 pub async fn payment_forwarded(
1184 &self, next_channel_id: ChannelId, skimmed_fee_msat: u64,
1185 ) -> Result<(), APIError> {
1186 let mut should_persist = None;
1187 if let Some(counterparty_node_id) =
1188 self.peer_by_channel_id.read().unwrap().get(&next_channel_id)
1189 {
1190 let outer_state_lock = self.per_peer_state.read().unwrap();
1191 match outer_state_lock.get(counterparty_node_id) {
1192 Some(inner_state_lock) => {
1193 let mut peer_state = inner_state_lock.lock().unwrap();
1194 if let Some(intercept_scid) =
1195 peer_state.intercept_scid_by_channel_id.get(&next_channel_id).copied()
1196 {
1197 should_persist = Some(*counterparty_node_id);
1198
1199 if let Some(jit_channel) =
1200 peer_state.outbound_channels_by_intercept_scid.get_mut(&intercept_scid)
1201 {
1202 match jit_channel.payment_forwarded(skimmed_fee_msat) {
1203 Ok(Some(ForwardHTLCsAction(channel_id, htlcs))) => {
1204 for htlc in htlcs {
1205 self.channel_manager.get_cm().forward_intercepted_htlc(
1206 htlc.intercept_id,
1207 &channel_id,
1208 *counterparty_node_id,
1209 htlc.expected_outbound_amount_msat,
1210 )?;
1211 }
1212 },
1213 Ok(None) => {},
1214 Err(e) => {
1215 return Err(APIError::APIMisuseError {
1216 err: format!(
1217 "Forwarded payment was not applicable for JIT channel: {}",
1218 e.err
1219 ),
1220 })
1221 },
1222 }
1223
1224 self.broadcast_funding_transaction_if_applies(jit_channel);
1225 }
1226 } else {
1227 return Err(APIError::APIMisuseError {
1228 err: format!("No state for for channel id: {}", next_channel_id),
1229 });
1230 }
1231 peer_state.needs_persist |= should_persist.is_some();
1232 },
1233 None => {
1234 return Err(APIError::APIMisuseError {
1235 err: format!("No counterparty state for: {}", counterparty_node_id),
1236 });
1237 },
1238 }
1239 }
1240
1241 if let Some(counterparty_node_id) = should_persist {
1242 self.persist_peer_state(counterparty_node_id).await.map_err(|e| {
1243 APIError::APIMisuseError {
1244 err: format!(
1245 "Failed to persist peer state for {}: {}",
1246 counterparty_node_id, e
1247 ),
1248 }
1249 })?;
1250 }
1251
1252 Ok(())
1253 }
1254
1255 pub async fn channel_open_abandoned(
1271 &self, counterparty_node_id: &PublicKey, user_channel_id: u128,
1272 ) -> Result<(), APIError> {
1273 {
1274 let outer_state_lock = self.per_peer_state.read().unwrap();
1275 let inner_state_lock = outer_state_lock.get(counterparty_node_id).ok_or_else(|| {
1276 APIError::APIMisuseError {
1277 err: format!("No counterparty state for: {}", counterparty_node_id),
1278 }
1279 })?;
1280 let mut peer_state = inner_state_lock.lock().unwrap();
1281
1282 let intercept_scid = peer_state
1283 .intercept_scid_by_user_channel_id
1284 .get(&user_channel_id)
1285 .copied()
1286 .ok_or_else(|| APIError::APIMisuseError {
1287 err: format!(
1288 "Could not find a channel with user_channel_id {}",
1289 user_channel_id
1290 ),
1291 })?;
1292
1293 let jit_channel = peer_state
1294 .outbound_channels_by_intercept_scid
1295 .get(&intercept_scid)
1296 .ok_or_else(|| APIError::APIMisuseError {
1297 err: format!(
1298 "Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1299 intercept_scid, user_channel_id,
1300 ),
1301 })?;
1302
1303 let is_pending = matches!(
1304 jit_channel.state,
1305 OutboundJITChannelState::PendingInitialPayment { .. }
1306 | OutboundJITChannelState::PendingChannelOpen { .. }
1307 );
1308
1309 if !is_pending {
1310 return Err(APIError::APIMisuseError {
1311 err: "Cannot abandon channel open after channel creation or payment forwarding"
1312 .to_string(),
1313 });
1314 }
1315
1316 peer_state.intercept_scid_by_user_channel_id.remove(&user_channel_id);
1317 peer_state.outbound_channels_by_intercept_scid.remove(&intercept_scid);
1318 peer_state.intercept_scid_by_channel_id.retain(|_, &mut scid| scid != intercept_scid);
1319 peer_state.needs_persist |= true;
1320 }
1321
1322 self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
1323 APIError::APIMisuseError {
1324 err: format!("Failed to persist peer state for {}: {}", counterparty_node_id, e),
1325 }
1326 })?;
1327
1328 Ok(())
1329 }
1330
1331 pub async fn channel_open_failed(
1339 &self, counterparty_node_id: &PublicKey, user_channel_id: u128,
1340 ) -> Result<(), APIError> {
1341 {
1342 let outer_state_lock = self.per_peer_state.read().unwrap();
1343
1344 let inner_state_lock = outer_state_lock.get(counterparty_node_id).ok_or_else(|| {
1345 APIError::APIMisuseError {
1346 err: format!("No counterparty state for: {}", counterparty_node_id),
1347 }
1348 })?;
1349
1350 let mut peer_state = inner_state_lock.lock().unwrap();
1351
1352 let intercept_scid = peer_state
1353 .intercept_scid_by_user_channel_id
1354 .get(&user_channel_id)
1355 .copied()
1356 .ok_or_else(|| APIError::APIMisuseError {
1357 err: format!(
1358 "Could not find a channel with user_channel_id {}",
1359 user_channel_id
1360 ),
1361 })?;
1362
1363 let jit_channel = peer_state
1364 .outbound_channels_by_intercept_scid
1365 .get_mut(&intercept_scid)
1366 .ok_or_else(|| APIError::APIMisuseError {
1367 err: format!(
1368 "Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1369 intercept_scid, user_channel_id,
1370 ),
1371 })?;
1372
1373 if let OutboundJITChannelState::PendingChannelOpen { payment_queue, .. } =
1374 &mut jit_channel.state
1375 {
1376 let intercepted_htlcs = payment_queue.clear();
1377 for htlc in intercepted_htlcs {
1378 self.channel_manager.get_cm().fail_htlc_backwards_with_reason(
1379 &htlc.payment_hash,
1380 FailureCode::TemporaryNodeFailure,
1381 );
1382 }
1383
1384 jit_channel.state = OutboundJITChannelState::PendingInitialPayment {
1385 payment_queue: PaymentQueue::new(),
1386 };
1387 } else {
1388 return Err(APIError::APIMisuseError {
1389 err: "Channel is not in the PendingChannelOpen state.".to_string(),
1390 });
1391 }
1392
1393 peer_state.needs_persist |= true;
1394 }
1395 self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
1396 APIError::APIMisuseError {
1397 err: format!("Failed to persist peer state for {}: {}", counterparty_node_id, e),
1398 }
1399 })?;
1400
1401 Ok(())
1402 }
1403
1404 pub async fn channel_ready(
1411 &self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey,
1412 ) -> Result<(), APIError> {
1413 let mut should_persist = false;
1414 {
1415 let mut peer_by_channel_id = self.peer_by_channel_id.write().unwrap();
1416 peer_by_channel_id.insert(*channel_id, *counterparty_node_id);
1417 }
1418 match self.per_peer_state.read().unwrap().get(counterparty_node_id) {
1419 Some(inner_state_lock) => {
1420 let mut peer_state = inner_state_lock.lock().unwrap();
1421 if let Some(intercept_scid) =
1422 peer_state.intercept_scid_by_user_channel_id.get(&user_channel_id).copied()
1423 {
1424 should_persist |= true;
1425 peer_state.intercept_scid_by_channel_id.insert(*channel_id, intercept_scid);
1426 if let Some(jit_channel) =
1427 peer_state.outbound_channels_by_intercept_scid.get_mut(&intercept_scid)
1428 {
1429 match jit_channel.channel_ready(*channel_id) {
1430 Ok(ForwardPaymentAction(
1431 channel_id,
1432 FeePayment { opening_fee_msat, htlcs },
1433 )) => {
1434 let amounts_to_forward_msat =
1435 calculate_amount_to_forward_per_htlc(&htlcs, opening_fee_msat);
1436
1437 for (intercept_id, amount_to_forward_msat) in
1438 amounts_to_forward_msat
1439 {
1440 self.channel_manager.get_cm().forward_intercepted_htlc(
1441 intercept_id,
1442 &channel_id,
1443 *counterparty_node_id,
1444 amount_to_forward_msat,
1445 )?;
1446 }
1447 },
1448 Err(e) => {
1449 return Err(APIError::APIMisuseError {
1450 err: format!(
1451 "Failed to transition to channel ready: {}",
1452 e.err
1453 ),
1454 })
1455 },
1456 }
1457 } else {
1458 return Err(APIError::APIMisuseError {
1459 err: format!(
1460 "Could not find a channel with user_channel_id {}",
1461 user_channel_id
1462 ),
1463 });
1464 }
1465 } else {
1466 return Err(APIError::APIMisuseError {
1467 err: format!(
1468 "Could not find a channel with that user_channel_id {}",
1469 user_channel_id
1470 ),
1471 });
1472 }
1473 peer_state.needs_persist |= should_persist;
1474 },
1475 None => {
1476 return Err(APIError::APIMisuseError {
1477 err: format!("No counterparty state for: {}", counterparty_node_id),
1478 });
1479 },
1480 }
1481
1482 if should_persist {
1483 self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
1484 APIError::APIMisuseError {
1485 err: format!(
1486 "Failed to persist peer state for {}: {}",
1487 counterparty_node_id, e
1488 ),
1489 }
1490 })?;
1491 }
1492
1493 Ok(())
1494 }
1495
1496 fn handle_get_info_request(
1497 &self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
1498 params: LSPS2GetInfoRequest,
1499 ) -> Result<(), LightningError> {
1500 let mut message_queue_notifier = self.pending_messages.notifier();
1501 let event_queue_notifier = self.pending_events.notifier();
1502
1503 let mut outer_state_lock = self.per_peer_state.write().unwrap();
1504 let inner_state_lock = get_or_insert_peer_state_entry!(
1505 self,
1506 outer_state_lock,
1507 message_queue_notifier,
1508 counterparty_node_id
1509 );
1510 let mut peer_state_lock = inner_state_lock.lock().unwrap();
1511 let request = LSPS2Request::GetInfo(params.clone());
1512 self.insert_pending_request(
1513 &mut peer_state_lock,
1514 &mut message_queue_notifier,
1515 request_id.clone(),
1516 *counterparty_node_id,
1517 request,
1518 )?;
1519
1520 let event = LSPS2ServiceEvent::GetInfo {
1521 request_id,
1522 counterparty_node_id: *counterparty_node_id,
1523 token: params.token,
1524 };
1525 event_queue_notifier.enqueue(event);
1526
1527 Ok(())
1528 }
1529
1530 fn handle_buy_request(
1531 &self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey, params: LSPS2BuyRequest,
1532 ) -> Result<(), LightningError> {
1533 let mut message_queue_notifier = self.pending_messages.notifier();
1534 let event_queue_notifier = self.pending_events.notifier();
1535 if let Some(payment_size_msat) = params.payment_size_msat {
1536 if payment_size_msat < params.opening_fee_params.min_payment_size_msat {
1537 let response = LSPS2Response::BuyError(LSPSResponseError {
1538 code: LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_SMALL_ERROR_CODE,
1539 message: "payment size is below our minimum supported payment size".to_string(),
1540 data: None,
1541 });
1542 let msg = LSPS2Message::Response(request_id, response).into();
1543 message_queue_notifier.enqueue(counterparty_node_id, msg);
1544
1545 return Err(LightningError {
1546 err: "payment size is below our minimum supported payment size".to_string(),
1547 action: ErrorAction::IgnoreAndLog(Level::Info),
1548 });
1549 }
1550
1551 if payment_size_msat > params.opening_fee_params.max_payment_size_msat {
1552 let response = LSPS2Response::BuyError(LSPSResponseError {
1553 code: LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_LARGE_ERROR_CODE,
1554 message: "payment size is above our maximum supported payment size".to_string(),
1555 data: None,
1556 });
1557 let msg = LSPS2Message::Response(request_id, response).into();
1558 message_queue_notifier.enqueue(counterparty_node_id, msg);
1559 return Err(LightningError {
1560 err: "payment size is above our maximum supported payment size".to_string(),
1561 action: ErrorAction::IgnoreAndLog(Level::Info),
1562 });
1563 }
1564
1565 match compute_opening_fee(
1566 payment_size_msat,
1567 params.opening_fee_params.min_fee_msat,
1568 params.opening_fee_params.proportional.into(),
1569 ) {
1570 Some(opening_fee) => {
1571 if opening_fee >= payment_size_msat {
1572 let response = LSPS2Response::BuyError(LSPSResponseError {
1573 code: LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_SMALL_ERROR_CODE,
1574 message: "payment size is too small to cover the opening fee"
1575 .to_string(),
1576 data: None,
1577 });
1578 let msg = LSPS2Message::Response(request_id, response).into();
1579 message_queue_notifier.enqueue(counterparty_node_id, msg);
1580 return Err(LightningError {
1581 err: "payment size is too small to cover the opening fee".to_string(),
1582 action: ErrorAction::IgnoreAndLog(Level::Info),
1583 });
1584 }
1585 },
1586 None => {
1587 let response = LSPS2Response::BuyError(LSPSResponseError {
1588 code: LSPS2_BUY_REQUEST_PAYMENT_SIZE_TOO_LARGE_ERROR_CODE,
1589 message: "overflow error when calculating opening_fee".to_string(),
1590 data: None,
1591 });
1592 let msg = LSPS2Message::Response(request_id, response).into();
1593 message_queue_notifier.enqueue(counterparty_node_id, msg);
1594 return Err(LightningError {
1595 err: "overflow error when calculating opening_fee".to_string(),
1596 action: ErrorAction::IgnoreAndLog(Level::Info),
1597 });
1598 },
1599 }
1600 }
1601
1602 if !is_valid_opening_fee_params(
1604 ¶ms.opening_fee_params,
1605 &self.config.promise_secret,
1606 counterparty_node_id,
1607 ) {
1608 let response = LSPS2Response::BuyError(LSPSResponseError {
1609 code: LSPS2_BUY_REQUEST_INVALID_OPENING_FEE_PARAMS_ERROR_CODE,
1610 message: "valid_until is already past OR the promise did not match the provided parameters".to_string(),
1611 data: None,
1612 });
1613 let msg = LSPS2Message::Response(request_id, response).into();
1614 message_queue_notifier.enqueue(counterparty_node_id, msg);
1615 return Err(LightningError {
1616 err: "invalid opening fee parameters were supplied by client".to_string(),
1617 action: ErrorAction::IgnoreAndLog(Level::Info),
1618 });
1619 }
1620
1621 let mut outer_state_lock = self.per_peer_state.write().unwrap();
1622 let inner_state_lock = get_or_insert_peer_state_entry!(
1623 self,
1624 outer_state_lock,
1625 message_queue_notifier,
1626 counterparty_node_id
1627 );
1628 let mut peer_state_lock = inner_state_lock.lock().unwrap();
1629
1630 let request = LSPS2Request::Buy(params.clone());
1631
1632 self.insert_pending_request(
1633 &mut peer_state_lock,
1634 &mut message_queue_notifier,
1635 request_id.clone(),
1636 *counterparty_node_id,
1637 request,
1638 )?;
1639
1640 let event = LSPS2ServiceEvent::BuyRequest {
1641 request_id,
1642 counterparty_node_id: *counterparty_node_id,
1643 opening_fee_params: params.opening_fee_params,
1644 payment_size_msat: params.payment_size_msat,
1645 };
1646 event_queue_notifier.enqueue(event);
1647
1648 Ok(())
1649 }
1650
1651 fn insert_pending_request<'a>(
1652 &self, peer_state_lock: &mut MutexGuard<'a, PeerState>,
1653 message_queue_notifier: &mut MessageQueueNotifierGuard, request_id: LSPSRequestId,
1654 counterparty_node_id: PublicKey, request: LSPS2Request,
1655 ) -> Result<(), LightningError> {
1656 let create_pending_request_limit_exceeded_response =
1657 |message_queue_notifier: &mut MessageQueueNotifierGuard, error_message: String| {
1658 let error_details = LSPSResponseError {
1659 code: LSPS0_CLIENT_REJECTED_ERROR_CODE,
1660 message: "Reached maximum number of pending requests. Please try again later."
1661 .to_string(),
1662 data: None,
1663 };
1664 let response = match &request {
1665 LSPS2Request::GetInfo(_) => LSPS2Response::GetInfoError(error_details),
1666 LSPS2Request::Buy(_) => LSPS2Response::BuyError(error_details),
1667 };
1668 let msg = LSPS2Message::Response(request_id.clone(), response).into();
1669 message_queue_notifier.enqueue(&counterparty_node_id, msg);
1670
1671 Err(LightningError {
1672 err: error_message,
1673 action: ErrorAction::IgnoreAndLog(Level::Debug),
1674 })
1675 };
1676
1677 if self.total_pending_requests.load(Ordering::Relaxed) >= MAX_TOTAL_PENDING_REQUESTS {
1678 let error_message = format!(
1679 "Reached maximum number of total pending requests: {}",
1680 MAX_TOTAL_PENDING_REQUESTS
1681 );
1682 return create_pending_request_limit_exceeded_response(
1683 message_queue_notifier,
1684 error_message,
1685 );
1686 }
1687
1688 if peer_state_lock.pending_requests_and_channels() < MAX_PENDING_REQUESTS_PER_PEER {
1689 peer_state_lock.pending_requests.insert(request_id, request);
1690 self.total_pending_requests.fetch_add(1, Ordering::Relaxed);
1691 Ok(())
1692 } else {
1693 let error_message = format!(
1694 "Peer {} reached maximum number of pending requests: {}",
1695 counterparty_node_id, MAX_PENDING_REQUESTS_PER_PEER
1696 );
1697 create_pending_request_limit_exceeded_response(message_queue_notifier, error_message)
1698 }
1699 }
1700
1701 fn remove_pending_request<'a>(
1702 &self, peer_state_lock: &mut MutexGuard<'a, PeerState>, request_id: &LSPSRequestId,
1703 ) -> Option<LSPS2Request> {
1704 match peer_state_lock.pending_requests.remove(request_id) {
1705 Some(req) => {
1706 let res = self.total_pending_requests.fetch_update(
1707 Ordering::Relaxed,
1708 Ordering::Relaxed,
1709 |x| Some(x.saturating_sub(1)),
1710 );
1711 match res {
1712 Ok(previous_value) if previous_value == 0 => debug_assert!(
1713 false,
1714 "total_pending_requests counter out-of-sync! This should never happen!"
1715 ),
1716 Err(previous_value) if previous_value == 0 => debug_assert!(
1717 false,
1718 "total_pending_requests counter out-of-sync! This should never happen!"
1719 ),
1720 _ => {},
1721 }
1722 Some(req)
1723 },
1724 res => res,
1725 }
1726 }
1727
1728 #[cfg(debug_assertions)]
1729 fn verify_pending_request_counter(&self) {
1730 let mut num_requests = 0;
1731 let outer_state_lock = self.per_peer_state.read().unwrap();
1732 for (_, inner) in outer_state_lock.iter() {
1733 let inner_state_lock = inner.lock().unwrap();
1734 num_requests += inner_state_lock.pending_requests.len();
1735 }
1736 debug_assert_eq!(
1737 num_requests,
1738 self.total_pending_requests.load(Ordering::Relaxed),
1739 "total_pending_requests counter out-of-sync! This should never happen!"
1740 );
1741 }
1742
1743 async fn persist_peer_state(
1744 &self, counterparty_node_id: PublicKey,
1745 ) -> Result<(), lightning::io::Error> {
1746 let fut = {
1747 let outer_state_lock = self.per_peer_state.read().unwrap();
1748 match outer_state_lock.get(&counterparty_node_id) {
1749 None => {
1750 return Ok(());
1752 },
1753 Some(entry) => {
1754 let mut peer_state_lock = entry.lock().unwrap();
1755 if !peer_state_lock.needs_persist {
1756 return Ok(());
1758 } else {
1759 peer_state_lock.needs_persist = false;
1760 let key = counterparty_node_id.to_string();
1761 let encoded = peer_state_lock.encode();
1762 self.kv_store.write(
1765 LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1766 LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
1767 &key,
1768 encoded,
1769 )
1770 }
1771 },
1772 }
1773 };
1774
1775 fut.await.map_err(|e| {
1776 self.per_peer_state
1777 .read()
1778 .unwrap()
1779 .get(&counterparty_node_id)
1780 .map(|p| p.lock().unwrap().needs_persist = true);
1781 e
1782 })
1783 }
1784
1785 pub(crate) async fn persist(&self) -> Result<(), lightning::io::Error> {
1786 if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
1791 return Ok(());
1794 }
1795
1796 loop {
1797 let mut need_remove = Vec::new();
1798 let mut need_persist = Vec::new();
1799
1800 {
1801 let outer_state_lock = self.per_peer_state.read().unwrap();
1804 for (counterparty_node_id, inner_state_lock) in outer_state_lock.iter() {
1805 let mut peer_state_lock = inner_state_lock.lock().unwrap();
1806 peer_state_lock.prune_expired_request_state();
1807 let is_prunable = peer_state_lock.is_prunable();
1808 if is_prunable {
1809 need_remove.push(*counterparty_node_id);
1810 } else if peer_state_lock.needs_persist {
1811 need_persist.push(*counterparty_node_id);
1812 }
1813 }
1814 }
1815
1816 for counterparty_node_id in need_persist.into_iter() {
1817 debug_assert!(!need_remove.contains(&counterparty_node_id));
1818 self.persist_peer_state(counterparty_node_id).await?;
1819 }
1820
1821 for counterparty_node_id in need_remove {
1822 let mut future_opt = None;
1823 {
1824 let mut per_peer_state = self.per_peer_state.write().unwrap();
1830 if let Entry::Occupied(mut entry) = per_peer_state.entry(counterparty_node_id) {
1831 let state = entry.get_mut().get_mut().unwrap();
1832 if state.is_prunable() {
1833 entry.remove();
1834 let key = counterparty_node_id.to_string();
1835 future_opt = Some(self.kv_store.remove(
1836 LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1837 LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
1838 &key,
1839 true,
1840 ));
1841 } else {
1842 state.needs_persist = true;
1844 }
1845 } else {
1846 debug_assert!(false);
1849 }
1850 }
1851 if let Some(future) = future_opt {
1852 future.await?;
1853 } else {
1854 self.persist_peer_state(counterparty_node_id).await?;
1855 }
1856 }
1857
1858 if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
1859 self.persistence_in_flight.store(1, Ordering::Release);
1862 continue;
1863 }
1864 break;
1865 }
1866
1867 Ok(())
1868 }
1869
1870 pub(crate) fn peer_disconnected(&self, counterparty_node_id: PublicKey) {
1871 let outer_state_lock = self.per_peer_state.write().unwrap();
1872 if let Some(inner_state_lock) = outer_state_lock.get(&counterparty_node_id) {
1873 let mut peer_state_lock = inner_state_lock.lock().unwrap();
1874 peer_state_lock.prune_pending_requests();
1877 peer_state_lock.prune_expired_request_state();
1878 }
1879 }
1880
1881 pub fn channel_needs_manual_broadcast(
1885 &self, user_channel_id: u128, counterparty_node_id: &PublicKey,
1886 ) -> Result<bool, APIError> {
1887 let outer_state_lock = self.per_peer_state.read().unwrap();
1888 let inner_state_lock =
1889 outer_state_lock.get(counterparty_node_id).ok_or_else(|| APIError::APIMisuseError {
1890 err: format!("No counterparty state for: {}", counterparty_node_id),
1891 })?;
1892 let peer_state = inner_state_lock.lock().unwrap();
1893
1894 let intercept_scid = peer_state
1895 .intercept_scid_by_user_channel_id
1896 .get(&user_channel_id)
1897 .copied()
1898 .ok_or_else(|| APIError::APIMisuseError {
1899 err: format!("Could not find a channel with user_channel_id {}", user_channel_id),
1900 })?;
1901
1902 let jit_channel = peer_state
1903 .outbound_channels_by_intercept_scid
1904 .get(&intercept_scid)
1905 .ok_or_else(|| APIError::APIMisuseError {
1906 err: format!(
1907 "Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1908 intercept_scid, user_channel_id,
1909 ),
1910 })?;
1911
1912 Ok(jit_channel.is_client_trusts_lsp())
1913 }
1914
1915 pub fn store_funding_transaction(
1926 &self, user_channel_id: u128, counterparty_node_id: &PublicKey, funding_tx: Transaction,
1927 ) -> Result<(), APIError> {
1928 let outer_state_lock = self.per_peer_state.read().unwrap();
1929 let inner_state_lock =
1930 outer_state_lock.get(counterparty_node_id).ok_or_else(|| APIError::APIMisuseError {
1931 err: format!("No counterparty state for: {}", counterparty_node_id),
1932 })?;
1933 let mut peer_state = inner_state_lock.lock().unwrap();
1934
1935 let intercept_scid = peer_state
1936 .intercept_scid_by_user_channel_id
1937 .get(&user_channel_id)
1938 .copied()
1939 .ok_or_else(|| APIError::APIMisuseError {
1940 err: format!("Could not find a channel with user_channel_id {}", user_channel_id),
1941 })?;
1942
1943 let jit_channel = peer_state
1944 .outbound_channels_by_intercept_scid
1945 .get_mut(&intercept_scid)
1946 .ok_or_else(|| APIError::APIMisuseError {
1947 err: format!(
1948 "Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1949 intercept_scid, user_channel_id,
1950 ),
1951 })?;
1952
1953 jit_channel.set_funding_tx(funding_tx);
1954
1955 self.broadcast_funding_transaction_if_applies(jit_channel);
1956 Ok(())
1957 }
1958
1959 pub fn set_funding_tx_broadcast_safe(
1975 &self, user_channel_id: u128, counterparty_node_id: &PublicKey,
1976 ) -> Result<(), APIError> {
1977 let outer_state_lock = self.per_peer_state.read().unwrap();
1978 let inner_state_lock =
1979 outer_state_lock.get(counterparty_node_id).ok_or_else(|| APIError::APIMisuseError {
1980 err: format!("No counterparty state for: {}", counterparty_node_id),
1981 })?;
1982 let mut peer_state = inner_state_lock.lock().unwrap();
1983
1984 let intercept_scid = peer_state
1985 .intercept_scid_by_user_channel_id
1986 .get(&user_channel_id)
1987 .copied()
1988 .ok_or_else(|| APIError::APIMisuseError {
1989 err: format!("Could not find a channel with user_channel_id {}", user_channel_id),
1990 })?;
1991
1992 let jit_channel = peer_state
1993 .outbound_channels_by_intercept_scid
1994 .get_mut(&intercept_scid)
1995 .ok_or_else(|| APIError::APIMisuseError {
1996 err: format!(
1997 "Failed to map intercept_scid {} for user_channel_id {} to a channel.",
1998 intercept_scid, user_channel_id,
1999 ),
2000 })?;
2001
2002 jit_channel.set_funding_tx_broadcast_safe(true);
2003
2004 self.broadcast_funding_transaction_if_applies(jit_channel);
2005 Ok(())
2006 }
2007
2008 fn broadcast_funding_transaction_if_applies(&self, jit_channel: &OutboundJITChannel) {
2009 if !jit_channel.should_broadcast_funding_transaction() {
2010 return;
2011 }
2012
2013 let channel_id_opt = jit_channel.get_channel_id();
2020 if let Some(ch_id) = channel_id_opt {
2021 let is_channel_ready = self
2022 .channel_manager
2023 .get_cm()
2024 .list_channels()
2025 .into_iter()
2026 .any(|cd| cd.channel_id == ch_id && cd.is_channel_ready);
2027 if !is_channel_ready {
2028 return;
2029 }
2030 } else {
2031 return;
2032 }
2033
2034 if let Some(funding_tx) = jit_channel.get_funding_tx() {
2035 self.tx_broadcaster.broadcast_transactions(&[funding_tx]);
2036 }
2037 }
2038}
2039
2040impl<CM: Deref, K: Deref + Clone, T: Deref + Clone> LSPSProtocolMessageHandler
2041 for LSPS2ServiceHandler<CM, K, T>
2042where
2043 CM::Target: AChannelManager,
2044 K::Target: KVStore,
2045 T::Target: BroadcasterInterface,
2046{
2047 type ProtocolMessage = LSPS2Message;
2048 const PROTOCOL_NUMBER: Option<u16> = Some(2);
2049
2050 fn handle_message(
2051 &self, message: Self::ProtocolMessage, counterparty_node_id: &PublicKey,
2052 ) -> Result<(), LightningError> {
2053 match message {
2054 LSPS2Message::Request(request_id, request) => {
2055 let res = match request {
2056 LSPS2Request::GetInfo(params) => {
2057 self.handle_get_info_request(request_id, counterparty_node_id, params)
2058 },
2059 LSPS2Request::Buy(params) => {
2060 self.handle_buy_request(request_id, counterparty_node_id, params)
2061 },
2062 };
2063 #[cfg(debug_assertions)]
2064 self.verify_pending_request_counter();
2065 res
2066 },
2067 _ => {
2068 debug_assert!(
2069 false,
2070 "Service handler received LSPS2 response message. This should never happen."
2071 );
2072 Err(LightningError { err: format!("Service handler received LSPS2 response message from node {}. This should never happen.", counterparty_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)})
2073 },
2074 }
2075 }
2076}
2077
2078fn calculate_amount_to_forward_per_htlc(
2079 htlcs: &[InterceptedHTLC], total_fee_msat: u64,
2080) -> Vec<(InterceptId, u64)> {
2081 let total_expected_outbound_msat: u64 =
2083 htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum();
2084 if total_fee_msat > total_expected_outbound_msat {
2085 debug_assert!(false, "Fee is larger than the total expected outbound amount.");
2086 return Vec::new();
2087 }
2088
2089 let mut fee_remaining_msat = total_fee_msat;
2090 let mut per_htlc_forwards = vec![];
2091 for (index, htlc) in htlcs.iter().enumerate() {
2092 let proportional_fee_amt_msat = (total_fee_msat as u128
2093 * htlc.expected_outbound_amount_msat as u128
2094 / total_expected_outbound_msat as u128) as u64;
2095
2096 let mut actual_fee_amt_msat = core::cmp::min(fee_remaining_msat, proportional_fee_amt_msat);
2097 actual_fee_amt_msat =
2098 core::cmp::min(actual_fee_amt_msat, htlc.expected_outbound_amount_msat);
2099 fee_remaining_msat -= actual_fee_amt_msat;
2100
2101 if index == htlcs.len() - 1 {
2102 actual_fee_amt_msat += fee_remaining_msat;
2103 }
2104
2105 let amount_to_forward_msat =
2106 htlc.expected_outbound_amount_msat.saturating_sub(actual_fee_amt_msat);
2107
2108 per_htlc_forwards.push((htlc.intercept_id, amount_to_forward_msat))
2109 }
2110 per_htlc_forwards
2111}
2112
2113pub struct LSPS2ServiceHandlerSync<'a, CM: Deref, K: Deref + Clone, T: Deref + Clone>
2116where
2117 CM::Target: AChannelManager,
2118 K::Target: KVStore,
2119 T::Target: BroadcasterInterface,
2120{
2121 inner: &'a LSPS2ServiceHandler<CM, K, T>,
2122}
2123
2124impl<'a, CM: Deref, K: Deref + Clone, T: Deref + Clone> LSPS2ServiceHandlerSync<'a, CM, K, T>
2125where
2126 CM::Target: AChannelManager,
2127 K::Target: KVStore,
2128 T::Target: BroadcasterInterface,
2129{
2130 pub(crate) fn from_inner(inner: &'a LSPS2ServiceHandler<CM, K, T>) -> Self {
2131 Self { inner }
2132 }
2133
2134 pub fn config(&self) -> &LSPS2ServiceConfig {
2138 &self.inner.config
2139 }
2140
2141 pub fn invalid_token_provided(
2145 &self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
2146 ) -> Result<(), APIError> {
2147 self.inner.invalid_token_provided(counterparty_node_id, request_id)
2148 }
2149
2150 pub fn opening_fee_params_generated(
2154 &self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
2155 opening_fee_params_menu: Vec<LSPS2RawOpeningFeeParams>,
2156 ) -> Result<(), APIError> {
2157 self.inner.opening_fee_params_generated(
2158 counterparty_node_id,
2159 request_id,
2160 opening_fee_params_menu,
2161 )
2162 }
2163
2164 pub fn invoice_parameters_generated(
2169 &self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId, intercept_scid: u64,
2170 cltv_expiry_delta: u32, client_trusts_lsp: bool, user_channel_id: u128,
2171 ) -> Result<(), APIError> {
2172 let mut fut = Box::pin(self.inner.invoice_parameters_generated(
2173 counterparty_node_id,
2174 request_id,
2175 intercept_scid,
2176 cltv_expiry_delta,
2177 client_trusts_lsp,
2178 user_channel_id,
2179 ));
2180
2181 let mut waker = dummy_waker();
2182 let mut ctx = task::Context::from_waker(&mut waker);
2183 match fut.as_mut().poll(&mut ctx) {
2184 task::Poll::Ready(result) => result,
2185 task::Poll::Pending => {
2186 unreachable!("Should not be pending in a sync context");
2188 },
2189 }
2190 }
2191
2192 pub fn htlc_intercepted(
2198 &self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64,
2199 payment_hash: PaymentHash,
2200 ) -> Result<(), APIError> {
2201 let mut fut = Box::pin(self.inner.htlc_intercepted(
2202 intercept_scid,
2203 intercept_id,
2204 expected_outbound_amount_msat,
2205 payment_hash,
2206 ));
2207
2208 let mut waker = dummy_waker();
2209 let mut ctx = task::Context::from_waker(&mut waker);
2210 match fut.as_mut().poll(&mut ctx) {
2211 task::Poll::Ready(result) => result,
2212 task::Poll::Pending => {
2213 unreachable!("Should not be pending in a sync context");
2215 },
2216 }
2217 }
2218
2219 pub fn htlc_handling_failed(
2225 &self, failure_type: HTLCHandlingFailureType,
2226 ) -> Result<(), APIError> {
2227 let mut fut = Box::pin(self.inner.htlc_handling_failed(failure_type));
2228
2229 let mut waker = dummy_waker();
2230 let mut ctx = task::Context::from_waker(&mut waker);
2231 match fut.as_mut().poll(&mut ctx) {
2232 task::Poll::Ready(result) => result,
2233 task::Poll::Pending => {
2234 unreachable!("Should not be pending in a sync context");
2236 },
2237 }
2238 }
2239
2240 pub fn payment_forwarded(
2246 &self, next_channel_id: ChannelId, skimmed_fee_msat: u64,
2247 ) -> Result<(), APIError> {
2248 let mut fut = Box::pin(self.inner.payment_forwarded(next_channel_id, skimmed_fee_msat));
2249
2250 let mut waker = dummy_waker();
2251 let mut ctx = task::Context::from_waker(&mut waker);
2252 match fut.as_mut().poll(&mut ctx) {
2253 task::Poll::Ready(result) => result,
2254 task::Poll::Pending => {
2255 unreachable!("Should not be pending in a sync context");
2257 },
2258 }
2259 }
2260
2261 pub fn channel_needs_manual_broadcast(
2263 &self, user_channel_id: u128, counterparty_node_id: &PublicKey,
2264 ) -> Result<bool, APIError> {
2265 self.inner.channel_needs_manual_broadcast(user_channel_id, counterparty_node_id)
2266 }
2267
2268 pub fn store_funding_transaction(
2270 &self, user_channel_id: u128, counterparty_node_id: &PublicKey, funding_tx: Transaction,
2271 ) -> Result<(), APIError> {
2272 self.inner.store_funding_transaction(user_channel_id, counterparty_node_id, funding_tx)
2273 }
2274
2275 pub fn set_funding_tx_broadcast_safe(
2277 &self, user_channel_id: u128, counterparty_node_id: &PublicKey,
2278 ) -> Result<(), APIError> {
2279 self.inner.set_funding_tx_broadcast_safe(user_channel_id, counterparty_node_id)
2280 }
2281
2282 pub fn channel_open_abandoned(
2286 &self, counterparty_node_id: &PublicKey, user_channel_id: u128,
2287 ) -> Result<(), APIError> {
2288 let mut fut =
2289 Box::pin(self.inner.channel_open_abandoned(counterparty_node_id, user_channel_id));
2290
2291 let mut waker = dummy_waker();
2292 let mut ctx = task::Context::from_waker(&mut waker);
2293 match fut.as_mut().poll(&mut ctx) {
2294 task::Poll::Ready(result) => result,
2295 task::Poll::Pending => {
2296 unreachable!("Should not be pending in a sync context");
2298 },
2299 }
2300 }
2301
2302 pub fn channel_open_failed(
2306 &self, counterparty_node_id: &PublicKey, user_channel_id: u128,
2307 ) -> Result<(), APIError> {
2308 let mut fut =
2309 Box::pin(self.inner.channel_open_failed(counterparty_node_id, user_channel_id));
2310
2311 let mut waker = dummy_waker();
2312 let mut ctx = task::Context::from_waker(&mut waker);
2313 match fut.as_mut().poll(&mut ctx) {
2314 task::Poll::Ready(result) => result,
2315 task::Poll::Pending => {
2316 unreachable!("Should not be pending in a sync context");
2318 },
2319 }
2320 }
2321
2322 pub fn channel_ready(
2328 &self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey,
2329 ) -> Result<(), APIError> {
2330 let mut fut =
2331 Box::pin(self.inner.channel_ready(user_channel_id, channel_id, counterparty_node_id));
2332
2333 let mut waker = dummy_waker();
2334 let mut ctx = task::Context::from_waker(&mut waker);
2335 match fut.as_mut().poll(&mut ctx) {
2336 task::Poll::Ready(result) => result,
2337 task::Poll::Pending => {
2338 unreachable!("Should not be pending in a sync context");
2340 },
2341 }
2342 }
2343}
2344
2345#[cfg(test)]
2346mod tests {
2347 use super::*;
2348
2349 use crate::lsps0::ser::LSPSDateTime;
2350
2351 use proptest::prelude::*;
2352
2353 use bitcoin::{absolute::LockTime, transaction::Version};
2354 use core::str::FromStr;
2355
2356 const MAX_VALUE_MSAT: u64 = 21_000_000_0000_0000_000;
2357
2358 fn arb_forward_amounts() -> impl Strategy<Value = (u64, u64, u64, u64)> {
2359 (1u64..MAX_VALUE_MSAT, 1u64..MAX_VALUE_MSAT, 1u64..MAX_VALUE_MSAT, 1u64..MAX_VALUE_MSAT)
2360 .prop_map(|(a, b, c, d)| {
2361 (a, b, c, core::cmp::min(d, a.saturating_add(b).saturating_add(c)))
2362 })
2363 }
2364
2365 proptest! {
2366 #[test]
2367 fn proptest_calculate_amount_to_forward((o_0, o_1, o_2, total_fee_msat) in arb_forward_amounts()) {
2368 let htlcs = vec![
2369 InterceptedHTLC {
2370 intercept_id: InterceptId([0; 32]),
2371 expected_outbound_amount_msat: o_0,
2372 payment_hash: PaymentHash([0; 32]),
2373 },
2374 InterceptedHTLC {
2375 intercept_id: InterceptId([1; 32]),
2376 expected_outbound_amount_msat: o_1,
2377 payment_hash: PaymentHash([0; 32]),
2378 },
2379 InterceptedHTLC {
2380 intercept_id: InterceptId([2; 32]),
2381 expected_outbound_amount_msat: o_2,
2382 payment_hash: PaymentHash([0; 32]),
2383 },
2384 ];
2385
2386 let result = calculate_amount_to_forward_per_htlc(&htlcs, total_fee_msat);
2387 let total_received_msat = o_0 + o_1 + o_2;
2388
2389 if total_received_msat < total_fee_msat {
2390 assert_eq!(result.len(), 0);
2391 } else {
2392 assert_ne!(result.len(), 0);
2393 assert_eq!(result[0].0, htlcs[0].intercept_id);
2394 assert_eq!(result[1].0, htlcs[1].intercept_id);
2395 assert_eq!(result[2].0, htlcs[2].intercept_id);
2396 assert!(result[0].1 <= o_0);
2397 assert!(result[1].1 <= o_1);
2398 assert!(result[2].1 <= o_2);
2399
2400 let result_sum = result.iter().map(|(_, f)| f).sum::<u64>();
2401 assert_eq!(total_received_msat - result_sum, total_fee_msat);
2402 let five_pct = result_sum as f32 * 0.05;
2403 let fair_share_0 = (o_0 as f32 / total_received_msat as f32) * result_sum as f32;
2404 assert!(result[0].1 as f32 <= fair_share_0 + five_pct);
2405 let fair_share_1 = (o_1 as f32 / total_received_msat as f32) * result_sum as f32;
2406 assert!(result[1].1 as f32 <= fair_share_1 + five_pct);
2407 let fair_share_2 = (o_2 as f32 / total_received_msat as f32) * result_sum as f32;
2408 assert!(result[2].1 as f32 <= fair_share_2 + five_pct);
2409 }
2410 }
2411 }
2412
2413 #[test]
2414 fn test_calculate_amount_to_forward() {
2415 let htlcs = vec![
2416 InterceptedHTLC {
2417 intercept_id: InterceptId([0; 32]),
2418 expected_outbound_amount_msat: 2,
2419 payment_hash: PaymentHash([0; 32]),
2420 },
2421 InterceptedHTLC {
2422 intercept_id: InterceptId([1; 32]),
2423 expected_outbound_amount_msat: 6,
2424 payment_hash: PaymentHash([0; 32]),
2425 },
2426 InterceptedHTLC {
2427 intercept_id: InterceptId([2; 32]),
2428 expected_outbound_amount_msat: 2,
2429 payment_hash: PaymentHash([0; 32]),
2430 },
2431 ];
2432 let result = calculate_amount_to_forward_per_htlc(&htlcs, 5);
2433 assert_eq!(
2434 result,
2435 vec![
2436 (htlcs[0].intercept_id, 1),
2437 (htlcs[1].intercept_id, 3),
2438 (htlcs[2].intercept_id, 1),
2439 ]
2440 );
2441 }
2442
2443 #[test]
2444 fn test_jit_channel_state_mpp() {
2445 let payment_size_msat = Some(500_000_000);
2446 let opening_fee_params = LSPS2OpeningFeeParams {
2447 min_fee_msat: 10_000_000,
2448 proportional: 10_000,
2449 valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(),
2450 min_lifetime: 4032,
2451 max_client_to_self_delay: 2016,
2452 min_payment_size_msat: 10_000_000,
2453 max_payment_size_msat: 1_000_000_000,
2454 promise: "ignore".to_string(),
2455 };
2456 let mut state = OutboundJITChannelState::new();
2457 {
2459 let action = state
2460 .htlc_intercepted(
2461 &opening_fee_params,
2462 &payment_size_msat,
2463 InterceptedHTLC {
2464 intercept_id: InterceptId([0; 32]),
2465 expected_outbound_amount_msat: 200_000_000,
2466 payment_hash: PaymentHash([100; 32]),
2467 },
2468 )
2469 .unwrap();
2470 assert!(matches!(state, OutboundJITChannelState::PendingInitialPayment { .. }));
2471 assert!(action.is_none());
2472 }
2473 {
2475 let action = state
2476 .htlc_intercepted(
2477 &opening_fee_params,
2478 &payment_size_msat,
2479 InterceptedHTLC {
2480 intercept_id: InterceptId([1; 32]),
2481 expected_outbound_amount_msat: 1_000_000,
2482 payment_hash: PaymentHash([101; 32]),
2483 },
2484 )
2485 .unwrap();
2486 assert!(matches!(state, OutboundJITChannelState::PendingInitialPayment { .. }));
2487 assert!(action.is_none());
2488 }
2489 {
2492 let action = state
2493 .htlc_intercepted(
2494 &opening_fee_params,
2495 &payment_size_msat,
2496 InterceptedHTLC {
2497 intercept_id: InterceptId([2; 32]),
2498 expected_outbound_amount_msat: 300_000_000,
2499 payment_hash: PaymentHash([100; 32]),
2500 },
2501 )
2502 .unwrap();
2503 assert!(matches!(state, OutboundJITChannelState::PendingChannelOpen { .. }));
2504 assert!(matches!(action, Some(HTLCInterceptedAction::OpenChannel(_))));
2505 }
2506 {
2508 let ForwardPaymentAction(channel_id, payment) =
2509 state.channel_ready(ChannelId([200; 32])).unwrap();
2510 assert_eq!(channel_id, ChannelId([200; 32]));
2511 assert_eq!(payment.opening_fee_msat, 10_000_000);
2512 assert_eq!(
2513 payment.htlcs,
2514 vec![
2515 InterceptedHTLC {
2516 intercept_id: InterceptId([0; 32]),
2517 expected_outbound_amount_msat: 200_000_000,
2518 payment_hash: PaymentHash([100; 32]),
2519 },
2520 InterceptedHTLC {
2521 intercept_id: InterceptId([2; 32]),
2522 expected_outbound_amount_msat: 300_000_000,
2523 payment_hash: PaymentHash([100; 32]),
2524 },
2525 ]
2526 );
2527 }
2528 {
2530 let action = state
2531 .htlc_intercepted(
2532 &opening_fee_params,
2533 &payment_size_msat,
2534 InterceptedHTLC {
2535 intercept_id: InterceptId([3; 32]),
2536 expected_outbound_amount_msat: 2_000_000,
2537 payment_hash: PaymentHash([102; 32]),
2538 },
2539 )
2540 .unwrap();
2541 assert!(matches!(state, OutboundJITChannelState::PendingPaymentForward { .. }));
2542 assert!(action.is_none());
2543 }
2544 {
2546 let action = state.htlc_handling_failed().unwrap();
2547 assert!(matches!(state, OutboundJITChannelState::PendingPayment { .. }));
2548 assert!(action.is_none());
2550 }
2551 {
2553 let action = state
2554 .htlc_intercepted(
2555 &opening_fee_params,
2556 &payment_size_msat,
2557 InterceptedHTLC {
2558 intercept_id: InterceptId([4; 32]),
2559 expected_outbound_amount_msat: 500_000_000,
2560 payment_hash: PaymentHash([101; 32]),
2561 },
2562 )
2563 .unwrap();
2564 assert!(matches!(state, OutboundJITChannelState::PendingPaymentForward { .. }));
2565 match action {
2566 Some(HTLCInterceptedAction::ForwardPayment(channel_id, payment)) => {
2567 assert_eq!(channel_id, ChannelId([200; 32]));
2568 assert_eq!(payment.opening_fee_msat, 10_000_000);
2569 assert_eq!(
2570 payment.htlcs,
2571 vec![
2572 InterceptedHTLC {
2573 intercept_id: InterceptId([1; 32]),
2574 expected_outbound_amount_msat: 1_000_000,
2575 payment_hash: PaymentHash([101; 32]),
2576 },
2577 InterceptedHTLC {
2578 intercept_id: InterceptId([4; 32]),
2579 expected_outbound_amount_msat: 500_000_000,
2580 payment_hash: PaymentHash([101; 32]),
2581 },
2582 ]
2583 );
2584 },
2585 _ => panic!("Unexpected action when intercepted HTLC."),
2586 }
2587 }
2588 {
2590 let action = state.payment_forwarded(100000000000).unwrap();
2591 assert!(matches!(state, OutboundJITChannelState::PaymentForwarded { .. }));
2592 match action {
2593 Some(ForwardHTLCsAction(channel_id, htlcs)) => {
2594 assert_eq!(channel_id, ChannelId([200; 32]));
2595 assert_eq!(
2596 htlcs,
2597 vec![InterceptedHTLC {
2598 intercept_id: InterceptId([3; 32]),
2599 expected_outbound_amount_msat: 2_000_000,
2600 payment_hash: PaymentHash([102; 32]),
2601 }]
2602 );
2603 },
2604 _ => panic!("Unexpected action when forwarded payment."),
2605 }
2606 }
2607 {
2609 let action = state
2610 .htlc_intercepted(
2611 &opening_fee_params,
2612 &payment_size_msat,
2613 InterceptedHTLC {
2614 intercept_id: InterceptId([5; 32]),
2615 expected_outbound_amount_msat: 200_000_000,
2616 payment_hash: PaymentHash([103; 32]),
2617 },
2618 )
2619 .unwrap();
2620 assert!(matches!(state, OutboundJITChannelState::PaymentForwarded { .. }));
2621 assert!(
2622 matches!(action, Some(HTLCInterceptedAction::ForwardHTLC(channel_id)) if channel_id == ChannelId([200; 32]))
2623 );
2624 }
2625 }
2626
2627 #[test]
2628 fn test_jit_channel_state_no_mpp() {
2629 let payment_size_msat = None;
2630 let opening_fee_params = LSPS2OpeningFeeParams {
2631 min_fee_msat: 10_000_000,
2632 proportional: 10_000,
2633 valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(),
2634 min_lifetime: 4032,
2635 max_client_to_self_delay: 2016,
2636 min_payment_size_msat: 10_000_000,
2637 max_payment_size_msat: 1_000_000_000,
2638 promise: "ignore".to_string(),
2639 };
2640 let mut state = OutboundJITChannelState::new();
2641 {
2643 let action = state
2644 .htlc_intercepted(
2645 &opening_fee_params,
2646 &payment_size_msat,
2647 InterceptedHTLC {
2648 intercept_id: InterceptId([0; 32]),
2649 expected_outbound_amount_msat: 500_000_000,
2650 payment_hash: PaymentHash([100; 32]),
2651 },
2652 )
2653 .unwrap();
2654 assert!(matches!(state, OutboundJITChannelState::PendingChannelOpen { .. }));
2655 assert!(matches!(action, Some(HTLCInterceptedAction::OpenChannel(_))));
2656 }
2657 {
2659 let action = state
2660 .htlc_intercepted(
2661 &opening_fee_params,
2662 &payment_size_msat,
2663 InterceptedHTLC {
2664 intercept_id: InterceptId([1; 32]),
2665 expected_outbound_amount_msat: 600_000_000,
2666 payment_hash: PaymentHash([101; 32]),
2667 },
2668 )
2669 .unwrap();
2670 assert!(matches!(state, OutboundJITChannelState::PendingChannelOpen { .. }));
2671 assert!(action.is_none());
2672 }
2673 {
2675 let ForwardPaymentAction(channel_id, payment) =
2676 state.channel_ready(ChannelId([200; 32])).unwrap();
2677 assert_eq!(channel_id, ChannelId([200; 32]));
2678 assert_eq!(payment.opening_fee_msat, 10_000_000);
2679 assert_eq!(
2680 payment.htlcs,
2681 vec![InterceptedHTLC {
2682 intercept_id: InterceptId([0; 32]),
2683 expected_outbound_amount_msat: 500_000_000,
2684 payment_hash: PaymentHash([100; 32]),
2685 },]
2686 );
2687 }
2688 {
2690 let action = state
2691 .htlc_intercepted(
2692 &opening_fee_params,
2693 &payment_size_msat,
2694 InterceptedHTLC {
2695 intercept_id: InterceptId([2; 32]),
2696 expected_outbound_amount_msat: 500_000_000,
2697 payment_hash: PaymentHash([102; 32]),
2698 },
2699 )
2700 .unwrap();
2701 assert!(matches!(state, OutboundJITChannelState::PendingPaymentForward { .. }));
2702 assert!(action.is_none());
2703 }
2704 {
2706 let action = state.htlc_handling_failed().unwrap();
2707 assert!(matches!(state, OutboundJITChannelState::PendingPaymentForward { .. }));
2708 match action {
2709 Some(ForwardPaymentAction(channel_id, payment)) => {
2710 assert_eq!(channel_id, ChannelId([200; 32]));
2711 assert_eq!(
2712 payment.htlcs,
2713 vec![InterceptedHTLC {
2714 intercept_id: InterceptId([1; 32]),
2715 expected_outbound_amount_msat: 600_000_000,
2716 payment_hash: PaymentHash([101; 32]),
2717 },]
2718 );
2719 },
2720 _ => panic!("Unexpected action when HTLC handling failed."),
2721 }
2722 }
2723 {
2725 let action = state.payment_forwarded(10000000000).unwrap();
2726 assert!(matches!(state, OutboundJITChannelState::PaymentForwarded { .. }));
2727 match action {
2728 Some(ForwardHTLCsAction(channel_id, htlcs)) => {
2729 assert_eq!(channel_id, ChannelId([200; 32]));
2730 assert_eq!(
2731 htlcs,
2732 vec![InterceptedHTLC {
2733 intercept_id: InterceptId([2; 32]),
2734 expected_outbound_amount_msat: 500_000_000,
2735 payment_hash: PaymentHash([102; 32]),
2736 }]
2737 );
2738 },
2739 _ => panic!("Unexpected action when forwarded payment."),
2740 }
2741 }
2742 {
2744 let action = state
2745 .htlc_intercepted(
2746 &opening_fee_params,
2747 &payment_size_msat,
2748 InterceptedHTLC {
2749 intercept_id: InterceptId([3; 32]),
2750 expected_outbound_amount_msat: 200_000_000,
2751 payment_hash: PaymentHash([103; 32]),
2752 },
2753 )
2754 .unwrap();
2755 assert!(matches!(state, OutboundJITChannelState::PaymentForwarded { .. }));
2756 assert!(
2757 matches!(action, Some(HTLCInterceptedAction::ForwardHTLC(channel_id)) if channel_id == ChannelId([200; 32]))
2758 );
2759 }
2760 }
2761
2762 #[test]
2763 fn broadcast_not_allowed_after_non_paying_fee_payment_claimed() {
2764 let min_fee_msat: u64 = 12345;
2765 let opening_fee_params = LSPS2OpeningFeeParams {
2766 min_fee_msat,
2767 proportional: 0,
2768 valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(),
2769 min_lifetime: 144,
2770 max_client_to_self_delay: 128,
2771 min_payment_size_msat: 1,
2772 max_payment_size_msat: 10_000_000_000,
2773 promise: "ignore".to_string(),
2774 };
2775
2776 let payment_size_msat = Some(1_000_000);
2777 let user_channel_id = 4242u128;
2778 let mut jit_channel = OutboundJITChannel::new(
2779 payment_size_msat,
2780 opening_fee_params.clone(),
2781 user_channel_id,
2782 true,
2783 );
2784
2785 let opening_payment_hash = PaymentHash([42; 32]);
2786 let htlcs_for_opening = [
2787 InterceptedHTLC {
2788 intercept_id: InterceptId([0; 32]),
2789 expected_outbound_amount_msat: 400_000,
2790 payment_hash: opening_payment_hash,
2791 },
2792 InterceptedHTLC {
2793 intercept_id: InterceptId([1; 32]),
2794 expected_outbound_amount_msat: 600_000,
2795 payment_hash: opening_payment_hash,
2796 },
2797 ];
2798
2799 assert!(jit_channel.htlc_intercepted(htlcs_for_opening[0].clone()).unwrap().is_none());
2800 let action = jit_channel.htlc_intercepted(htlcs_for_opening[1].clone()).unwrap();
2801 match action {
2802 Some(HTLCInterceptedAction::OpenChannel(_)) => {},
2803 other => panic!("Expected OpenChannel action, got {:?}", other),
2804 }
2805
2806 let channel_id = ChannelId([7; 32]);
2807 let ForwardPaymentAction(_, fee_payment) = jit_channel.channel_ready(channel_id).unwrap();
2808 assert_eq!(fee_payment.opening_fee_msat, min_fee_msat);
2809
2810 let followup = jit_channel.htlc_handling_failed().unwrap();
2811 assert!(followup.is_none());
2812
2813 let dummy_tx = Transaction {
2814 version: Version(2),
2815 lock_time: LockTime::ZERO,
2816 input: vec![],
2817 output: vec![],
2818 };
2819 jit_channel.set_funding_tx(dummy_tx);
2820 jit_channel.set_funding_tx_broadcast_safe(true);
2821 assert!(
2822 !jit_channel.should_broadcast_funding_transaction(),
2823 "Should not broadcast before any successful payment is claimed"
2824 );
2825
2826 let second_payment_hash = PaymentHash([99; 32]);
2827 let second_htlc = InterceptedHTLC {
2828 intercept_id: InterceptId([2; 32]),
2829 expected_outbound_amount_msat: min_fee_msat,
2830 payment_hash: second_payment_hash,
2831 };
2832 let action2 = jit_channel.htlc_intercepted(second_htlc).unwrap();
2833 let (forwarded_channel_id, fee_payment2) = match action2 {
2834 Some(HTLCInterceptedAction::ForwardPayment(cid, fp)) => (cid, fp),
2835 other => panic!("Expected ForwardPayment for second HTLC, got {:?}", other),
2836 };
2837 assert_eq!(forwarded_channel_id, channel_id);
2838 assert_eq!(fee_payment2.opening_fee_msat, min_fee_msat);
2839
2840 assert!(
2841 !jit_channel.should_broadcast_funding_transaction(),
2842 "Should not broadcast before any successful payment is claimed"
2843 );
2844
2845 let _ = jit_channel.payment_forwarded(min_fee_msat - 1).unwrap();
2847
2848 assert!(
2849 !jit_channel.should_broadcast_funding_transaction(),
2850 "Should not broadcast before all the fees are collected"
2851 );
2852
2853 let _ = jit_channel.payment_forwarded(min_fee_msat).unwrap();
2854
2855 let broadcast_allowed = jit_channel.should_broadcast_funding_transaction();
2856
2857 assert!(
2858 broadcast_allowed,
2859 "Broadcast was not allowed even though all the skimmed fees were collected"
2860 );
2861 }
2862}