1use core::future::Future;
9use core::task::{Poll, Waker};
10use std::collections::VecDeque;
11use std::ops::Deref;
12use std::sync::{Arc, Mutex};
13
14use bitcoin::blockdata::locktime::absolute::LockTime;
15use bitcoin::secp256k1::PublicKey;
16use bitcoin::{Amount, OutPoint};
17use lightning::events::bump_transaction::BumpTransactionEvent;
18use lightning::events::{
19 ClosureReason, Event as LdkEvent, PaymentFailureReason, PaymentPurpose, ReplayEvent,
20};
21use lightning::impl_writeable_tlv_based_enum;
22use lightning::ln::channelmanager::PaymentId;
23use lightning::ln::types::ChannelId;
24use lightning::routing::gossip::NodeId;
25use lightning::util::config::{
26 ChannelConfigOverrides, ChannelConfigUpdate, ChannelHandshakeConfigUpdate,
27};
28use lightning::util::errors::APIError;
29use lightning::util::persist::KVStore;
30use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
31use lightning_liquidity::lsps2::utils::compute_opening_fee;
32use lightning_types::payment::{PaymentHash, PaymentPreimage};
33use rand::{rng, Rng};
34
35use crate::config::{may_announce_channel, Config};
36use crate::connection::ConnectionManager;
37use crate::data_store::DataStoreUpdateResult;
38use crate::fee_estimator::ConfirmationTarget;
39use crate::io::{
40 EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
41 EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
42};
43use crate::liquidity::LiquiditySource;
44use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger};
45use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
46use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore;
47use crate::payment::store::{
48 PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus,
49};
50use crate::runtime::Runtime;
51use crate::types::{CustomTlvRecord, DynStore, OnionMessenger, PaymentStore, Sweeper, Wallet};
52use crate::{
53 hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore,
54 UserChannelId,
55};
56
57#[derive(Debug, Clone, PartialEq, Eq)]
61pub enum Event {
62 PaymentSuccessful {
64 payment_id: Option<PaymentId>,
68 payment_hash: PaymentHash,
70 payment_preimage: Option<PaymentPreimage>,
76 fee_paid_msat: Option<u64>,
78 },
79 PaymentFailed {
81 payment_id: Option<PaymentId>,
85 payment_hash: Option<PaymentHash>,
92 reason: Option<PaymentFailureReason>,
96 },
97 PaymentReceived {
99 payment_id: Option<PaymentId>,
103 payment_hash: PaymentHash,
105 amount_msat: u64,
107 custom_records: Vec<CustomTlvRecord>,
109 },
110 PaymentForwarded {
112 prev_channel_id: ChannelId,
114 next_channel_id: ChannelId,
116 prev_user_channel_id: Option<UserChannelId>,
120 next_user_channel_id: Option<UserChannelId>,
125 prev_node_id: Option<PublicKey>,
130 next_node_id: Option<PublicKey>,
135 total_fee_earned_msat: Option<u64>,
147 skimmed_fee_msat: Option<u64>,
156 claim_from_onchain_tx: bool,
159 outbound_amount_forwarded_msat: Option<u64>,
163 },
164 PaymentClaimable {
176 payment_id: PaymentId,
178 payment_hash: PaymentHash,
180 claimable_amount_msat: u64,
182 claim_deadline: Option<u32>,
185 custom_records: Vec<CustomTlvRecord>,
187 },
188 ChannelPending {
190 channel_id: ChannelId,
192 user_channel_id: UserChannelId,
194 former_temporary_channel_id: ChannelId,
196 counterparty_node_id: PublicKey,
198 funding_txo: OutPoint,
200 },
201 ChannelReady {
207 channel_id: ChannelId,
209 user_channel_id: UserChannelId,
211 counterparty_node_id: Option<PublicKey>,
215 funding_txo: Option<OutPoint>,
223 },
224 ChannelClosed {
226 channel_id: ChannelId,
228 user_channel_id: UserChannelId,
230 counterparty_node_id: Option<PublicKey>,
234 reason: Option<ClosureReason>,
236 },
237 SplicePending {
239 channel_id: ChannelId,
241 user_channel_id: UserChannelId,
243 counterparty_node_id: PublicKey,
245 new_funding_txo: OutPoint,
247 },
248 SpliceFailed {
250 channel_id: ChannelId,
252 user_channel_id: UserChannelId,
254 counterparty_node_id: PublicKey,
256 abandoned_funding_txo: Option<OutPoint>,
258 },
259}
260
261impl_writeable_tlv_based_enum!(Event,
262 (0, PaymentSuccessful) => {
263 (0, payment_hash, required),
264 (1, fee_paid_msat, option),
265 (3, payment_id, option),
266 (5, payment_preimage, option),
267 },
268 (1, PaymentFailed) => {
269 (0, payment_hash, option),
270 (1, reason, upgradable_option),
271 (3, payment_id, option),
272 },
273 (2, PaymentReceived) => {
274 (0, payment_hash, required),
275 (1, payment_id, option),
276 (2, amount_msat, required),
277 (3, custom_records, optional_vec),
278 },
279 (3, ChannelReady) => {
280 (0, channel_id, required),
281 (1, counterparty_node_id, option),
282 (2, user_channel_id, required),
283 (3, funding_txo, option),
284 },
285 (4, ChannelPending) => {
286 (0, channel_id, required),
287 (2, user_channel_id, required),
288 (4, former_temporary_channel_id, required),
289 (6, counterparty_node_id, required),
290 (8, funding_txo, required),
291 },
292 (5, ChannelClosed) => {
293 (0, channel_id, required),
294 (1, counterparty_node_id, option),
295 (2, user_channel_id, required),
296 (3, reason, upgradable_option),
297 },
298 (6, PaymentClaimable) => {
299 (0, payment_hash, required),
300 (2, payment_id, required),
301 (4, claimable_amount_msat, required),
302 (6, claim_deadline, option),
303 (7, custom_records, optional_vec),
304 },
305 (7, PaymentForwarded) => {
306 (0, prev_channel_id, required),
307 (1, prev_node_id, option),
308 (2, next_channel_id, required),
309 (3, next_node_id, option),
310 (4, prev_user_channel_id, option),
311 (6, next_user_channel_id, option),
312 (8, total_fee_earned_msat, option),
313 (10, skimmed_fee_msat, option),
314 (12, claim_from_onchain_tx, required),
315 (14, outbound_amount_forwarded_msat, option),
316 },
317 (8, SplicePending) => {
318 (1, channel_id, required),
319 (3, counterparty_node_id, required),
320 (5, user_channel_id, required),
321 (7, new_funding_txo, required),
322 },
323 (9, SpliceFailed) => {
324 (1, channel_id, required),
325 (3, counterparty_node_id, required),
326 (5, user_channel_id, required),
327 (7, abandoned_funding_txo, option),
328 },
329);
330
331pub struct EventQueue<L: Deref>
332where
333 L::Target: LdkLogger,
334{
335 queue: Arc<Mutex<VecDeque<Event>>>,
336 waker: Arc<Mutex<Option<Waker>>>,
337 kv_store: Arc<DynStore>,
338 logger: L,
339}
340
341impl<L: Deref> EventQueue<L>
342where
343 L::Target: LdkLogger,
344{
345 pub(crate) fn new(kv_store: Arc<DynStore>, logger: L) -> Self {
346 let queue = Arc::new(Mutex::new(VecDeque::new()));
347 let waker = Arc::new(Mutex::new(None));
348 Self { queue, waker, kv_store, logger }
349 }
350
351 pub(crate) async fn add_event(&self, event: Event) -> Result<(), Error> {
352 let data = {
353 let mut locked_queue = self.queue.lock().unwrap();
354 locked_queue.push_back(event);
355 EventQueueSerWrapper(&locked_queue).encode()
356 };
357
358 self.persist_queue(data).await?;
359
360 if let Some(waker) = self.waker.lock().unwrap().take() {
361 waker.wake();
362 }
363 Ok(())
364 }
365
366 pub(crate) fn next_event(&self) -> Option<Event> {
367 let locked_queue = self.queue.lock().unwrap();
368 locked_queue.front().cloned()
369 }
370
371 pub(crate) async fn next_event_async(&self) -> Event {
372 EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
373 }
374
375 pub(crate) async fn event_handled(&self) -> Result<(), Error> {
376 let data = {
377 let mut locked_queue = self.queue.lock().unwrap();
378 locked_queue.pop_front();
379 EventQueueSerWrapper(&locked_queue).encode()
380 };
381
382 self.persist_queue(data).await?;
383
384 if let Some(waker) = self.waker.lock().unwrap().take() {
385 waker.wake();
386 }
387 Ok(())
388 }
389
390 async fn persist_queue(&self, encoded_queue: Vec<u8>) -> Result<(), Error> {
391 KVStore::write(
392 &*self.kv_store,
393 EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
394 EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
395 EVENT_QUEUE_PERSISTENCE_KEY,
396 encoded_queue,
397 )
398 .await
399 .map_err(|e| {
400 log_error!(
401 self.logger,
402 "Write for key {}/{}/{} failed due to: {}",
403 EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
404 EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
405 EVENT_QUEUE_PERSISTENCE_KEY,
406 e
407 );
408 Error::PersistenceFailed
409 })?;
410 Ok(())
411 }
412}
413
414impl<L: Deref> ReadableArgs<(Arc<DynStore>, L)> for EventQueue<L>
415where
416 L::Target: LdkLogger,
417{
418 #[inline]
419 fn read<R: lightning::io::Read>(
420 reader: &mut R, args: (Arc<DynStore>, L),
421 ) -> Result<Self, lightning::ln::msgs::DecodeError> {
422 let (kv_store, logger) = args;
423 let read_queue: EventQueueDeserWrapper = Readable::read(reader)?;
424 let queue = Arc::new(Mutex::new(read_queue.0));
425 let waker = Arc::new(Mutex::new(None));
426 Ok(Self { queue, waker, kv_store, logger })
427 }
428}
429
430struct EventQueueDeserWrapper(VecDeque<Event>);
431
432impl Readable for EventQueueDeserWrapper {
433 fn read<R: lightning::io::Read>(
434 reader: &mut R,
435 ) -> Result<Self, lightning::ln::msgs::DecodeError> {
436 let len: u16 = Readable::read(reader)?;
437 let mut queue = VecDeque::with_capacity(len as usize);
438 for _ in 0..len {
439 queue.push_back(Readable::read(reader)?);
440 }
441 Ok(Self(queue))
442 }
443}
444
445struct EventQueueSerWrapper<'a>(&'a VecDeque<Event>);
446
447impl Writeable for EventQueueSerWrapper<'_> {
448 fn write<W: Writer>(&self, writer: &mut W) -> Result<(), lightning::io::Error> {
449 (self.0.len() as u16).write(writer)?;
450 for e in self.0.iter() {
451 e.write(writer)?;
452 }
453 Ok(())
454 }
455}
456
457struct EventFuture {
458 event_queue: Arc<Mutex<VecDeque<Event>>>,
459 waker: Arc<Mutex<Option<Waker>>>,
460}
461
462impl Future for EventFuture {
463 type Output = Event;
464
465 fn poll(
466 self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>,
467 ) -> core::task::Poll<Self::Output> {
468 if let Some(event) = self.event_queue.lock().unwrap().front() {
469 Poll::Ready(event.clone())
470 } else {
471 *self.waker.lock().unwrap() = Some(cx.waker().clone());
472 Poll::Pending
473 }
474 }
475}
476
477pub(crate) struct EventHandler<L: Deref + Clone + Sync + Send + 'static>
478where
479 L::Target: LdkLogger,
480{
481 event_queue: Arc<EventQueue<L>>,
482 wallet: Arc<Wallet>,
483 bump_tx_event_handler: Arc<BumpTransactionEventHandler>,
484 channel_manager: Arc<ChannelManager>,
485 connection_manager: Arc<ConnectionManager<L>>,
486 output_sweeper: Arc<Sweeper>,
487 network_graph: Arc<Graph>,
488 liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
489 payment_store: Arc<PaymentStore>,
490 peer_store: Arc<PeerStore<L>>,
491 runtime: Arc<Runtime>,
492 logger: L,
493 config: Arc<Config>,
494 static_invoice_store: Option<StaticInvoiceStore>,
495 onion_messenger: Arc<OnionMessenger>,
496 om_mailbox: Option<Arc<OnionMessageMailbox>>,
497}
498
499impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
500where
501 L::Target: LdkLogger,
502{
503 pub fn new(
504 event_queue: Arc<EventQueue<L>>, wallet: Arc<Wallet>,
505 bump_tx_event_handler: Arc<BumpTransactionEventHandler>,
506 channel_manager: Arc<ChannelManager>, connection_manager: Arc<ConnectionManager<L>>,
507 output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
508 liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
509 payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
510 static_invoice_store: Option<StaticInvoiceStore>, onion_messenger: Arc<OnionMessenger>,
511 om_mailbox: Option<Arc<OnionMessageMailbox>>, runtime: Arc<Runtime>, logger: L,
512 config: Arc<Config>,
513 ) -> Self {
514 Self {
515 event_queue,
516 wallet,
517 bump_tx_event_handler,
518 channel_manager,
519 connection_manager,
520 output_sweeper,
521 network_graph,
522 liquidity_source,
523 payment_store,
524 peer_store,
525 logger,
526 runtime,
527 config,
528 static_invoice_store,
529 onion_messenger,
530 om_mailbox,
531 }
532 }
533
534 pub async fn handle_event(&self, event: LdkEvent) -> Result<(), ReplayEvent> {
535 match event {
536 LdkEvent::FundingGenerationReady {
537 temporary_channel_id,
538 counterparty_node_id,
539 channel_value_satoshis,
540 output_script,
541 user_channel_id,
542 } => {
543 let confirmation_target = ConfirmationTarget::ChannelFunding;
546
547 let cur_height = self.channel_manager.current_best_block().height;
549 let locktime = LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO);
550
551 let channel_amount = Amount::from_sat(channel_value_satoshis);
553 match self.wallet.create_funding_transaction(
554 output_script,
555 channel_amount,
556 confirmation_target,
557 locktime,
558 ) {
559 Ok(final_tx) => {
560 let needs_manual_broadcast =
561 self.liquidity_source.as_ref().map_or(false, |ls| {
562 ls.as_ref().lsps2_channel_needs_manual_broadcast(
563 counterparty_node_id,
564 user_channel_id,
565 )
566 });
567
568 let result = if needs_manual_broadcast {
569 self.liquidity_source.as_ref().map(|ls| {
570 ls.lsps2_store_funding_transaction(
571 user_channel_id,
572 counterparty_node_id,
573 final_tx.clone(),
574 );
575 });
576 self.channel_manager.funding_transaction_generated_manual_broadcast(
577 temporary_channel_id,
578 counterparty_node_id,
579 final_tx,
580 )
581 } else {
582 self.channel_manager.funding_transaction_generated(
583 temporary_channel_id,
584 counterparty_node_id,
585 final_tx,
586 )
587 };
588
589 match result {
590 Ok(()) => {},
591 Err(APIError::APIMisuseError { err }) => {
592 log_error!(
593 self.logger,
594 "Encountered APIMisuseError, this should never happen: {}",
595 err
596 );
597 debug_assert!(false, "APIMisuseError: {}", err);
598 },
599 Err(APIError::ChannelUnavailable { err }) => {
600 log_error!(
601 self.logger,
602 "Failed to process funding transaction as channel went away before we could fund it: {}",
603 err
604 )
605 },
606 Err(err) => {
607 log_error!(
608 self.logger,
609 "Failed to process funding transaction: {:?}",
610 err
611 )
612 },
613 }
614 },
615 Err(err) => {
616 log_error!(self.logger, "Failed to create funding transaction: {}", err);
617 self.channel_manager
618 .force_close_broadcasting_latest_txn(
619 &temporary_channel_id,
620 &counterparty_node_id,
621 "Failed to create funding transaction".to_string(),
622 )
623 .unwrap_or_else(|e| {
624 log_error!(self.logger, "Failed to force close channel after funding generation failed: {:?}", e);
625 debug_assert!(false,
626 "Failed to force close channel after funding generation failed"
627 );
628 });
629 },
630 }
631 },
632 LdkEvent::FundingTxBroadcastSafe { user_channel_id, counterparty_node_id, .. } => {
633 self.liquidity_source.as_ref().map(|ls| {
634 ls.lsps2_funding_tx_broadcast_safe(user_channel_id, counterparty_node_id);
635 });
636 },
637 LdkEvent::PaymentClaimable {
638 payment_hash,
639 purpose,
640 amount_msat,
641 claim_deadline,
642 onion_fields,
643 counterparty_skimmed_fee_msat,
644 ..
645 } => {
646 let payment_id = PaymentId(payment_hash.0);
647 if let Some(info) = self.payment_store.get(&payment_id) {
648 if info.direction == PaymentDirection::Outbound {
649 log_info!(
650 self.logger,
651 "Refused inbound payment with ID {}: circular payments are unsupported.",
652 payment_id
653 );
654 self.channel_manager.fail_htlc_backwards(&payment_hash);
655
656 let update = PaymentDetailsUpdate {
657 status: Some(PaymentStatus::Failed),
658 ..PaymentDetailsUpdate::new(payment_id)
659 };
660 match self.payment_store.update(&update) {
661 Ok(_) => return Ok(()),
662 Err(e) => {
663 log_error!(self.logger, "Failed to access payment store: {}", e);
664 return Err(ReplayEvent());
665 },
666 };
667 }
668
669 if info.status == PaymentStatus::Succeeded
670 || matches!(info.kind, PaymentKind::Spontaneous { .. })
671 {
672 log_info!(
673 self.logger,
674 "Refused duplicate inbound payment from payment hash {} of {}msat",
675 hex_utils::to_string(&payment_hash.0),
676 amount_msat,
677 );
678 self.channel_manager.fail_htlc_backwards(&payment_hash);
679
680 let update = PaymentDetailsUpdate {
681 status: Some(PaymentStatus::Failed),
682 ..PaymentDetailsUpdate::new(payment_id)
683 };
684 match self.payment_store.update(&update) {
685 Ok(_) => return Ok(()),
686 Err(e) => {
687 log_error!(self.logger, "Failed to access payment store: {}", e);
688 return Err(ReplayEvent());
689 },
690 };
691 }
692
693 let max_total_opening_fee_msat = match info.kind {
694 PaymentKind::Bolt11Jit { lsp_fee_limits, .. } => {
695 lsp_fee_limits
696 .max_total_opening_fee_msat
697 .or_else(|| {
698 lsp_fee_limits.max_proportional_opening_fee_ppm_msat.and_then(
699 |max_prop_fee| {
700 compute_opening_fee(amount_msat, 0, max_prop_fee)
702 },
703 )
704 })
705 .unwrap_or(0)
706 },
707 _ => 0,
708 };
709
710 if counterparty_skimmed_fee_msat > max_total_opening_fee_msat {
711 log_info!(
712 self.logger,
713 "Refusing inbound payment with hash {} as the counterparty-withheld fee of {}msat exceeds our limit of {}msat",
714 hex_utils::to_string(&payment_hash.0),
715 counterparty_skimmed_fee_msat,
716 max_total_opening_fee_msat,
717 );
718 self.channel_manager.fail_htlc_backwards(&payment_hash);
719
720 let update = PaymentDetailsUpdate {
721 hash: Some(Some(payment_hash)),
722 status: Some(PaymentStatus::Failed),
723 ..PaymentDetailsUpdate::new(payment_id)
724 };
725 match self.payment_store.update(&update) {
726 Ok(_) => return Ok(()),
727 Err(e) => {
728 log_error!(self.logger, "Failed to access payment store: {}", e);
729 return Err(ReplayEvent());
730 },
731 };
732 }
733
734 if counterparty_skimmed_fee_msat > 0 {
736 match info.kind {
737 PaymentKind::Bolt11Jit { .. } => {
738 let update = PaymentDetailsUpdate {
739 counterparty_skimmed_fee_msat: Some(Some(counterparty_skimmed_fee_msat)),
740 ..PaymentDetailsUpdate::new(payment_id)
741 };
742 match self.payment_store.update(&update) {
743 Ok(_) => (),
744 Err(e) => {
745 log_error!(self.logger, "Failed to access payment store: {}", e);
746 return Err(ReplayEvent());
747 },
748 };
749 }
750 _ => debug_assert!(false, "We only expect the counterparty to get away with withholding fees for JIT payments."),
751 }
752 }
753
754 match info.kind {
758 PaymentKind::Bolt11 { preimage, .. }
759 | PaymentKind::Bolt11Jit { preimage, .. } => {
760 if purpose.preimage().is_none() {
761 debug_assert!(
762 preimage.is_none(),
763 "We would have registered the preimage if we knew"
764 );
765
766 let custom_records = onion_fields
767 .map(|cf| {
768 cf.custom_tlvs().into_iter().map(|tlv| tlv.into()).collect()
769 })
770 .unwrap_or_default();
771 let event = Event::PaymentClaimable {
772 payment_id,
773 payment_hash,
774 claimable_amount_msat: amount_msat,
775 claim_deadline,
776 custom_records,
777 };
778 match self.event_queue.add_event(event).await {
779 Ok(_) => return Ok(()),
780 Err(e) => {
781 log_error!(
782 self.logger,
783 "Failed to push to event queue: {}",
784 e
785 );
786 return Err(ReplayEvent());
787 },
788 };
789 }
790 },
791 _ => {},
792 }
793 }
794
795 log_info!(
796 self.logger,
797 "Received payment from payment hash {} of {}msat",
798 hex_utils::to_string(&payment_hash.0),
799 amount_msat,
800 );
801 let payment_preimage = match purpose {
802 PaymentPurpose::Bolt11InvoicePayment { payment_preimage, .. } => {
803 payment_preimage
804 },
805 PaymentPurpose::Bolt12OfferPayment {
806 payment_preimage,
807 payment_secret,
808 payment_context,
809 ..
810 } => {
811 let payer_note = payment_context.invoice_request.payer_note_truncated;
812 let offer_id = payment_context.offer_id;
813 let quantity = payment_context.invoice_request.quantity;
814 let kind = PaymentKind::Bolt12Offer {
815 hash: Some(payment_hash),
816 preimage: payment_preimage,
817 secret: Some(payment_secret),
818 offer_id,
819 payer_note,
820 quantity,
821 };
822
823 let payment = PaymentDetails::new(
824 payment_id,
825 kind,
826 Some(amount_msat),
827 None,
828 PaymentDirection::Inbound,
829 PaymentStatus::Pending,
830 );
831
832 match self.payment_store.insert(payment) {
833 Ok(false) => (),
834 Ok(true) => {
835 log_error!(
836 self.logger,
837 "Bolt12OfferPayment with ID {} was previously known",
838 payment_id,
839 );
840 debug_assert!(false);
841 },
842 Err(e) => {
843 log_error!(
844 self.logger,
845 "Failed to insert payment with ID {}: {}",
846 payment_id,
847 e
848 );
849 debug_assert!(false);
850 },
851 }
852 payment_preimage
853 },
854 PaymentPurpose::Bolt12RefundPayment { payment_preimage, .. } => {
855 payment_preimage
856 },
857 PaymentPurpose::SpontaneousPayment(preimage) => {
858 let kind = PaymentKind::Spontaneous {
860 hash: payment_hash,
861 preimage: Some(preimage),
862 };
863
864 let payment = PaymentDetails::new(
865 payment_id,
866 kind,
867 Some(amount_msat),
868 None,
869 PaymentDirection::Inbound,
870 PaymentStatus::Pending,
871 );
872
873 match self.payment_store.insert(payment) {
874 Ok(false) => (),
875 Ok(true) => {
876 log_error!(
877 self.logger,
878 "Spontaneous payment with ID {} was previously known",
879 payment_id,
880 );
881 debug_assert!(false);
882 },
883 Err(e) => {
884 log_error!(
885 self.logger,
886 "Failed to insert payment with ID {}: {}",
887 payment_id,
888 e
889 );
890 debug_assert!(false);
891 },
892 }
893
894 Some(preimage)
895 },
896 };
897
898 if let Some(preimage) = payment_preimage {
899 self.channel_manager.claim_funds(preimage);
900 } else {
901 log_error!(
902 self.logger,
903 "Failed to claim payment with ID {}: preimage unknown.",
904 payment_id,
905 );
906 self.channel_manager.fail_htlc_backwards(&payment_hash);
907
908 let update = PaymentDetailsUpdate {
909 hash: Some(Some(payment_hash)),
910 status: Some(PaymentStatus::Failed),
911 ..PaymentDetailsUpdate::new(payment_id)
912 };
913 match self.payment_store.update(&update) {
914 Ok(_) => return Ok(()),
915 Err(e) => {
916 log_error!(self.logger, "Failed to access payment store: {}", e);
917 return Err(ReplayEvent());
918 },
919 };
920 }
921 },
922 LdkEvent::PaymentClaimed {
923 payment_hash,
924 purpose,
925 amount_msat,
926 receiver_node_id: _,
927 htlcs: _,
928 sender_intended_total_msat: _,
929 onion_fields,
930 payment_id: _,
931 } => {
932 let payment_id = PaymentId(payment_hash.0);
933 log_info!(
934 self.logger,
935 "Claimed payment with ID {} from payment hash {} of {}msat.",
936 payment_id,
937 hex_utils::to_string(&payment_hash.0),
938 amount_msat,
939 );
940
941 let update = match purpose {
942 PaymentPurpose::Bolt11InvoicePayment {
943 payment_preimage,
944 payment_secret,
945 ..
946 } => PaymentDetailsUpdate {
947 preimage: Some(payment_preimage),
948 secret: Some(Some(payment_secret)),
949 amount_msat: Some(Some(amount_msat)),
950 status: Some(PaymentStatus::Succeeded),
951 ..PaymentDetailsUpdate::new(payment_id)
952 },
953 PaymentPurpose::Bolt12OfferPayment {
954 payment_preimage, payment_secret, ..
955 } => PaymentDetailsUpdate {
956 preimage: Some(payment_preimage),
957 secret: Some(Some(payment_secret)),
958 amount_msat: Some(Some(amount_msat)),
959 status: Some(PaymentStatus::Succeeded),
960 ..PaymentDetailsUpdate::new(payment_id)
961 },
962 PaymentPurpose::Bolt12RefundPayment {
963 payment_preimage,
964 payment_secret,
965 ..
966 } => PaymentDetailsUpdate {
967 preimage: Some(payment_preimage),
968 secret: Some(Some(payment_secret)),
969 amount_msat: Some(Some(amount_msat)),
970 status: Some(PaymentStatus::Succeeded),
971 ..PaymentDetailsUpdate::new(payment_id)
972 },
973 PaymentPurpose::SpontaneousPayment(preimage) => PaymentDetailsUpdate {
974 preimage: Some(Some(preimage)),
975 amount_msat: Some(Some(amount_msat)),
976 status: Some(PaymentStatus::Succeeded),
977 ..PaymentDetailsUpdate::new(payment_id)
978 },
979 };
980
981 match self.payment_store.update(&update) {
982 Ok(DataStoreUpdateResult::Updated) | Ok(DataStoreUpdateResult::Unchanged) => (
983 ),
986 Ok(DataStoreUpdateResult::NotFound) => {
987 log_error!(
988 self.logger,
989 "Claimed payment with ID {} couldn't be found in store",
990 payment_id,
991 );
992 },
993 Err(e) => {
994 log_error!(
995 self.logger,
996 "Failed to update payment with ID {}: {}",
997 payment_id,
998 e
999 );
1000 return Err(ReplayEvent());
1001 },
1002 }
1003
1004 let event = Event::PaymentReceived {
1005 payment_id: Some(payment_id),
1006 payment_hash,
1007 amount_msat,
1008 custom_records: onion_fields
1009 .map(|cf| cf.custom_tlvs().into_iter().map(|tlv| tlv.into()).collect())
1010 .unwrap_or_default(),
1011 };
1012 match self.event_queue.add_event(event).await {
1013 Ok(_) => return Ok(()),
1014 Err(e) => {
1015 log_error!(self.logger, "Failed to push to event queue: {}", e);
1016 return Err(ReplayEvent());
1017 },
1018 };
1019 },
1020 LdkEvent::PaymentSent {
1021 payment_id,
1022 payment_preimage,
1023 payment_hash,
1024 fee_paid_msat,
1025 ..
1026 } => {
1027 let payment_id = if let Some(id) = payment_id {
1028 id
1029 } else {
1030 debug_assert!(false, "payment_id should always be set.");
1031 return Ok(());
1032 };
1033
1034 let update = PaymentDetailsUpdate {
1035 hash: Some(Some(payment_hash)),
1036 preimage: Some(Some(payment_preimage)),
1037 fee_paid_msat: Some(fee_paid_msat),
1038 status: Some(PaymentStatus::Succeeded),
1039 ..PaymentDetailsUpdate::new(payment_id)
1040 };
1041
1042 match self.payment_store.update(&update) {
1043 Ok(_) => {},
1044 Err(e) => {
1045 log_error!(self.logger, "Failed to access payment store: {}", e);
1046 return Err(ReplayEvent());
1047 },
1048 };
1049
1050 self.payment_store.get(&payment_id).map(|payment| {
1051 log_info!(
1052 self.logger,
1053 "Successfully sent payment of {}msat{} from \
1054 payment hash {:?} with preimage {:?}",
1055 payment.amount_msat.unwrap(),
1056 if let Some(fee) = fee_paid_msat {
1057 format!(" (fee {} msat)", fee)
1058 } else {
1059 "".to_string()
1060 },
1061 hex_utils::to_string(&payment_hash.0),
1062 hex_utils::to_string(&payment_preimage.0)
1063 );
1064 });
1065 let event = Event::PaymentSuccessful {
1066 payment_id: Some(payment_id),
1067 payment_hash,
1068 payment_preimage: Some(payment_preimage),
1069 fee_paid_msat,
1070 };
1071
1072 match self.event_queue.add_event(event).await {
1073 Ok(_) => return Ok(()),
1074 Err(e) => {
1075 log_error!(self.logger, "Failed to push to event queue: {}", e);
1076 return Err(ReplayEvent());
1077 },
1078 };
1079 },
1080 LdkEvent::PaymentFailed { payment_id, payment_hash, reason, .. } => {
1081 log_info!(
1082 self.logger,
1083 "Failed to send payment with ID {} due to {:?}.",
1084 payment_id,
1085 reason
1086 );
1087
1088 let update = PaymentDetailsUpdate {
1089 hash: Some(payment_hash),
1090 status: Some(PaymentStatus::Failed),
1091 ..PaymentDetailsUpdate::new(payment_id)
1092 };
1093 match self.payment_store.update(&update) {
1094 Ok(_) => {},
1095 Err(e) => {
1096 log_error!(self.logger, "Failed to access payment store: {}", e);
1097 return Err(ReplayEvent());
1098 },
1099 };
1100
1101 let event =
1102 Event::PaymentFailed { payment_id: Some(payment_id), payment_hash, reason };
1103 match self.event_queue.add_event(event).await {
1104 Ok(_) => return Ok(()),
1105 Err(e) => {
1106 log_error!(self.logger, "Failed to push to event queue: {}", e);
1107 return Err(ReplayEvent());
1108 },
1109 };
1110 },
1111
1112 LdkEvent::PaymentPathSuccessful { .. } => {},
1113 LdkEvent::PaymentPathFailed { .. } => {},
1114 LdkEvent::ProbeSuccessful { .. } => {},
1115 LdkEvent::ProbeFailed { .. } => {},
1116 LdkEvent::HTLCHandlingFailed { failure_type, .. } => {
1117 if let Some(liquidity_source) = self.liquidity_source.as_ref() {
1118 liquidity_source.handle_htlc_handling_failed(failure_type).await;
1119 }
1120 },
1121 LdkEvent::SpendableOutputs { outputs, channel_id } => {
1122 match self
1123 .output_sweeper
1124 .track_spendable_outputs(outputs, channel_id, true, None)
1125 .await
1126 {
1127 Ok(_) => return Ok(()),
1128 Err(_) => {
1129 log_error!(self.logger, "Failed to track spendable outputs");
1130 return Err(ReplayEvent());
1131 },
1132 };
1133 },
1134 LdkEvent::OpenChannelRequest {
1135 temporary_channel_id,
1136 counterparty_node_id,
1137 funding_satoshis,
1138 channel_type,
1139 channel_negotiation_type: _,
1140 is_announced,
1141 params: _,
1142 } => {
1143 if is_announced {
1144 if let Err(err) = may_announce_channel(&*self.config) {
1145 log_error!(self.logger, "Rejecting inbound announced channel from peer {} due to missing configuration: {}", counterparty_node_id, err);
1146
1147 self.channel_manager
1148 .force_close_broadcasting_latest_txn(
1149 &temporary_channel_id,
1150 &counterparty_node_id,
1151 "Channel request rejected".to_string(),
1152 )
1153 .unwrap_or_else(|e| {
1154 log_error!(self.logger, "Failed to reject channel: {:?}", e)
1155 });
1156 return Ok(());
1157 }
1158 }
1159
1160 let anchor_channel = channel_type.requires_anchors_zero_fee_htlc_tx();
1161 if anchor_channel {
1162 if let Some(anchor_channels_config) =
1163 self.config.anchor_channels_config.as_ref()
1164 {
1165 let cur_anchor_reserve_sats = crate::total_anchor_channels_reserve_sats(
1166 &self.channel_manager,
1167 &self.config,
1168 );
1169 let spendable_amount_sats = self
1170 .wallet
1171 .get_spendable_amount_sats(cur_anchor_reserve_sats)
1172 .unwrap_or(0);
1173
1174 let required_amount_sats = if anchor_channels_config
1175 .trusted_peers_no_reserve
1176 .contains(&counterparty_node_id)
1177 {
1178 0
1179 } else {
1180 anchor_channels_config.per_channel_reserve_sats
1181 };
1182
1183 if spendable_amount_sats < required_amount_sats {
1184 log_error!(
1185 self.logger,
1186 "Rejecting inbound Anchor channel from peer {} due to insufficient available on-chain reserves. Available: {}/{}sats",
1187 counterparty_node_id,
1188 spendable_amount_sats,
1189 required_amount_sats,
1190 );
1191 self.channel_manager
1192 .force_close_broadcasting_latest_txn(
1193 &temporary_channel_id,
1194 &counterparty_node_id,
1195 "Channel request rejected".to_string(),
1196 )
1197 .unwrap_or_else(|e| {
1198 log_error!(self.logger, "Failed to reject channel: {:?}", e)
1199 });
1200 return Ok(());
1201 }
1202 } else {
1203 log_error!(
1204 self.logger,
1205 "Rejecting inbound channel from peer {} due to Anchor channels being disabled.",
1206 counterparty_node_id,
1207 );
1208 self.channel_manager
1209 .force_close_broadcasting_latest_txn(
1210 &temporary_channel_id,
1211 &counterparty_node_id,
1212 "Channel request rejected".to_string(),
1213 )
1214 .unwrap_or_else(|e| {
1215 log_error!(self.logger, "Failed to reject channel: {:?}", e)
1216 });
1217 return Ok(());
1218 }
1219 }
1220
1221 let user_channel_id: u128 = rng().random();
1222 let allow_0conf = self.config.trusted_peers_0conf.contains(&counterparty_node_id);
1223 let mut channel_override_config = None;
1224 if let Some((lsp_node_id, _)) = self
1225 .liquidity_source
1226 .as_ref()
1227 .and_then(|ls| ls.as_ref().get_lsps2_lsp_details())
1228 {
1229 if lsp_node_id == counterparty_node_id {
1230 channel_override_config = Some(ChannelConfigOverrides {
1237 handshake_overrides: Some(ChannelHandshakeConfigUpdate {
1238 max_inbound_htlc_value_in_flight_percent_of_channel: Some(100),
1239 ..Default::default()
1240 }),
1241 update_overrides: Some(ChannelConfigUpdate {
1242 accept_underpaying_htlcs: Some(true),
1243 ..Default::default()
1244 }),
1245 });
1246 }
1247 }
1248 let res = if allow_0conf {
1249 self.channel_manager.accept_inbound_channel_from_trusted_peer_0conf(
1250 &temporary_channel_id,
1251 &counterparty_node_id,
1252 user_channel_id,
1253 channel_override_config,
1254 )
1255 } else {
1256 self.channel_manager.accept_inbound_channel(
1257 &temporary_channel_id,
1258 &counterparty_node_id,
1259 user_channel_id,
1260 channel_override_config,
1261 )
1262 };
1263
1264 match res {
1265 Ok(()) => {
1266 log_info!(
1267 self.logger,
1268 "Accepting inbound{}{} channel of {}sats from{} peer {}",
1269 if allow_0conf { " 0conf" } else { "" },
1270 if anchor_channel { " Anchor" } else { "" },
1271 funding_satoshis,
1272 if allow_0conf { " trusted" } else { "" },
1273 counterparty_node_id,
1274 );
1275 },
1276 Err(e) => {
1277 log_error!(
1278 self.logger,
1279 "Error while accepting inbound{}{} channel from{} peer {}: {:?}",
1280 if allow_0conf { " 0conf" } else { "" },
1281 if anchor_channel { " Anchor" } else { "" },
1282 counterparty_node_id,
1283 if allow_0conf { " trusted" } else { "" },
1284 e,
1285 );
1286 },
1287 }
1288 },
1289 LdkEvent::PaymentForwarded {
1290 prev_channel_id,
1291 next_channel_id,
1292 prev_user_channel_id,
1293 next_user_channel_id,
1294 prev_node_id,
1295 next_node_id,
1296 total_fee_earned_msat,
1297 skimmed_fee_msat,
1298 claim_from_onchain_tx,
1299 outbound_amount_forwarded_msat,
1300 } => {
1301 {
1302 let read_only_network_graph = self.network_graph.read_only();
1303 let nodes = read_only_network_graph.nodes();
1304 let channels = self.channel_manager.list_channels();
1305
1306 let node_str = |channel_id: &Option<ChannelId>| {
1307 channel_id
1308 .and_then(|channel_id| {
1309 channels.iter().find(|c| c.channel_id == channel_id)
1310 })
1311 .and_then(|channel| {
1312 nodes.get(&NodeId::from_pubkey(&channel.counterparty.node_id))
1313 })
1314 .map_or("private_node".to_string(), |node| {
1315 node.announcement_info
1316 .as_ref()
1317 .map_or("unnamed node".to_string(), |ann| {
1318 format!("node {}", ann.alias())
1319 })
1320 })
1321 };
1322 let channel_str = |channel_id: &Option<ChannelId>| {
1323 channel_id
1324 .map(|channel_id| format!(" with channel {}", channel_id))
1325 .unwrap_or_default()
1326 };
1327 let from_prev_str = format!(
1328 " from {}{}",
1329 node_str(&prev_channel_id),
1330 channel_str(&prev_channel_id)
1331 );
1332 let to_next_str = format!(
1333 " to {}{}",
1334 node_str(&next_channel_id),
1335 channel_str(&next_channel_id)
1336 );
1337
1338 let fee_earned = total_fee_earned_msat.unwrap_or(0);
1339 if claim_from_onchain_tx {
1340 log_info!(
1341 self.logger,
1342 "Forwarded payment{}{} of {}msat, earning {}msat in fees from claiming onchain.",
1343 from_prev_str,
1344 to_next_str,
1345 outbound_amount_forwarded_msat.unwrap_or(0),
1346 fee_earned,
1347 );
1348 } else {
1349 log_info!(
1350 self.logger,
1351 "Forwarded payment{}{} of {}msat, earning {}msat in fees.",
1352 from_prev_str,
1353 to_next_str,
1354 outbound_amount_forwarded_msat.unwrap_or(0),
1355 fee_earned,
1356 );
1357 }
1358 }
1359
1360 if let Some(liquidity_source) = self.liquidity_source.as_ref() {
1361 let skimmed_fee_msat = skimmed_fee_msat.unwrap_or(0);
1362 liquidity_source
1363 .handle_payment_forwarded(next_channel_id, skimmed_fee_msat)
1364 .await;
1365 }
1366
1367 let event = Event::PaymentForwarded {
1368 prev_channel_id: prev_channel_id.expect("prev_channel_id expected for events generated by LDK versions greater than 0.0.107."),
1369 next_channel_id: next_channel_id.expect("next_channel_id expected for events generated by LDK versions greater than 0.0.107."),
1370 prev_user_channel_id: prev_user_channel_id.map(UserChannelId),
1371 next_user_channel_id: next_user_channel_id.map(UserChannelId),
1372 prev_node_id,
1373 next_node_id,
1374 total_fee_earned_msat,
1375 skimmed_fee_msat,
1376 claim_from_onchain_tx,
1377 outbound_amount_forwarded_msat,
1378 };
1379 self.event_queue.add_event(event).await.map_err(|e| {
1380 log_error!(self.logger, "Failed to push to event queue: {}", e);
1381 ReplayEvent()
1382 })?;
1383 },
1384 LdkEvent::ChannelPending {
1385 channel_id,
1386 user_channel_id,
1387 former_temporary_channel_id,
1388 counterparty_node_id,
1389 funding_txo,
1390 ..
1391 } => {
1392 log_info!(
1393 self.logger,
1394 "New channel {} with counterparty {} has been created and is pending confirmation on chain.",
1395 channel_id,
1396 counterparty_node_id,
1397 );
1398
1399 let event = Event::ChannelPending {
1400 channel_id,
1401 user_channel_id: UserChannelId(user_channel_id),
1402 former_temporary_channel_id: former_temporary_channel_id.unwrap(),
1403 counterparty_node_id,
1404 funding_txo,
1405 };
1406 match self.event_queue.add_event(event).await {
1407 Ok(_) => {},
1408 Err(e) => {
1409 log_error!(self.logger, "Failed to push to event queue: {}", e);
1410 return Err(ReplayEvent());
1411 },
1412 };
1413
1414 let network_graph = self.network_graph.read_only();
1415 let channels =
1416 self.channel_manager.list_channels_with_counterparty(&counterparty_node_id);
1417 if let Some(pending_channel) =
1418 channels.into_iter().find(|c| c.channel_id == channel_id)
1419 {
1420 if !pending_channel.is_outbound
1421 && self.peer_store.get_peer(&counterparty_node_id).is_none()
1422 {
1423 if let Some(address) = network_graph
1424 .nodes()
1425 .get(&NodeId::from_pubkey(&counterparty_node_id))
1426 .and_then(|node_info| node_info.announcement_info.as_ref())
1427 .and_then(|ann_info| ann_info.addresses().first())
1428 {
1429 let peer = PeerInfo {
1430 node_id: counterparty_node_id,
1431 address: address.clone(),
1432 };
1433
1434 self.peer_store.add_peer(peer).unwrap_or_else(|e| {
1435 log_error!(
1436 self.logger,
1437 "Failed to add peer {} to peer store: {}",
1438 counterparty_node_id,
1439 e
1440 );
1441 });
1442 }
1443 }
1444 }
1445 },
1446 LdkEvent::ChannelReady {
1447 channel_id,
1448 user_channel_id,
1449 counterparty_node_id,
1450 funding_txo,
1451 ..
1452 } => {
1453 if let Some(funding_txo) = funding_txo {
1454 log_info!(
1455 self.logger,
1456 "Channel {} with counterparty {} ready to be used with funding_txo {}",
1457 channel_id,
1458 counterparty_node_id,
1459 funding_txo,
1460 );
1461 } else {
1462 log_info!(
1463 self.logger,
1464 "Channel {} with counterparty {} ready to be used",
1465 channel_id,
1466 counterparty_node_id,
1467 );
1468 }
1469
1470 if let Some(liquidity_source) = self.liquidity_source.as_ref() {
1471 liquidity_source
1472 .handle_channel_ready(user_channel_id, &channel_id, &counterparty_node_id)
1473 .await;
1474 }
1475
1476 let event = Event::ChannelReady {
1477 channel_id,
1478 user_channel_id: UserChannelId(user_channel_id),
1479 counterparty_node_id: Some(counterparty_node_id),
1480 funding_txo,
1481 };
1482 match self.event_queue.add_event(event).await {
1483 Ok(_) => {},
1484 Err(e) => {
1485 log_error!(self.logger, "Failed to push to event queue: {}", e);
1486 return Err(ReplayEvent());
1487 },
1488 };
1489 },
1490 LdkEvent::ChannelClosed {
1491 channel_id,
1492 reason,
1493 user_channel_id,
1494 counterparty_node_id,
1495 ..
1496 } => {
1497 log_info!(self.logger, "Channel {} closed due to: {}", channel_id, reason);
1498
1499 let event = Event::ChannelClosed {
1500 channel_id,
1501 user_channel_id: UserChannelId(user_channel_id),
1502 counterparty_node_id,
1503 reason: Some(reason),
1504 };
1505
1506 match self.event_queue.add_event(event).await {
1507 Ok(_) => {},
1508 Err(e) => {
1509 log_error!(self.logger, "Failed to push to event queue: {}", e);
1510 return Err(ReplayEvent());
1511 },
1512 };
1513 },
1514 LdkEvent::DiscardFunding { .. } => {},
1515 LdkEvent::HTLCIntercepted {
1516 requested_next_hop_scid,
1517 intercept_id,
1518 expected_outbound_amount_msat,
1519 payment_hash,
1520 ..
1521 } => {
1522 if let Some(liquidity_source) = self.liquidity_source.as_ref() {
1523 liquidity_source
1524 .handle_htlc_intercepted(
1525 requested_next_hop_scid,
1526 intercept_id,
1527 expected_outbound_amount_msat,
1528 payment_hash,
1529 )
1530 .await;
1531 }
1532 },
1533 LdkEvent::InvoiceReceived { .. } => {
1534 debug_assert!(false, "We currently don't handle BOLT12 invoices manually, so this event should never be emitted.");
1535 },
1536 LdkEvent::ConnectionNeeded { node_id, addresses } => {
1537 let spawn_logger = self.logger.clone();
1538 let spawn_cm = Arc::clone(&self.connection_manager);
1539 let future = async move {
1540 for addr in &addresses {
1541 match spawn_cm.connect_peer_if_necessary(node_id, addr.clone()).await {
1542 Ok(()) => {
1543 return;
1544 },
1545 Err(e) => {
1546 log_error!(
1547 spawn_logger,
1548 "Failed to establish connection to peer {}@{}: {}",
1549 node_id,
1550 addr,
1551 e
1552 );
1553 },
1554 }
1555 }
1556 };
1557 self.runtime.spawn_cancellable_background_task(future);
1558 },
1559 LdkEvent::BumpTransaction(bte) => {
1560 match bte {
1561 BumpTransactionEvent::ChannelClose {
1562 ref channel_id,
1563 ref counterparty_node_id,
1564 ..
1565 } => {
1566 if let Some(anchor_channels_config) =
1568 self.config.anchor_channels_config.as_ref()
1569 {
1570 if anchor_channels_config
1571 .trusted_peers_no_reserve
1572 .contains(counterparty_node_id)
1573 {
1574 log_debug!(self.logger,
1575 "Ignoring BumpTransactionEvent::ChannelClose for channel {} due to trusted counterparty {}",
1576 channel_id, counterparty_node_id
1577 );
1578 return Ok(());
1579 }
1580 }
1581 },
1582 BumpTransactionEvent::HTLCResolution { .. } => {},
1583 }
1584
1585 self.bump_tx_event_handler.handle_event(&bte).await;
1586 },
1587 LdkEvent::OnionMessageIntercepted { peer_node_id, message } => {
1588 if let Some(om_mailbox) = self.om_mailbox.as_ref() {
1589 om_mailbox.onion_message_intercepted(peer_node_id, message);
1590 } else {
1591 log_trace!(
1592 self.logger,
1593 "Onion message intercepted, but no onion message mailbox available"
1594 );
1595 }
1596 },
1597 LdkEvent::OnionMessagePeerConnected { peer_node_id } => {
1598 if let Some(om_mailbox) = self.om_mailbox.as_ref() {
1599 let messages = om_mailbox.onion_message_peer_connected(peer_node_id);
1600
1601 for message in messages {
1602 if let Err(e) =
1603 self.onion_messenger.forward_onion_message(message, &peer_node_id)
1604 {
1605 log_trace!(
1606 self.logger,
1607 "Failed to forward onion message to peer {}: {:?}",
1608 peer_node_id,
1609 e
1610 );
1611 }
1612 }
1613 }
1614 },
1615
1616 LdkEvent::PersistStaticInvoice {
1617 invoice,
1618 invoice_request_path,
1619 invoice_slot,
1620 recipient_id,
1621 invoice_persisted_path,
1622 } => {
1623 if let Some(store) = self.static_invoice_store.as_ref() {
1624 match store
1625 .handle_persist_static_invoice(
1626 invoice,
1627 invoice_request_path,
1628 invoice_slot,
1629 recipient_id,
1630 )
1631 .await
1632 {
1633 Ok(_) => {
1634 self.channel_manager.static_invoice_persisted(invoice_persisted_path);
1635 },
1636 Err(e) => {
1637 log_error!(self.logger, "Failed to persist static invoice: {}", e);
1638 return Err(ReplayEvent());
1639 },
1640 };
1641 }
1642 },
1643 LdkEvent::StaticInvoiceRequested {
1644 recipient_id,
1645 invoice_slot,
1646 reply_path,
1647 invoice_request,
1648 } => {
1649 if let Some(store) = self.static_invoice_store.as_ref() {
1650 let invoice =
1651 store.handle_static_invoice_requested(&recipient_id, invoice_slot).await;
1652
1653 match invoice {
1654 Ok(Some((invoice, invoice_request_path))) => {
1655 if let Err(e) = self.channel_manager.respond_to_static_invoice_request(
1656 invoice,
1657 reply_path,
1658 invoice_request,
1659 invoice_request_path,
1660 ) {
1661 log_error!(self.logger, "Failed to send static invoice: {:?}", e);
1662 }
1663 },
1664 Ok(None) => {
1665 log_trace!(
1666 self.logger,
1667 "No static invoice found for recipient {} and slot {}",
1668 hex_utils::to_string(&recipient_id),
1669 invoice_slot
1670 );
1671 },
1672 Err(e) => {
1673 log_error!(self.logger, "Failed to retrieve static invoice: {}", e);
1674 return Err(ReplayEvent());
1675 },
1676 }
1677 }
1678 },
1679 LdkEvent::FundingTransactionReadyForSigning {
1681 channel_id,
1682 counterparty_node_id,
1683 unsigned_transaction,
1684 ..
1685 } => match self.wallet.sign_owned_inputs(unsigned_transaction) {
1686 Ok(partially_signed_tx) => {
1687 match self.channel_manager.funding_transaction_signed(
1688 &channel_id,
1689 &counterparty_node_id,
1690 partially_signed_tx,
1691 ) {
1692 Ok(()) => {
1693 log_info!(
1694 self.logger,
1695 "Signed funding transaction for channel {} with counterparty {}",
1696 channel_id,
1697 counterparty_node_id
1698 );
1699 },
1700 Err(e) => {
1701 debug_assert!(false, "Failed signing funding transaction: {:?}", e);
1703 log_error!(self.logger, "Failed signing funding transaction: {:?}", e);
1704 },
1705 }
1706 },
1707 Err(()) => log_error!(self.logger, "Failed signing funding transaction"),
1708 },
1709 LdkEvent::SplicePending {
1710 channel_id,
1711 user_channel_id,
1712 counterparty_node_id,
1713 new_funding_txo,
1714 ..
1715 } => {
1716 log_info!(
1717 self.logger,
1718 "Channel {} with counterparty {} pending splice with funding_txo {}",
1719 channel_id,
1720 counterparty_node_id,
1721 new_funding_txo,
1722 );
1723
1724 let event = Event::SplicePending {
1725 channel_id,
1726 user_channel_id: UserChannelId(user_channel_id),
1727 counterparty_node_id,
1728 new_funding_txo,
1729 };
1730
1731 match self.event_queue.add_event(event).await {
1732 Ok(_) => {},
1733 Err(e) => {
1734 log_error!(self.logger, "Failed to push to event queue: {}", e);
1735 return Err(ReplayEvent());
1736 },
1737 };
1738 },
1739 LdkEvent::SpliceFailed {
1740 channel_id,
1741 user_channel_id,
1742 counterparty_node_id,
1743 abandoned_funding_txo,
1744 contributed_outputs,
1745 ..
1746 } => {
1747 if let Some(funding_txo) = abandoned_funding_txo {
1748 log_info!(
1749 self.logger,
1750 "Channel {} with counterparty {} failed splice with funding_txo {}",
1751 channel_id,
1752 counterparty_node_id,
1753 funding_txo,
1754 );
1755 } else {
1756 log_info!(
1757 self.logger,
1758 "Channel {} with counterparty {} failed splice",
1759 channel_id,
1760 counterparty_node_id,
1761 );
1762 }
1763
1764 let tx = bitcoin::Transaction {
1765 version: bitcoin::transaction::Version::TWO,
1766 lock_time: bitcoin::absolute::LockTime::ZERO,
1767 input: vec![],
1768 output: contributed_outputs,
1769 };
1770 if let Err(e) = self.wallet.cancel_tx(&tx) {
1771 log_error!(self.logger, "Failed reclaiming unused addresses: {}", e);
1772 return Err(ReplayEvent());
1773 }
1774
1775 let event = Event::SpliceFailed {
1776 channel_id,
1777 user_channel_id: UserChannelId(user_channel_id),
1778 counterparty_node_id,
1779 abandoned_funding_txo,
1780 };
1781
1782 match self.event_queue.add_event(event).await {
1783 Ok(_) => {},
1784 Err(e) => {
1785 log_error!(self.logger, "Failed to push to event queue: {}", e);
1786 return Err(ReplayEvent());
1787 },
1788 };
1789 },
1790 }
1791 Ok(())
1792 }
1793}
1794
1795#[cfg(test)]
1796mod tests {
1797 use std::sync::atomic::{AtomicU16, Ordering};
1798 use std::time::Duration;
1799
1800 use lightning::util::test_utils::TestLogger;
1801
1802 use super::*;
1803 use crate::io::test_utils::InMemoryStore;
1804
1805 #[tokio::test]
1806 async fn event_queue_persistence() {
1807 let store: Arc<DynStore> = Arc::new(InMemoryStore::new());
1808 let logger = Arc::new(TestLogger::new());
1809 let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger)));
1810 assert_eq!(event_queue.next_event(), None);
1811
1812 let expected_event = Event::ChannelReady {
1813 channel_id: ChannelId([23u8; 32]),
1814 user_channel_id: UserChannelId(2323),
1815 counterparty_node_id: None,
1816 funding_txo: None,
1817 };
1818 event_queue.add_event(expected_event.clone()).await.unwrap();
1819
1820 for _ in 0..5 {
1822 assert_eq!(event_queue.next_event_async().await, expected_event);
1823 assert_eq!(event_queue.next_event(), Some(expected_event.clone()));
1824 }
1825
1826 let persisted_bytes = KVStore::read(
1828 &*store,
1829 EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
1830 EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
1831 EVENT_QUEUE_PERSISTENCE_KEY,
1832 )
1833 .await
1834 .unwrap();
1835 let deser_event_queue =
1836 EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap();
1837 assert_eq!(deser_event_queue.next_event_async().await, expected_event);
1838
1839 event_queue.event_handled().await.unwrap();
1840 assert_eq!(event_queue.next_event(), None);
1841 }
1842
1843 #[tokio::test]
1844 async fn event_queue_concurrency() {
1845 let store: Arc<DynStore> = Arc::new(InMemoryStore::new());
1846 let logger = Arc::new(TestLogger::new());
1847 let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger)));
1848 assert_eq!(event_queue.next_event(), None);
1849
1850 let expected_event = Event::ChannelReady {
1851 channel_id: ChannelId([23u8; 32]),
1852 user_channel_id: UserChannelId(2323),
1853 counterparty_node_id: None,
1854 funding_txo: None,
1855 };
1856
1857 tokio::select! {
1859 _ = tokio::time::sleep(Duration::from_secs(1)) => {
1860 }
1862 _ = event_queue.next_event_async() => {
1863 panic!();
1864 }
1865 }
1866
1867 assert_eq!(event_queue.next_event(), None);
1868 let enqueued_events = AtomicU16::new(0);
1870 let received_events = AtomicU16::new(0);
1871 let mut delayed_enqueue = false;
1872
1873 for _ in 0..25 {
1874 event_queue.add_event(expected_event.clone()).await.unwrap();
1875 enqueued_events.fetch_add(1, Ordering::SeqCst);
1876 }
1877
1878 loop {
1879 tokio::select! {
1880 _ = tokio::time::sleep(Duration::from_millis(10)), if !delayed_enqueue => {
1881 event_queue.add_event(expected_event.clone()).await.unwrap();
1882 enqueued_events.fetch_add(1, Ordering::SeqCst);
1883 delayed_enqueue = true;
1884 }
1885 e = event_queue.next_event_async() => {
1886 assert_eq!(e, expected_event);
1887 event_queue.event_handled().await.unwrap();
1888 received_events.fetch_add(1, Ordering::SeqCst);
1889
1890 event_queue.add_event(expected_event.clone()).await.unwrap();
1891 enqueued_events.fetch_add(1, Ordering::SeqCst);
1892 }
1893 e = event_queue.next_event_async() => {
1894 assert_eq!(e, expected_event);
1895 event_queue.event_handled().await.unwrap();
1896 received_events.fetch_add(1, Ordering::SeqCst);
1897 }
1898 }
1899
1900 if delayed_enqueue
1901 && received_events.load(Ordering::SeqCst) == enqueued_events.load(Ordering::SeqCst)
1902 {
1903 break;
1904 }
1905 }
1906 assert_eq!(event_queue.next_event(), None);
1907 }
1908}