ldk_node/
event.rs

1// This file is Copyright its original authors, visible in version control history.
2//
3// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
6// accordance with one or both of these licenses.
7
8use core::future::Future;
9use core::task::{Poll, Waker};
10use std::collections::VecDeque;
11use std::ops::Deref;
12use std::sync::{Arc, Mutex};
13
14use bitcoin::blockdata::locktime::absolute::LockTime;
15use bitcoin::secp256k1::PublicKey;
16use bitcoin::{Amount, OutPoint};
17use lightning::events::bump_transaction::BumpTransactionEvent;
18use lightning::events::{
19	ClosureReason, Event as LdkEvent, PaymentFailureReason, PaymentPurpose, ReplayEvent,
20};
21use lightning::impl_writeable_tlv_based_enum;
22use lightning::ln::channelmanager::PaymentId;
23use lightning::ln::types::ChannelId;
24use lightning::routing::gossip::NodeId;
25use lightning::util::config::{
26	ChannelConfigOverrides, ChannelConfigUpdate, ChannelHandshakeConfigUpdate,
27};
28use lightning::util::errors::APIError;
29use lightning::util::persist::KVStore;
30use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
31use lightning_liquidity::lsps2::utils::compute_opening_fee;
32use lightning_types::payment::{PaymentHash, PaymentPreimage};
33use rand::{rng, Rng};
34
35use crate::config::{may_announce_channel, Config};
36use crate::connection::ConnectionManager;
37use crate::data_store::DataStoreUpdateResult;
38use crate::fee_estimator::ConfirmationTarget;
39use crate::io::{
40	EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
41	EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
42};
43use crate::liquidity::LiquiditySource;
44use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger};
45use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
46use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore;
47use crate::payment::store::{
48	PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus,
49};
50use crate::runtime::Runtime;
51use crate::types::{CustomTlvRecord, DynStore, OnionMessenger, PaymentStore, Sweeper, Wallet};
52use crate::{
53	hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore,
54	UserChannelId,
55};
56
57/// An event emitted by [`Node`], which should be handled by the user.
58///
59/// [`Node`]: [`crate::Node`]
60#[derive(Debug, Clone, PartialEq, Eq)]
61pub enum Event {
62	/// A sent payment was successful.
63	PaymentSuccessful {
64		/// A local identifier used to track the payment.
65		///
66		/// Will only be `None` for events serialized with LDK Node v0.2.1 or prior.
67		payment_id: Option<PaymentId>,
68		/// The hash of the payment.
69		payment_hash: PaymentHash,
70		/// The preimage to the `payment_hash`.
71		///
72		/// Note that this serves as a payment receipt.
73		///
74		/// Will only be `None` for events serialized with LDK Node v0.4.2 or prior.
75		payment_preimage: Option<PaymentPreimage>,
76		/// The total fee which was spent at intermediate hops in this payment.
77		fee_paid_msat: Option<u64>,
78	},
79	/// A sent payment has failed.
80	PaymentFailed {
81		/// A local identifier used to track the payment.
82		///
83		/// Will only be `None` for events serialized with LDK Node v0.2.1 or prior.
84		payment_id: Option<PaymentId>,
85		/// The hash of the payment.
86		///
87		/// This will be `None` if the payment failed before receiving an invoice when paying a
88		/// BOLT12 [`Offer`].
89		///
90		/// [`Offer`]: lightning::offers::offer::Offer
91		payment_hash: Option<PaymentHash>,
92		/// The reason why the payment failed.
93		///
94		/// This will be `None` for events serialized by LDK Node v0.2.1 and prior.
95		reason: Option<PaymentFailureReason>,
96	},
97	/// A payment has been received.
98	PaymentReceived {
99		/// A local identifier used to track the payment.
100		///
101		/// Will only be `None` for events serialized with LDK Node v0.2.1 or prior.
102		payment_id: Option<PaymentId>,
103		/// The hash of the payment.
104		payment_hash: PaymentHash,
105		/// The value, in thousandths of a satoshi, that has been received.
106		amount_msat: u64,
107		/// Custom TLV records received on the payment
108		custom_records: Vec<CustomTlvRecord>,
109	},
110	/// A payment has been forwarded.
111	PaymentForwarded {
112		/// The channel id of the incoming channel between the previous node and us.
113		prev_channel_id: ChannelId,
114		/// The channel id of the outgoing channel between the next node and us.
115		next_channel_id: ChannelId,
116		/// The `user_channel_id` of the incoming channel between the previous node and us.
117		///
118		/// Will only be `None` for events serialized with LDK Node v0.3.0 or prior.
119		prev_user_channel_id: Option<UserChannelId>,
120		/// The `user_channel_id` of the outgoing channel between the next node and us.
121		///
122		/// This will be `None` if the payment was settled via an on-chain transaction. See the
123		/// caveat described for the `total_fee_earned_msat` field.
124		next_user_channel_id: Option<UserChannelId>,
125		/// The node id of the previous node.
126		///
127		/// This is only `None` for HTLCs received prior to LDK Node v0.5 or for events serialized by
128		/// versions prior to v0.5.
129		prev_node_id: Option<PublicKey>,
130		/// The node id of the next node.
131		///
132		/// This is only `None` for HTLCs received prior to LDK Node v0.5 or for events serialized by
133		/// versions prior to v0.5.
134		next_node_id: Option<PublicKey>,
135		/// The total fee, in milli-satoshis, which was earned as a result of the payment.
136		///
137		/// Note that if we force-closed the channel over which we forwarded an HTLC while the HTLC
138		/// was pending, the amount the next hop claimed will have been rounded down to the nearest
139		/// whole satoshi. Thus, the fee calculated here may be higher than expected as we still
140		/// claimed the full value in millisatoshis from the source. In this case,
141		/// `claim_from_onchain_tx` will be set.
142		///
143		/// If the channel which sent us the payment has been force-closed, we will claim the funds
144		/// via an on-chain transaction. In that case we do not yet know the on-chain transaction
145		/// fees which we will spend and will instead set this to `None`.
146		total_fee_earned_msat: Option<u64>,
147		/// The share of the total fee, in milli-satoshis, which was withheld in addition to the
148		/// forwarding fee.
149		///
150		/// This will only be `Some` if we forwarded an intercepted HTLC with less than the
151		/// expected amount. This means our counterparty accepted to receive less than the invoice
152		/// amount.
153		///
154		/// The caveat described above the `total_fee_earned_msat` field applies here as well.
155		skimmed_fee_msat: Option<u64>,
156		/// If this is `true`, the forwarded HTLC was claimed by our counterparty via an on-chain
157		/// transaction.
158		claim_from_onchain_tx: bool,
159		/// The final amount forwarded, in milli-satoshis, after the fee is deducted.
160		///
161		/// The caveat described above the `total_fee_earned_msat` field applies here as well.
162		outbound_amount_forwarded_msat: Option<u64>,
163	},
164	/// A payment for a previously-registered payment hash has been received.
165	///
166	/// This needs to be manually claimed by supplying the correct preimage to [`claim_for_hash`].
167	///
168	/// If the the provided parameters don't match the expectations or the preimage can't be
169	/// retrieved in time, should be failed-back via [`fail_for_hash`].
170	///
171	/// Note claiming will necessarily fail after the `claim_deadline` has been reached.
172	///
173	/// [`claim_for_hash`]: crate::payment::Bolt11Payment::claim_for_hash
174	/// [`fail_for_hash`]: crate::payment::Bolt11Payment::fail_for_hash
175	PaymentClaimable {
176		/// A local identifier used to track the payment.
177		payment_id: PaymentId,
178		/// The hash of the payment.
179		payment_hash: PaymentHash,
180		/// The value, in thousandths of a satoshi, that is claimable.
181		claimable_amount_msat: u64,
182		/// The block height at which this payment will be failed back and will no longer be
183		/// eligible for claiming.
184		claim_deadline: Option<u32>,
185		/// Custom TLV records attached to the payment
186		custom_records: Vec<CustomTlvRecord>,
187	},
188	/// A channel has been created and is pending confirmation on-chain.
189	ChannelPending {
190		/// The `channel_id` of the channel.
191		channel_id: ChannelId,
192		/// The `user_channel_id` of the channel.
193		user_channel_id: UserChannelId,
194		/// The `temporary_channel_id` this channel used to be known by during channel establishment.
195		former_temporary_channel_id: ChannelId,
196		/// The `node_id` of the channel counterparty.
197		counterparty_node_id: PublicKey,
198		/// The outpoint of the channel's funding transaction.
199		funding_txo: OutPoint,
200	},
201	/// A channel is ready to be used.
202	///
203	/// This event is emitted when:
204	/// - A new channel has been established and is ready for use
205	/// - An existing channel has been spliced and is ready with the new funding output
206	ChannelReady {
207		/// The `channel_id` of the channel.
208		channel_id: ChannelId,
209		/// The `user_channel_id` of the channel.
210		user_channel_id: UserChannelId,
211		/// The `node_id` of the channel counterparty.
212		///
213		/// This will be `None` for events serialized by LDK Node v0.1.0 and prior.
214		counterparty_node_id: Option<PublicKey>,
215		/// The outpoint of the channel's funding transaction.
216		///
217		/// This represents the channel's current funding output, which may change when the
218		/// channel is spliced. For spliced channels, this will contain the new funding output
219		/// from the confirmed splice transaction.
220		///
221		/// This will be `None` for events serialized by LDK Node v0.6.0 and prior.
222		funding_txo: Option<OutPoint>,
223	},
224	/// A channel has been closed.
225	ChannelClosed {
226		/// The `channel_id` of the channel.
227		channel_id: ChannelId,
228		/// The `user_channel_id` of the channel.
229		user_channel_id: UserChannelId,
230		/// The `node_id` of the channel counterparty.
231		///
232		/// This will be `None` for events serialized by LDK Node v0.1.0 and prior.
233		counterparty_node_id: Option<PublicKey>,
234		/// This will be `None` for events serialized by LDK Node v0.2.1 and prior.
235		reason: Option<ClosureReason>,
236	},
237	/// A channel splice is pending confirmation on-chain.
238	SplicePending {
239		/// The `channel_id` of the channel.
240		channel_id: ChannelId,
241		/// The `user_channel_id` of the channel.
242		user_channel_id: UserChannelId,
243		/// The `node_id` of the channel counterparty.
244		counterparty_node_id: PublicKey,
245		/// The outpoint of the channel's splice funding transaction.
246		new_funding_txo: OutPoint,
247	},
248	/// A channel splice has failed.
249	SpliceFailed {
250		/// The `channel_id` of the channel.
251		channel_id: ChannelId,
252		/// The `user_channel_id` of the channel.
253		user_channel_id: UserChannelId,
254		/// The `node_id` of the channel counterparty.
255		counterparty_node_id: PublicKey,
256		/// The outpoint of the channel's splice funding transaction, if one was created.
257		abandoned_funding_txo: Option<OutPoint>,
258	},
259}
260
261impl_writeable_tlv_based_enum!(Event,
262	(0, PaymentSuccessful) => {
263		(0, payment_hash, required),
264		(1, fee_paid_msat, option),
265		(3, payment_id, option),
266		(5, payment_preimage, option),
267	},
268	(1, PaymentFailed) => {
269		(0, payment_hash, option),
270		(1, reason, upgradable_option),
271		(3, payment_id, option),
272	},
273	(2, PaymentReceived) => {
274		(0, payment_hash, required),
275		(1, payment_id, option),
276		(2, amount_msat, required),
277		(3, custom_records, optional_vec),
278	},
279	(3, ChannelReady) => {
280		(0, channel_id, required),
281		(1, counterparty_node_id, option),
282		(2, user_channel_id, required),
283		(3, funding_txo, option),
284	},
285	(4, ChannelPending) => {
286		(0, channel_id, required),
287		(2, user_channel_id, required),
288		(4, former_temporary_channel_id, required),
289		(6, counterparty_node_id, required),
290		(8, funding_txo, required),
291	},
292	(5, ChannelClosed) => {
293		(0, channel_id, required),
294		(1, counterparty_node_id, option),
295		(2, user_channel_id, required),
296		(3, reason, upgradable_option),
297	},
298	(6, PaymentClaimable) => {
299		(0, payment_hash, required),
300		(2, payment_id, required),
301		(4, claimable_amount_msat, required),
302		(6, claim_deadline, option),
303		(7, custom_records, optional_vec),
304	},
305	(7, PaymentForwarded) => {
306		(0, prev_channel_id, required),
307		(1, prev_node_id, option),
308		(2, next_channel_id, required),
309		(3, next_node_id, option),
310		(4, prev_user_channel_id, option),
311		(6, next_user_channel_id, option),
312		(8, total_fee_earned_msat, option),
313		(10, skimmed_fee_msat, option),
314		(12, claim_from_onchain_tx, required),
315		(14, outbound_amount_forwarded_msat, option),
316	},
317	(8, SplicePending) => {
318		(1, channel_id, required),
319		(3, counterparty_node_id, required),
320		(5, user_channel_id, required),
321		(7, new_funding_txo, required),
322	},
323	(9, SpliceFailed) => {
324		(1, channel_id, required),
325		(3, counterparty_node_id, required),
326		(5, user_channel_id, required),
327		(7, abandoned_funding_txo, option),
328	},
329);
330
331pub struct EventQueue<L: Deref>
332where
333	L::Target: LdkLogger,
334{
335	queue: Arc<Mutex<VecDeque<Event>>>,
336	waker: Arc<Mutex<Option<Waker>>>,
337	kv_store: Arc<DynStore>,
338	logger: L,
339}
340
341impl<L: Deref> EventQueue<L>
342where
343	L::Target: LdkLogger,
344{
345	pub(crate) fn new(kv_store: Arc<DynStore>, logger: L) -> Self {
346		let queue = Arc::new(Mutex::new(VecDeque::new()));
347		let waker = Arc::new(Mutex::new(None));
348		Self { queue, waker, kv_store, logger }
349	}
350
351	pub(crate) async fn add_event(&self, event: Event) -> Result<(), Error> {
352		let data = {
353			let mut locked_queue = self.queue.lock().unwrap();
354			locked_queue.push_back(event);
355			EventQueueSerWrapper(&locked_queue).encode()
356		};
357
358		self.persist_queue(data).await?;
359
360		if let Some(waker) = self.waker.lock().unwrap().take() {
361			waker.wake();
362		}
363		Ok(())
364	}
365
366	pub(crate) fn next_event(&self) -> Option<Event> {
367		let locked_queue = self.queue.lock().unwrap();
368		locked_queue.front().cloned()
369	}
370
371	pub(crate) async fn next_event_async(&self) -> Event {
372		EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
373	}
374
375	pub(crate) async fn event_handled(&self) -> Result<(), Error> {
376		let data = {
377			let mut locked_queue = self.queue.lock().unwrap();
378			locked_queue.pop_front();
379			EventQueueSerWrapper(&locked_queue).encode()
380		};
381
382		self.persist_queue(data).await?;
383
384		if let Some(waker) = self.waker.lock().unwrap().take() {
385			waker.wake();
386		}
387		Ok(())
388	}
389
390	async fn persist_queue(&self, encoded_queue: Vec<u8>) -> Result<(), Error> {
391		KVStore::write(
392			&*self.kv_store,
393			EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
394			EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
395			EVENT_QUEUE_PERSISTENCE_KEY,
396			encoded_queue,
397		)
398		.await
399		.map_err(|e| {
400			log_error!(
401				self.logger,
402				"Write for key {}/{}/{} failed due to: {}",
403				EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
404				EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
405				EVENT_QUEUE_PERSISTENCE_KEY,
406				e
407			);
408			Error::PersistenceFailed
409		})?;
410		Ok(())
411	}
412}
413
414impl<L: Deref> ReadableArgs<(Arc<DynStore>, L)> for EventQueue<L>
415where
416	L::Target: LdkLogger,
417{
418	#[inline]
419	fn read<R: lightning::io::Read>(
420		reader: &mut R, args: (Arc<DynStore>, L),
421	) -> Result<Self, lightning::ln::msgs::DecodeError> {
422		let (kv_store, logger) = args;
423		let read_queue: EventQueueDeserWrapper = Readable::read(reader)?;
424		let queue = Arc::new(Mutex::new(read_queue.0));
425		let waker = Arc::new(Mutex::new(None));
426		Ok(Self { queue, waker, kv_store, logger })
427	}
428}
429
430struct EventQueueDeserWrapper(VecDeque<Event>);
431
432impl Readable for EventQueueDeserWrapper {
433	fn read<R: lightning::io::Read>(
434		reader: &mut R,
435	) -> Result<Self, lightning::ln::msgs::DecodeError> {
436		let len: u16 = Readable::read(reader)?;
437		let mut queue = VecDeque::with_capacity(len as usize);
438		for _ in 0..len {
439			queue.push_back(Readable::read(reader)?);
440		}
441		Ok(Self(queue))
442	}
443}
444
445struct EventQueueSerWrapper<'a>(&'a VecDeque<Event>);
446
447impl Writeable for EventQueueSerWrapper<'_> {
448	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), lightning::io::Error> {
449		(self.0.len() as u16).write(writer)?;
450		for e in self.0.iter() {
451			e.write(writer)?;
452		}
453		Ok(())
454	}
455}
456
457struct EventFuture {
458	event_queue: Arc<Mutex<VecDeque<Event>>>,
459	waker: Arc<Mutex<Option<Waker>>>,
460}
461
462impl Future for EventFuture {
463	type Output = Event;
464
465	fn poll(
466		self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>,
467	) -> core::task::Poll<Self::Output> {
468		if let Some(event) = self.event_queue.lock().unwrap().front() {
469			Poll::Ready(event.clone())
470		} else {
471			*self.waker.lock().unwrap() = Some(cx.waker().clone());
472			Poll::Pending
473		}
474	}
475}
476
477pub(crate) struct EventHandler<L: Deref + Clone + Sync + Send + 'static>
478where
479	L::Target: LdkLogger,
480{
481	event_queue: Arc<EventQueue<L>>,
482	wallet: Arc<Wallet>,
483	bump_tx_event_handler: Arc<BumpTransactionEventHandler>,
484	channel_manager: Arc<ChannelManager>,
485	connection_manager: Arc<ConnectionManager<L>>,
486	output_sweeper: Arc<Sweeper>,
487	network_graph: Arc<Graph>,
488	liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
489	payment_store: Arc<PaymentStore>,
490	peer_store: Arc<PeerStore<L>>,
491	runtime: Arc<Runtime>,
492	logger: L,
493	config: Arc<Config>,
494	static_invoice_store: Option<StaticInvoiceStore>,
495	onion_messenger: Arc<OnionMessenger>,
496	om_mailbox: Option<Arc<OnionMessageMailbox>>,
497}
498
499impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
500where
501	L::Target: LdkLogger,
502{
503	pub fn new(
504		event_queue: Arc<EventQueue<L>>, wallet: Arc<Wallet>,
505		bump_tx_event_handler: Arc<BumpTransactionEventHandler>,
506		channel_manager: Arc<ChannelManager>, connection_manager: Arc<ConnectionManager<L>>,
507		output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
508		liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
509		payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
510		static_invoice_store: Option<StaticInvoiceStore>, onion_messenger: Arc<OnionMessenger>,
511		om_mailbox: Option<Arc<OnionMessageMailbox>>, runtime: Arc<Runtime>, logger: L,
512		config: Arc<Config>,
513	) -> Self {
514		Self {
515			event_queue,
516			wallet,
517			bump_tx_event_handler,
518			channel_manager,
519			connection_manager,
520			output_sweeper,
521			network_graph,
522			liquidity_source,
523			payment_store,
524			peer_store,
525			logger,
526			runtime,
527			config,
528			static_invoice_store,
529			onion_messenger,
530			om_mailbox,
531		}
532	}
533
534	pub async fn handle_event(&self, event: LdkEvent) -> Result<(), ReplayEvent> {
535		match event {
536			LdkEvent::FundingGenerationReady {
537				temporary_channel_id,
538				counterparty_node_id,
539				channel_value_satoshis,
540				output_script,
541				user_channel_id,
542			} => {
543				// Construct the raw transaction with the output that is paid the amount of the
544				// channel.
545				let confirmation_target = ConfirmationTarget::ChannelFunding;
546
547				// We set nLockTime to the current height to discourage fee sniping.
548				let cur_height = self.channel_manager.current_best_block().height;
549				let locktime = LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO);
550
551				// Sign the final funding transaction and broadcast it.
552				let channel_amount = Amount::from_sat(channel_value_satoshis);
553				match self.wallet.create_funding_transaction(
554					output_script,
555					channel_amount,
556					confirmation_target,
557					locktime,
558				) {
559					Ok(final_tx) => {
560						let needs_manual_broadcast =
561							self.liquidity_source.as_ref().map_or(false, |ls| {
562								ls.as_ref().lsps2_channel_needs_manual_broadcast(
563									counterparty_node_id,
564									user_channel_id,
565								)
566							});
567
568						let result = if needs_manual_broadcast {
569							self.liquidity_source.as_ref().map(|ls| {
570								ls.lsps2_store_funding_transaction(
571									user_channel_id,
572									counterparty_node_id,
573									final_tx.clone(),
574								);
575							});
576							self.channel_manager.funding_transaction_generated_manual_broadcast(
577								temporary_channel_id,
578								counterparty_node_id,
579								final_tx,
580							)
581						} else {
582							self.channel_manager.funding_transaction_generated(
583								temporary_channel_id,
584								counterparty_node_id,
585								final_tx,
586							)
587						};
588
589						match result {
590							Ok(()) => {},
591							Err(APIError::APIMisuseError { err }) => {
592								log_error!(
593									self.logger,
594									"Encountered APIMisuseError, this should never happen: {}",
595									err
596								);
597								debug_assert!(false, "APIMisuseError: {}", err);
598							},
599							Err(APIError::ChannelUnavailable { err }) => {
600								log_error!(
601									self.logger,
602									"Failed to process funding transaction as channel went away before we could fund it: {}",
603									err
604								)
605							},
606							Err(err) => {
607								log_error!(
608									self.logger,
609									"Failed to process funding transaction: {:?}",
610									err
611								)
612							},
613						}
614					},
615					Err(err) => {
616						log_error!(self.logger, "Failed to create funding transaction: {}", err);
617						self.channel_manager
618							.force_close_broadcasting_latest_txn(
619								&temporary_channel_id,
620								&counterparty_node_id,
621								"Failed to create funding transaction".to_string(),
622							)
623							.unwrap_or_else(|e| {
624								log_error!(self.logger, "Failed to force close channel after funding generation failed: {:?}", e);
625								debug_assert!(false,
626									"Failed to force close channel after funding generation failed"
627								);
628							});
629					},
630				}
631			},
632			LdkEvent::FundingTxBroadcastSafe { user_channel_id, counterparty_node_id, .. } => {
633				self.liquidity_source.as_ref().map(|ls| {
634					ls.lsps2_funding_tx_broadcast_safe(user_channel_id, counterparty_node_id);
635				});
636			},
637			LdkEvent::PaymentClaimable {
638				payment_hash,
639				purpose,
640				amount_msat,
641				claim_deadline,
642				onion_fields,
643				counterparty_skimmed_fee_msat,
644				..
645			} => {
646				let payment_id = PaymentId(payment_hash.0);
647				if let Some(info) = self.payment_store.get(&payment_id) {
648					if info.direction == PaymentDirection::Outbound {
649						log_info!(
650							self.logger,
651							"Refused inbound payment with ID {}: circular payments are unsupported.",
652							payment_id
653						);
654						self.channel_manager.fail_htlc_backwards(&payment_hash);
655
656						let update = PaymentDetailsUpdate {
657							status: Some(PaymentStatus::Failed),
658							..PaymentDetailsUpdate::new(payment_id)
659						};
660						match self.payment_store.update(&update) {
661							Ok(_) => return Ok(()),
662							Err(e) => {
663								log_error!(self.logger, "Failed to access payment store: {}", e);
664								return Err(ReplayEvent());
665							},
666						};
667					}
668
669					if info.status == PaymentStatus::Succeeded
670						|| matches!(info.kind, PaymentKind::Spontaneous { .. })
671					{
672						log_info!(
673							self.logger,
674							"Refused duplicate inbound payment from payment hash {} of {}msat",
675							hex_utils::to_string(&payment_hash.0),
676							amount_msat,
677						);
678						self.channel_manager.fail_htlc_backwards(&payment_hash);
679
680						let update = PaymentDetailsUpdate {
681							status: Some(PaymentStatus::Failed),
682							..PaymentDetailsUpdate::new(payment_id)
683						};
684						match self.payment_store.update(&update) {
685							Ok(_) => return Ok(()),
686							Err(e) => {
687								log_error!(self.logger, "Failed to access payment store: {}", e);
688								return Err(ReplayEvent());
689							},
690						};
691					}
692
693					let max_total_opening_fee_msat = match info.kind {
694						PaymentKind::Bolt11Jit { lsp_fee_limits, .. } => {
695							lsp_fee_limits
696								.max_total_opening_fee_msat
697								.or_else(|| {
698									lsp_fee_limits.max_proportional_opening_fee_ppm_msat.and_then(
699										|max_prop_fee| {
700											// If it's a variable amount payment, compute the actual fee.
701											compute_opening_fee(amount_msat, 0, max_prop_fee)
702										},
703									)
704								})
705								.unwrap_or(0)
706						},
707						_ => 0,
708					};
709
710					if counterparty_skimmed_fee_msat > max_total_opening_fee_msat {
711						log_info!(
712							self.logger,
713							"Refusing inbound payment with hash {} as the counterparty-withheld fee of {}msat exceeds our limit of {}msat",
714							hex_utils::to_string(&payment_hash.0),
715							counterparty_skimmed_fee_msat,
716							max_total_opening_fee_msat,
717						);
718						self.channel_manager.fail_htlc_backwards(&payment_hash);
719
720						let update = PaymentDetailsUpdate {
721							hash: Some(Some(payment_hash)),
722							status: Some(PaymentStatus::Failed),
723							..PaymentDetailsUpdate::new(payment_id)
724						};
725						match self.payment_store.update(&update) {
726							Ok(_) => return Ok(()),
727							Err(e) => {
728								log_error!(self.logger, "Failed to access payment store: {}", e);
729								return Err(ReplayEvent());
730							},
731						};
732					}
733
734					// If the LSP skimmed anything, update our stored payment.
735					if counterparty_skimmed_fee_msat > 0 {
736						match info.kind {
737							PaymentKind::Bolt11Jit { .. } => {
738								let update = PaymentDetailsUpdate {
739									counterparty_skimmed_fee_msat: Some(Some(counterparty_skimmed_fee_msat)),
740									..PaymentDetailsUpdate::new(payment_id)
741								};
742								match self.payment_store.update(&update) {
743									Ok(_) => (),
744									Err(e) => {
745										log_error!(self.logger, "Failed to access payment store: {}", e);
746										return Err(ReplayEvent());
747									},
748								};
749							}
750							_ => debug_assert!(false, "We only expect the counterparty to get away with withholding fees for JIT payments."),
751						}
752					}
753
754					// If this is known by the store but ChannelManager doesn't know the preimage,
755					// the payment has been registered via `_for_hash` variants and needs to be manually claimed via
756					// user interaction.
757					match info.kind {
758						PaymentKind::Bolt11 { preimage, .. }
759						| PaymentKind::Bolt11Jit { preimage, .. } => {
760							if purpose.preimage().is_none() {
761								debug_assert!(
762									preimage.is_none(),
763									"We would have registered the preimage if we knew"
764								);
765
766								let custom_records = onion_fields
767									.map(|cf| {
768										cf.custom_tlvs().into_iter().map(|tlv| tlv.into()).collect()
769									})
770									.unwrap_or_default();
771								let event = Event::PaymentClaimable {
772									payment_id,
773									payment_hash,
774									claimable_amount_msat: amount_msat,
775									claim_deadline,
776									custom_records,
777								};
778								match self.event_queue.add_event(event).await {
779									Ok(_) => return Ok(()),
780									Err(e) => {
781										log_error!(
782											self.logger,
783											"Failed to push to event queue: {}",
784											e
785										);
786										return Err(ReplayEvent());
787									},
788								};
789							}
790						},
791						_ => {},
792					}
793				}
794
795				log_info!(
796					self.logger,
797					"Received payment from payment hash {} of {}msat",
798					hex_utils::to_string(&payment_hash.0),
799					amount_msat,
800				);
801				let payment_preimage = match purpose {
802					PaymentPurpose::Bolt11InvoicePayment { payment_preimage, .. } => {
803						payment_preimage
804					},
805					PaymentPurpose::Bolt12OfferPayment {
806						payment_preimage,
807						payment_secret,
808						payment_context,
809						..
810					} => {
811						let payer_note = payment_context.invoice_request.payer_note_truncated;
812						let offer_id = payment_context.offer_id;
813						let quantity = payment_context.invoice_request.quantity;
814						let kind = PaymentKind::Bolt12Offer {
815							hash: Some(payment_hash),
816							preimage: payment_preimage,
817							secret: Some(payment_secret),
818							offer_id,
819							payer_note,
820							quantity,
821						};
822
823						let payment = PaymentDetails::new(
824							payment_id,
825							kind,
826							Some(amount_msat),
827							None,
828							PaymentDirection::Inbound,
829							PaymentStatus::Pending,
830						);
831
832						match self.payment_store.insert(payment) {
833							Ok(false) => (),
834							Ok(true) => {
835								log_error!(
836									self.logger,
837									"Bolt12OfferPayment with ID {} was previously known",
838									payment_id,
839								);
840								debug_assert!(false);
841							},
842							Err(e) => {
843								log_error!(
844									self.logger,
845									"Failed to insert payment with ID {}: {}",
846									payment_id,
847									e
848								);
849								debug_assert!(false);
850							},
851						}
852						payment_preimage
853					},
854					PaymentPurpose::Bolt12RefundPayment { payment_preimage, .. } => {
855						payment_preimage
856					},
857					PaymentPurpose::SpontaneousPayment(preimage) => {
858						// Since it's spontaneous, we insert it now into our store.
859						let kind = PaymentKind::Spontaneous {
860							hash: payment_hash,
861							preimage: Some(preimage),
862						};
863
864						let payment = PaymentDetails::new(
865							payment_id,
866							kind,
867							Some(amount_msat),
868							None,
869							PaymentDirection::Inbound,
870							PaymentStatus::Pending,
871						);
872
873						match self.payment_store.insert(payment) {
874							Ok(false) => (),
875							Ok(true) => {
876								log_error!(
877									self.logger,
878									"Spontaneous payment with ID {} was previously known",
879									payment_id,
880								);
881								debug_assert!(false);
882							},
883							Err(e) => {
884								log_error!(
885									self.logger,
886									"Failed to insert payment with ID {}: {}",
887									payment_id,
888									e
889								);
890								debug_assert!(false);
891							},
892						}
893
894						Some(preimage)
895					},
896				};
897
898				if let Some(preimage) = payment_preimage {
899					self.channel_manager.claim_funds(preimage);
900				} else {
901					log_error!(
902						self.logger,
903						"Failed to claim payment with ID {}: preimage unknown.",
904						payment_id,
905					);
906					self.channel_manager.fail_htlc_backwards(&payment_hash);
907
908					let update = PaymentDetailsUpdate {
909						hash: Some(Some(payment_hash)),
910						status: Some(PaymentStatus::Failed),
911						..PaymentDetailsUpdate::new(payment_id)
912					};
913					match self.payment_store.update(&update) {
914						Ok(_) => return Ok(()),
915						Err(e) => {
916							log_error!(self.logger, "Failed to access payment store: {}", e);
917							return Err(ReplayEvent());
918						},
919					};
920				}
921			},
922			LdkEvent::PaymentClaimed {
923				payment_hash,
924				purpose,
925				amount_msat,
926				receiver_node_id: _,
927				htlcs: _,
928				sender_intended_total_msat: _,
929				onion_fields,
930				payment_id: _,
931			} => {
932				let payment_id = PaymentId(payment_hash.0);
933				log_info!(
934					self.logger,
935					"Claimed payment with ID {} from payment hash {} of {}msat.",
936					payment_id,
937					hex_utils::to_string(&payment_hash.0),
938					amount_msat,
939				);
940
941				let update = match purpose {
942					PaymentPurpose::Bolt11InvoicePayment {
943						payment_preimage,
944						payment_secret,
945						..
946					} => PaymentDetailsUpdate {
947						preimage: Some(payment_preimage),
948						secret: Some(Some(payment_secret)),
949						amount_msat: Some(Some(amount_msat)),
950						status: Some(PaymentStatus::Succeeded),
951						..PaymentDetailsUpdate::new(payment_id)
952					},
953					PaymentPurpose::Bolt12OfferPayment {
954						payment_preimage, payment_secret, ..
955					} => PaymentDetailsUpdate {
956						preimage: Some(payment_preimage),
957						secret: Some(Some(payment_secret)),
958						amount_msat: Some(Some(amount_msat)),
959						status: Some(PaymentStatus::Succeeded),
960						..PaymentDetailsUpdate::new(payment_id)
961					},
962					PaymentPurpose::Bolt12RefundPayment {
963						payment_preimage,
964						payment_secret,
965						..
966					} => PaymentDetailsUpdate {
967						preimage: Some(payment_preimage),
968						secret: Some(Some(payment_secret)),
969						amount_msat: Some(Some(amount_msat)),
970						status: Some(PaymentStatus::Succeeded),
971						..PaymentDetailsUpdate::new(payment_id)
972					},
973					PaymentPurpose::SpontaneousPayment(preimage) => PaymentDetailsUpdate {
974						preimage: Some(Some(preimage)),
975						amount_msat: Some(Some(amount_msat)),
976						status: Some(PaymentStatus::Succeeded),
977						..PaymentDetailsUpdate::new(payment_id)
978					},
979				};
980
981				match self.payment_store.update(&update) {
982					Ok(DataStoreUpdateResult::Updated) | Ok(DataStoreUpdateResult::Unchanged) => (
983						// No need to do anything if the idempotent update was applied, which might
984						// be the result of a replayed event.
985					),
986					Ok(DataStoreUpdateResult::NotFound) => {
987						log_error!(
988							self.logger,
989							"Claimed payment with ID {} couldn't be found in store",
990							payment_id,
991						);
992					},
993					Err(e) => {
994						log_error!(
995							self.logger,
996							"Failed to update payment with ID {}: {}",
997							payment_id,
998							e
999						);
1000						return Err(ReplayEvent());
1001					},
1002				}
1003
1004				let event = Event::PaymentReceived {
1005					payment_id: Some(payment_id),
1006					payment_hash,
1007					amount_msat,
1008					custom_records: onion_fields
1009						.map(|cf| cf.custom_tlvs().into_iter().map(|tlv| tlv.into()).collect())
1010						.unwrap_or_default(),
1011				};
1012				match self.event_queue.add_event(event).await {
1013					Ok(_) => return Ok(()),
1014					Err(e) => {
1015						log_error!(self.logger, "Failed to push to event queue: {}", e);
1016						return Err(ReplayEvent());
1017					},
1018				};
1019			},
1020			LdkEvent::PaymentSent {
1021				payment_id,
1022				payment_preimage,
1023				payment_hash,
1024				fee_paid_msat,
1025				..
1026			} => {
1027				let payment_id = if let Some(id) = payment_id {
1028					id
1029				} else {
1030					debug_assert!(false, "payment_id should always be set.");
1031					return Ok(());
1032				};
1033
1034				let update = PaymentDetailsUpdate {
1035					hash: Some(Some(payment_hash)),
1036					preimage: Some(Some(payment_preimage)),
1037					fee_paid_msat: Some(fee_paid_msat),
1038					status: Some(PaymentStatus::Succeeded),
1039					..PaymentDetailsUpdate::new(payment_id)
1040				};
1041
1042				match self.payment_store.update(&update) {
1043					Ok(_) => {},
1044					Err(e) => {
1045						log_error!(self.logger, "Failed to access payment store: {}", e);
1046						return Err(ReplayEvent());
1047					},
1048				};
1049
1050				self.payment_store.get(&payment_id).map(|payment| {
1051					log_info!(
1052						self.logger,
1053						"Successfully sent payment of {}msat{} from \
1054						payment hash {:?} with preimage {:?}",
1055						payment.amount_msat.unwrap(),
1056						if let Some(fee) = fee_paid_msat {
1057							format!(" (fee {} msat)", fee)
1058						} else {
1059							"".to_string()
1060						},
1061						hex_utils::to_string(&payment_hash.0),
1062						hex_utils::to_string(&payment_preimage.0)
1063					);
1064				});
1065				let event = Event::PaymentSuccessful {
1066					payment_id: Some(payment_id),
1067					payment_hash,
1068					payment_preimage: Some(payment_preimage),
1069					fee_paid_msat,
1070				};
1071
1072				match self.event_queue.add_event(event).await {
1073					Ok(_) => return Ok(()),
1074					Err(e) => {
1075						log_error!(self.logger, "Failed to push to event queue: {}", e);
1076						return Err(ReplayEvent());
1077					},
1078				};
1079			},
1080			LdkEvent::PaymentFailed { payment_id, payment_hash, reason, .. } => {
1081				log_info!(
1082					self.logger,
1083					"Failed to send payment with ID {} due to {:?}.",
1084					payment_id,
1085					reason
1086				);
1087
1088				let update = PaymentDetailsUpdate {
1089					hash: Some(payment_hash),
1090					status: Some(PaymentStatus::Failed),
1091					..PaymentDetailsUpdate::new(payment_id)
1092				};
1093				match self.payment_store.update(&update) {
1094					Ok(_) => {},
1095					Err(e) => {
1096						log_error!(self.logger, "Failed to access payment store: {}", e);
1097						return Err(ReplayEvent());
1098					},
1099				};
1100
1101				let event =
1102					Event::PaymentFailed { payment_id: Some(payment_id), payment_hash, reason };
1103				match self.event_queue.add_event(event).await {
1104					Ok(_) => return Ok(()),
1105					Err(e) => {
1106						log_error!(self.logger, "Failed to push to event queue: {}", e);
1107						return Err(ReplayEvent());
1108					},
1109				};
1110			},
1111
1112			LdkEvent::PaymentPathSuccessful { .. } => {},
1113			LdkEvent::PaymentPathFailed { .. } => {},
1114			LdkEvent::ProbeSuccessful { .. } => {},
1115			LdkEvent::ProbeFailed { .. } => {},
1116			LdkEvent::HTLCHandlingFailed { failure_type, .. } => {
1117				if let Some(liquidity_source) = self.liquidity_source.as_ref() {
1118					liquidity_source.handle_htlc_handling_failed(failure_type).await;
1119				}
1120			},
1121			LdkEvent::SpendableOutputs { outputs, channel_id } => {
1122				match self
1123					.output_sweeper
1124					.track_spendable_outputs(outputs, channel_id, true, None)
1125					.await
1126				{
1127					Ok(_) => return Ok(()),
1128					Err(_) => {
1129						log_error!(self.logger, "Failed to track spendable outputs");
1130						return Err(ReplayEvent());
1131					},
1132				};
1133			},
1134			LdkEvent::OpenChannelRequest {
1135				temporary_channel_id,
1136				counterparty_node_id,
1137				funding_satoshis,
1138				channel_type,
1139				channel_negotiation_type: _,
1140				is_announced,
1141				params: _,
1142			} => {
1143				if is_announced {
1144					if let Err(err) = may_announce_channel(&*self.config) {
1145						log_error!(self.logger, "Rejecting inbound announced channel from peer {} due to missing configuration: {}", counterparty_node_id, err);
1146
1147						self.channel_manager
1148							.force_close_broadcasting_latest_txn(
1149								&temporary_channel_id,
1150								&counterparty_node_id,
1151								"Channel request rejected".to_string(),
1152							)
1153							.unwrap_or_else(|e| {
1154								log_error!(self.logger, "Failed to reject channel: {:?}", e)
1155							});
1156						return Ok(());
1157					}
1158				}
1159
1160				let anchor_channel = channel_type.requires_anchors_zero_fee_htlc_tx();
1161				if anchor_channel {
1162					if let Some(anchor_channels_config) =
1163						self.config.anchor_channels_config.as_ref()
1164					{
1165						let cur_anchor_reserve_sats = crate::total_anchor_channels_reserve_sats(
1166							&self.channel_manager,
1167							&self.config,
1168						);
1169						let spendable_amount_sats = self
1170							.wallet
1171							.get_spendable_amount_sats(cur_anchor_reserve_sats)
1172							.unwrap_or(0);
1173
1174						let required_amount_sats = if anchor_channels_config
1175							.trusted_peers_no_reserve
1176							.contains(&counterparty_node_id)
1177						{
1178							0
1179						} else {
1180							anchor_channels_config.per_channel_reserve_sats
1181						};
1182
1183						if spendable_amount_sats < required_amount_sats {
1184							log_error!(
1185								self.logger,
1186								"Rejecting inbound Anchor channel from peer {} due to insufficient available on-chain reserves. Available: {}/{}sats",
1187								counterparty_node_id,
1188								spendable_amount_sats,
1189								required_amount_sats,
1190							);
1191							self.channel_manager
1192								.force_close_broadcasting_latest_txn(
1193									&temporary_channel_id,
1194									&counterparty_node_id,
1195									"Channel request rejected".to_string(),
1196								)
1197								.unwrap_or_else(|e| {
1198									log_error!(self.logger, "Failed to reject channel: {:?}", e)
1199								});
1200							return Ok(());
1201						}
1202					} else {
1203						log_error!(
1204							self.logger,
1205							"Rejecting inbound channel from peer {} due to Anchor channels being disabled.",
1206							counterparty_node_id,
1207						);
1208						self.channel_manager
1209							.force_close_broadcasting_latest_txn(
1210								&temporary_channel_id,
1211								&counterparty_node_id,
1212								"Channel request rejected".to_string(),
1213							)
1214							.unwrap_or_else(|e| {
1215								log_error!(self.logger, "Failed to reject channel: {:?}", e)
1216							});
1217						return Ok(());
1218					}
1219				}
1220
1221				let user_channel_id: u128 = rng().random();
1222				let allow_0conf = self.config.trusted_peers_0conf.contains(&counterparty_node_id);
1223				let mut channel_override_config = None;
1224				if let Some((lsp_node_id, _)) = self
1225					.liquidity_source
1226					.as_ref()
1227					.and_then(|ls| ls.as_ref().get_lsps2_lsp_details())
1228				{
1229					if lsp_node_id == counterparty_node_id {
1230						// When we're an LSPS2 client, allow claiming underpaying HTLCs as the LSP will skim off some fee. We'll
1231						// check that they don't take too much before claiming.
1232						//
1233						// We also set maximum allowed inbound HTLC value in flight
1234						// to 100%. We should eventually be able to set this on a per-channel basis, but for
1235						// now we just bump the default for all channels.
1236						channel_override_config = Some(ChannelConfigOverrides {
1237							handshake_overrides: Some(ChannelHandshakeConfigUpdate {
1238								max_inbound_htlc_value_in_flight_percent_of_channel: Some(100),
1239								..Default::default()
1240							}),
1241							update_overrides: Some(ChannelConfigUpdate {
1242								accept_underpaying_htlcs: Some(true),
1243								..Default::default()
1244							}),
1245						});
1246					}
1247				}
1248				let res = if allow_0conf {
1249					self.channel_manager.accept_inbound_channel_from_trusted_peer_0conf(
1250						&temporary_channel_id,
1251						&counterparty_node_id,
1252						user_channel_id,
1253						channel_override_config,
1254					)
1255				} else {
1256					self.channel_manager.accept_inbound_channel(
1257						&temporary_channel_id,
1258						&counterparty_node_id,
1259						user_channel_id,
1260						channel_override_config,
1261					)
1262				};
1263
1264				match res {
1265					Ok(()) => {
1266						log_info!(
1267							self.logger,
1268							"Accepting inbound{}{} channel of {}sats from{} peer {}",
1269							if allow_0conf { " 0conf" } else { "" },
1270							if anchor_channel { " Anchor" } else { "" },
1271							funding_satoshis,
1272							if allow_0conf { " trusted" } else { "" },
1273							counterparty_node_id,
1274						);
1275					},
1276					Err(e) => {
1277						log_error!(
1278							self.logger,
1279							"Error while accepting inbound{}{} channel from{} peer {}: {:?}",
1280							if allow_0conf { " 0conf" } else { "" },
1281							if anchor_channel { " Anchor" } else { "" },
1282							counterparty_node_id,
1283							if allow_0conf { " trusted" } else { "" },
1284							e,
1285						);
1286					},
1287				}
1288			},
1289			LdkEvent::PaymentForwarded {
1290				prev_channel_id,
1291				next_channel_id,
1292				prev_user_channel_id,
1293				next_user_channel_id,
1294				prev_node_id,
1295				next_node_id,
1296				total_fee_earned_msat,
1297				skimmed_fee_msat,
1298				claim_from_onchain_tx,
1299				outbound_amount_forwarded_msat,
1300			} => {
1301				{
1302					let read_only_network_graph = self.network_graph.read_only();
1303					let nodes = read_only_network_graph.nodes();
1304					let channels = self.channel_manager.list_channels();
1305
1306					let node_str = |channel_id: &Option<ChannelId>| {
1307						channel_id
1308							.and_then(|channel_id| {
1309								channels.iter().find(|c| c.channel_id == channel_id)
1310							})
1311							.and_then(|channel| {
1312								nodes.get(&NodeId::from_pubkey(&channel.counterparty.node_id))
1313							})
1314							.map_or("private_node".to_string(), |node| {
1315								node.announcement_info
1316									.as_ref()
1317									.map_or("unnamed node".to_string(), |ann| {
1318										format!("node {}", ann.alias())
1319									})
1320							})
1321					};
1322					let channel_str = |channel_id: &Option<ChannelId>| {
1323						channel_id
1324							.map(|channel_id| format!(" with channel {}", channel_id))
1325							.unwrap_or_default()
1326					};
1327					let from_prev_str = format!(
1328						" from {}{}",
1329						node_str(&prev_channel_id),
1330						channel_str(&prev_channel_id)
1331					);
1332					let to_next_str = format!(
1333						" to {}{}",
1334						node_str(&next_channel_id),
1335						channel_str(&next_channel_id)
1336					);
1337
1338					let fee_earned = total_fee_earned_msat.unwrap_or(0);
1339					if claim_from_onchain_tx {
1340						log_info!(
1341						self.logger,
1342						"Forwarded payment{}{} of {}msat, earning {}msat in fees from claiming onchain.",
1343						from_prev_str,
1344						to_next_str,
1345						outbound_amount_forwarded_msat.unwrap_or(0),
1346						fee_earned,
1347					);
1348					} else {
1349						log_info!(
1350							self.logger,
1351							"Forwarded payment{}{} of {}msat, earning {}msat in fees.",
1352							from_prev_str,
1353							to_next_str,
1354							outbound_amount_forwarded_msat.unwrap_or(0),
1355							fee_earned,
1356						);
1357					}
1358				}
1359
1360				if let Some(liquidity_source) = self.liquidity_source.as_ref() {
1361					let skimmed_fee_msat = skimmed_fee_msat.unwrap_or(0);
1362					liquidity_source
1363						.handle_payment_forwarded(next_channel_id, skimmed_fee_msat)
1364						.await;
1365				}
1366
1367				let event = Event::PaymentForwarded {
1368					prev_channel_id: prev_channel_id.expect("prev_channel_id expected for events generated by LDK versions greater than 0.0.107."),
1369					next_channel_id: next_channel_id.expect("next_channel_id expected for events generated by LDK versions greater than 0.0.107."),
1370					prev_user_channel_id: prev_user_channel_id.map(UserChannelId),
1371					next_user_channel_id: next_user_channel_id.map(UserChannelId),
1372					prev_node_id,
1373					next_node_id,
1374					total_fee_earned_msat,
1375					skimmed_fee_msat,
1376					claim_from_onchain_tx,
1377					outbound_amount_forwarded_msat,
1378				};
1379				self.event_queue.add_event(event).await.map_err(|e| {
1380					log_error!(self.logger, "Failed to push to event queue: {}", e);
1381					ReplayEvent()
1382				})?;
1383			},
1384			LdkEvent::ChannelPending {
1385				channel_id,
1386				user_channel_id,
1387				former_temporary_channel_id,
1388				counterparty_node_id,
1389				funding_txo,
1390				..
1391			} => {
1392				log_info!(
1393					self.logger,
1394					"New channel {} with counterparty {} has been created and is pending confirmation on chain.",
1395					channel_id,
1396					counterparty_node_id,
1397				);
1398
1399				let event = Event::ChannelPending {
1400					channel_id,
1401					user_channel_id: UserChannelId(user_channel_id),
1402					former_temporary_channel_id: former_temporary_channel_id.unwrap(),
1403					counterparty_node_id,
1404					funding_txo,
1405				};
1406				match self.event_queue.add_event(event).await {
1407					Ok(_) => {},
1408					Err(e) => {
1409						log_error!(self.logger, "Failed to push to event queue: {}", e);
1410						return Err(ReplayEvent());
1411					},
1412				};
1413
1414				let network_graph = self.network_graph.read_only();
1415				let channels =
1416					self.channel_manager.list_channels_with_counterparty(&counterparty_node_id);
1417				if let Some(pending_channel) =
1418					channels.into_iter().find(|c| c.channel_id == channel_id)
1419				{
1420					if !pending_channel.is_outbound
1421						&& self.peer_store.get_peer(&counterparty_node_id).is_none()
1422					{
1423						if let Some(address) = network_graph
1424							.nodes()
1425							.get(&NodeId::from_pubkey(&counterparty_node_id))
1426							.and_then(|node_info| node_info.announcement_info.as_ref())
1427							.and_then(|ann_info| ann_info.addresses().first())
1428						{
1429							let peer = PeerInfo {
1430								node_id: counterparty_node_id,
1431								address: address.clone(),
1432							};
1433
1434							self.peer_store.add_peer(peer).unwrap_or_else(|e| {
1435								log_error!(
1436									self.logger,
1437									"Failed to add peer {} to peer store: {}",
1438									counterparty_node_id,
1439									e
1440								);
1441							});
1442						}
1443					}
1444				}
1445			},
1446			LdkEvent::ChannelReady {
1447				channel_id,
1448				user_channel_id,
1449				counterparty_node_id,
1450				funding_txo,
1451				..
1452			} => {
1453				if let Some(funding_txo) = funding_txo {
1454					log_info!(
1455						self.logger,
1456						"Channel {} with counterparty {} ready to be used with funding_txo {}",
1457						channel_id,
1458						counterparty_node_id,
1459						funding_txo,
1460					);
1461				} else {
1462					log_info!(
1463						self.logger,
1464						"Channel {} with counterparty {} ready to be used",
1465						channel_id,
1466						counterparty_node_id,
1467					);
1468				}
1469
1470				if let Some(liquidity_source) = self.liquidity_source.as_ref() {
1471					liquidity_source
1472						.handle_channel_ready(user_channel_id, &channel_id, &counterparty_node_id)
1473						.await;
1474				}
1475
1476				let event = Event::ChannelReady {
1477					channel_id,
1478					user_channel_id: UserChannelId(user_channel_id),
1479					counterparty_node_id: Some(counterparty_node_id),
1480					funding_txo,
1481				};
1482				match self.event_queue.add_event(event).await {
1483					Ok(_) => {},
1484					Err(e) => {
1485						log_error!(self.logger, "Failed to push to event queue: {}", e);
1486						return Err(ReplayEvent());
1487					},
1488				};
1489			},
1490			LdkEvent::ChannelClosed {
1491				channel_id,
1492				reason,
1493				user_channel_id,
1494				counterparty_node_id,
1495				..
1496			} => {
1497				log_info!(self.logger, "Channel {} closed due to: {}", channel_id, reason);
1498
1499				let event = Event::ChannelClosed {
1500					channel_id,
1501					user_channel_id: UserChannelId(user_channel_id),
1502					counterparty_node_id,
1503					reason: Some(reason),
1504				};
1505
1506				match self.event_queue.add_event(event).await {
1507					Ok(_) => {},
1508					Err(e) => {
1509						log_error!(self.logger, "Failed to push to event queue: {}", e);
1510						return Err(ReplayEvent());
1511					},
1512				};
1513			},
1514			LdkEvent::DiscardFunding { .. } => {},
1515			LdkEvent::HTLCIntercepted {
1516				requested_next_hop_scid,
1517				intercept_id,
1518				expected_outbound_amount_msat,
1519				payment_hash,
1520				..
1521			} => {
1522				if let Some(liquidity_source) = self.liquidity_source.as_ref() {
1523					liquidity_source
1524						.handle_htlc_intercepted(
1525							requested_next_hop_scid,
1526							intercept_id,
1527							expected_outbound_amount_msat,
1528							payment_hash,
1529						)
1530						.await;
1531				}
1532			},
1533			LdkEvent::InvoiceReceived { .. } => {
1534				debug_assert!(false, "We currently don't handle BOLT12 invoices manually, so this event should never be emitted.");
1535			},
1536			LdkEvent::ConnectionNeeded { node_id, addresses } => {
1537				let spawn_logger = self.logger.clone();
1538				let spawn_cm = Arc::clone(&self.connection_manager);
1539				let future = async move {
1540					for addr in &addresses {
1541						match spawn_cm.connect_peer_if_necessary(node_id, addr.clone()).await {
1542							Ok(()) => {
1543								return;
1544							},
1545							Err(e) => {
1546								log_error!(
1547									spawn_logger,
1548									"Failed to establish connection to peer {}@{}: {}",
1549									node_id,
1550									addr,
1551									e
1552								);
1553							},
1554						}
1555					}
1556				};
1557				self.runtime.spawn_cancellable_background_task(future);
1558			},
1559			LdkEvent::BumpTransaction(bte) => {
1560				match bte {
1561					BumpTransactionEvent::ChannelClose {
1562						ref channel_id,
1563						ref counterparty_node_id,
1564						..
1565					} => {
1566						// Skip bumping channel closes if our counterparty is trusted.
1567						if let Some(anchor_channels_config) =
1568							self.config.anchor_channels_config.as_ref()
1569						{
1570							if anchor_channels_config
1571								.trusted_peers_no_reserve
1572								.contains(counterparty_node_id)
1573							{
1574								log_debug!(self.logger,
1575									"Ignoring BumpTransactionEvent::ChannelClose for channel {} due to trusted counterparty {}",
1576									channel_id, counterparty_node_id
1577								);
1578								return Ok(());
1579							}
1580						}
1581					},
1582					BumpTransactionEvent::HTLCResolution { .. } => {},
1583				}
1584
1585				self.bump_tx_event_handler.handle_event(&bte).await;
1586			},
1587			LdkEvent::OnionMessageIntercepted { peer_node_id, message } => {
1588				if let Some(om_mailbox) = self.om_mailbox.as_ref() {
1589					om_mailbox.onion_message_intercepted(peer_node_id, message);
1590				} else {
1591					log_trace!(
1592						self.logger,
1593						"Onion message intercepted, but no onion message mailbox available"
1594					);
1595				}
1596			},
1597			LdkEvent::OnionMessagePeerConnected { peer_node_id } => {
1598				if let Some(om_mailbox) = self.om_mailbox.as_ref() {
1599					let messages = om_mailbox.onion_message_peer_connected(peer_node_id);
1600
1601					for message in messages {
1602						if let Err(e) =
1603							self.onion_messenger.forward_onion_message(message, &peer_node_id)
1604						{
1605							log_trace!(
1606								self.logger,
1607								"Failed to forward onion message to peer {}: {:?}",
1608								peer_node_id,
1609								e
1610							);
1611						}
1612					}
1613				}
1614			},
1615
1616			LdkEvent::PersistStaticInvoice {
1617				invoice,
1618				invoice_request_path,
1619				invoice_slot,
1620				recipient_id,
1621				invoice_persisted_path,
1622			} => {
1623				if let Some(store) = self.static_invoice_store.as_ref() {
1624					match store
1625						.handle_persist_static_invoice(
1626							invoice,
1627							invoice_request_path,
1628							invoice_slot,
1629							recipient_id,
1630						)
1631						.await
1632					{
1633						Ok(_) => {
1634							self.channel_manager.static_invoice_persisted(invoice_persisted_path);
1635						},
1636						Err(e) => {
1637							log_error!(self.logger, "Failed to persist static invoice: {}", e);
1638							return Err(ReplayEvent());
1639						},
1640					};
1641				}
1642			},
1643			LdkEvent::StaticInvoiceRequested {
1644				recipient_id,
1645				invoice_slot,
1646				reply_path,
1647				invoice_request,
1648			} => {
1649				if let Some(store) = self.static_invoice_store.as_ref() {
1650					let invoice =
1651						store.handle_static_invoice_requested(&recipient_id, invoice_slot).await;
1652
1653					match invoice {
1654						Ok(Some((invoice, invoice_request_path))) => {
1655							if let Err(e) = self.channel_manager.respond_to_static_invoice_request(
1656								invoice,
1657								reply_path,
1658								invoice_request,
1659								invoice_request_path,
1660							) {
1661								log_error!(self.logger, "Failed to send static invoice: {:?}", e);
1662							}
1663						},
1664						Ok(None) => {
1665							log_trace!(
1666								self.logger,
1667								"No static invoice found for recipient {} and slot {}",
1668								hex_utils::to_string(&recipient_id),
1669								invoice_slot
1670							);
1671						},
1672						Err(e) => {
1673							log_error!(self.logger, "Failed to retrieve static invoice: {}", e);
1674							return Err(ReplayEvent());
1675						},
1676					}
1677				}
1678			},
1679			// TODO(splicing): Revisit error handling once splicing API is settled in LDK 0.3
1680			LdkEvent::FundingTransactionReadyForSigning {
1681				channel_id,
1682				counterparty_node_id,
1683				unsigned_transaction,
1684				..
1685			} => match self.wallet.sign_owned_inputs(unsigned_transaction) {
1686				Ok(partially_signed_tx) => {
1687					match self.channel_manager.funding_transaction_signed(
1688						&channel_id,
1689						&counterparty_node_id,
1690						partially_signed_tx,
1691					) {
1692						Ok(()) => {
1693							log_info!(
1694								self.logger,
1695								"Signed funding transaction for channel {} with counterparty {}",
1696								channel_id,
1697								counterparty_node_id
1698							);
1699						},
1700						Err(e) => {
1701							// TODO(splicing): Abort splice once supported in LDK 0.3
1702							debug_assert!(false, "Failed signing funding transaction: {:?}", e);
1703							log_error!(self.logger, "Failed signing funding transaction: {:?}", e);
1704						},
1705					}
1706				},
1707				Err(()) => log_error!(self.logger, "Failed signing funding transaction"),
1708			},
1709			LdkEvent::SplicePending {
1710				channel_id,
1711				user_channel_id,
1712				counterparty_node_id,
1713				new_funding_txo,
1714				..
1715			} => {
1716				log_info!(
1717					self.logger,
1718					"Channel {} with counterparty {} pending splice with funding_txo {}",
1719					channel_id,
1720					counterparty_node_id,
1721					new_funding_txo,
1722				);
1723
1724				let event = Event::SplicePending {
1725					channel_id,
1726					user_channel_id: UserChannelId(user_channel_id),
1727					counterparty_node_id,
1728					new_funding_txo,
1729				};
1730
1731				match self.event_queue.add_event(event).await {
1732					Ok(_) => {},
1733					Err(e) => {
1734						log_error!(self.logger, "Failed to push to event queue: {}", e);
1735						return Err(ReplayEvent());
1736					},
1737				};
1738			},
1739			LdkEvent::SpliceFailed {
1740				channel_id,
1741				user_channel_id,
1742				counterparty_node_id,
1743				abandoned_funding_txo,
1744				contributed_outputs,
1745				..
1746			} => {
1747				if let Some(funding_txo) = abandoned_funding_txo {
1748					log_info!(
1749						self.logger,
1750						"Channel {} with counterparty {} failed splice with funding_txo {}",
1751						channel_id,
1752						counterparty_node_id,
1753						funding_txo,
1754					);
1755				} else {
1756					log_info!(
1757						self.logger,
1758						"Channel {} with counterparty {} failed splice",
1759						channel_id,
1760						counterparty_node_id,
1761					);
1762				}
1763
1764				let tx = bitcoin::Transaction {
1765					version: bitcoin::transaction::Version::TWO,
1766					lock_time: bitcoin::absolute::LockTime::ZERO,
1767					input: vec![],
1768					output: contributed_outputs,
1769				};
1770				if let Err(e) = self.wallet.cancel_tx(&tx) {
1771					log_error!(self.logger, "Failed reclaiming unused addresses: {}", e);
1772					return Err(ReplayEvent());
1773				}
1774
1775				let event = Event::SpliceFailed {
1776					channel_id,
1777					user_channel_id: UserChannelId(user_channel_id),
1778					counterparty_node_id,
1779					abandoned_funding_txo,
1780				};
1781
1782				match self.event_queue.add_event(event).await {
1783					Ok(_) => {},
1784					Err(e) => {
1785						log_error!(self.logger, "Failed to push to event queue: {}", e);
1786						return Err(ReplayEvent());
1787					},
1788				};
1789			},
1790		}
1791		Ok(())
1792	}
1793}
1794
1795#[cfg(test)]
1796mod tests {
1797	use std::sync::atomic::{AtomicU16, Ordering};
1798	use std::time::Duration;
1799
1800	use lightning::util::test_utils::TestLogger;
1801
1802	use super::*;
1803	use crate::io::test_utils::InMemoryStore;
1804
1805	#[tokio::test]
1806	async fn event_queue_persistence() {
1807		let store: Arc<DynStore> = Arc::new(InMemoryStore::new());
1808		let logger = Arc::new(TestLogger::new());
1809		let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger)));
1810		assert_eq!(event_queue.next_event(), None);
1811
1812		let expected_event = Event::ChannelReady {
1813			channel_id: ChannelId([23u8; 32]),
1814			user_channel_id: UserChannelId(2323),
1815			counterparty_node_id: None,
1816			funding_txo: None,
1817		};
1818		event_queue.add_event(expected_event.clone()).await.unwrap();
1819
1820		// Check we get the expected event and that it is returned until we mark it handled.
1821		for _ in 0..5 {
1822			assert_eq!(event_queue.next_event_async().await, expected_event);
1823			assert_eq!(event_queue.next_event(), Some(expected_event.clone()));
1824		}
1825
1826		// Check we can read back what we persisted.
1827		let persisted_bytes = KVStore::read(
1828			&*store,
1829			EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
1830			EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
1831			EVENT_QUEUE_PERSISTENCE_KEY,
1832		)
1833		.await
1834		.unwrap();
1835		let deser_event_queue =
1836			EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap();
1837		assert_eq!(deser_event_queue.next_event_async().await, expected_event);
1838
1839		event_queue.event_handled().await.unwrap();
1840		assert_eq!(event_queue.next_event(), None);
1841	}
1842
1843	#[tokio::test]
1844	async fn event_queue_concurrency() {
1845		let store: Arc<DynStore> = Arc::new(InMemoryStore::new());
1846		let logger = Arc::new(TestLogger::new());
1847		let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger)));
1848		assert_eq!(event_queue.next_event(), None);
1849
1850		let expected_event = Event::ChannelReady {
1851			channel_id: ChannelId([23u8; 32]),
1852			user_channel_id: UserChannelId(2323),
1853			counterparty_node_id: None,
1854			funding_txo: None,
1855		};
1856
1857		// Check `next_event_async` won't return if the queue is empty and always rather timeout.
1858		tokio::select! {
1859			_ = tokio::time::sleep(Duration::from_secs(1)) => {
1860				// Timeout
1861			}
1862			_ = event_queue.next_event_async() => {
1863				panic!();
1864			}
1865		}
1866
1867		assert_eq!(event_queue.next_event(), None);
1868		// Check we get the expected number of events when polling/enqueuing concurrently.
1869		let enqueued_events = AtomicU16::new(0);
1870		let received_events = AtomicU16::new(0);
1871		let mut delayed_enqueue = false;
1872
1873		for _ in 0..25 {
1874			event_queue.add_event(expected_event.clone()).await.unwrap();
1875			enqueued_events.fetch_add(1, Ordering::SeqCst);
1876		}
1877
1878		loop {
1879			tokio::select! {
1880				_ = tokio::time::sleep(Duration::from_millis(10)), if !delayed_enqueue => {
1881					event_queue.add_event(expected_event.clone()).await.unwrap();
1882					enqueued_events.fetch_add(1, Ordering::SeqCst);
1883					delayed_enqueue = true;
1884				}
1885				e = event_queue.next_event_async() => {
1886					assert_eq!(e, expected_event);
1887					event_queue.event_handled().await.unwrap();
1888					received_events.fetch_add(1, Ordering::SeqCst);
1889
1890					event_queue.add_event(expected_event.clone()).await.unwrap();
1891					enqueued_events.fetch_add(1, Ordering::SeqCst);
1892				}
1893				e = event_queue.next_event_async() => {
1894					assert_eq!(e, expected_event);
1895					event_queue.event_handled().await.unwrap();
1896					received_events.fetch_add(1, Ordering::SeqCst);
1897				}
1898			}
1899
1900			if delayed_enqueue
1901				&& received_events.load(Ordering::SeqCst) == enqueued_events.load(Ordering::SeqCst)
1902			{
1903				break;
1904			}
1905		}
1906		assert_eq!(event_queue.next_event(), None);
1907	}
1908}