ldk_node/event.rs

1// This file is Copyright its original authors, visible in version control history.
2//
3// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
6// accordance with one or both of these licenses.
7
8use crate::types::{CustomTlvRecord, DynStore, PaymentStore, Sweeper, Wallet};
9
10use crate::{
11	hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore,
12	UserChannelId,
13};
14
15use crate::config::{may_announce_channel, Config};
16use crate::connection::ConnectionManager;
17use crate::data_store::DataStoreUpdateResult;
18use crate::fee_estimator::ConfirmationTarget;
19use crate::liquidity::LiquiditySource;
20use crate::logger::Logger;
21
22use crate::payment::store::{
23	PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus,
24};
25
26use crate::io::{
27	EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
28	EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
29};
30use crate::logger::{log_debug, log_error, log_info, LdkLogger};
31
32use lightning::events::bump_transaction::BumpTransactionEvent;
33use lightning::events::{ClosureReason, PaymentPurpose, ReplayEvent};
34use lightning::events::{Event as LdkEvent, PaymentFailureReason};
35use lightning::impl_writeable_tlv_based_enum;
36use lightning::ln::channelmanager::PaymentId;
37use lightning::ln::types::ChannelId;
38use lightning::routing::gossip::NodeId;
39use lightning::util::errors::APIError;
40use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
41
42use lightning_types::payment::{PaymentHash, PaymentPreimage};
43
44use lightning_liquidity::lsps2::utils::compute_opening_fee;
45
46use bitcoin::blockdata::locktime::absolute::LockTime;
47use bitcoin::secp256k1::PublicKey;
48use bitcoin::{Amount, OutPoint};
49
50use rand::{thread_rng, Rng};
51
52use core::future::Future;
53use core::task::{Poll, Waker};
54use std::collections::VecDeque;
55use std::ops::Deref;
56use std::sync::{Arc, Condvar, Mutex, RwLock};
57use std::time::Duration;
58
59/// An event emitted by [`Node`], which should be handled by the user.
60///
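/// # Example
///
/// A minimal handling-loop sketch (assuming an already-running [`Node`] named `node`; error
/// handling elided, and method names follow the public `Node` event API):
///
/// ```ignore
/// loop {
///     match node.wait_next_event() {
///         Event::PaymentReceived { payment_hash, amount_msat, .. } => {
///             println!("Received {}msat for hash {:?}", amount_msat, payment_hash);
///         },
///         other => println!("Unhandled event: {:?}", other),
///     }
///     // Acknowledge the event so the queue advances to the next one.
///     let _ = node.event_handled();
/// }
/// ```
///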
61/// [`Node`]: crate::Node
62#[derive(Debug, Clone, PartialEq, Eq)]
63pub enum Event {
64	/// A sent payment was successful.
65	PaymentSuccessful {
66		/// A local identifier used to track the payment.
67		///
68		/// Will only be `None` for events serialized with LDK Node v0.2.1 or prior.
69		payment_id: Option<PaymentId>,
70		/// The hash of the payment.
71		payment_hash: PaymentHash,
72		/// The preimage to the `payment_hash`.
73		///
74		/// Note that this serves as a payment receipt.
75		///
76		/// Will only be `None` for events serialized with LDK Node v0.4.2 or prior.
77		payment_preimage: Option<PaymentPreimage>,
78		/// The total fee which was spent at intermediate hops in this payment.
79		fee_paid_msat: Option<u64>,
80	},
81	/// A sent payment has failed.
82	PaymentFailed {
83		/// A local identifier used to track the payment.
84		///
85		/// Will only be `None` for events serialized with LDK Node v0.2.1 or prior.
86		payment_id: Option<PaymentId>,
87		/// The hash of the payment.
88		///
89		/// This will be `None` if the payment failed before receiving an invoice when paying a
90		/// BOLT12 [`Offer`].
91		///
92		/// [`Offer`]: lightning::offers::offer::Offer
93		payment_hash: Option<PaymentHash>,
94		/// The reason why the payment failed.
95		///
96		/// This will be `None` for events serialized by LDK Node v0.2.1 and prior.
97		reason: Option<PaymentFailureReason>,
98	},
99	/// A payment has been received.
100	PaymentReceived {
101		/// A local identifier used to track the payment.
102		///
103		/// Will only be `None` for events serialized with LDK Node v0.2.1 or prior.
104		payment_id: Option<PaymentId>,
105		/// The hash of the payment.
106		payment_hash: PaymentHash,
107		/// The value, in thousandths of a satoshi, that has been received.
108		amount_msat: u64,
109		/// Custom TLV records received on the payment.
110		custom_records: Vec<CustomTlvRecord>,
111	},
112	/// A payment has been forwarded.
113	PaymentForwarded {
114		/// The channel id of the incoming channel between the previous node and us.
115		prev_channel_id: ChannelId,
116		/// The channel id of the outgoing channel between the next node and us.
117		next_channel_id: ChannelId,
118		/// The `user_channel_id` of the incoming channel between the previous node and us.
119		///
120		/// Will only be `None` for events serialized with LDK Node v0.3.0 or prior.
121		prev_user_channel_id: Option<UserChannelId>,
122		/// The `user_channel_id` of the outgoing channel between the next node and us.
123		///
124		/// This will be `None` if the payment was settled via an on-chain transaction. See the
125		/// caveat described for the `total_fee_earned_msat` field.
126		next_user_channel_id: Option<UserChannelId>,
127		/// The node id of the previous node.
128		///
129		/// This is only `None` for HTLCs received prior to LDK Node v0.5 or for events serialized by
130		/// versions prior to v0.5.
131		prev_node_id: Option<PublicKey>,
132		/// The node id of the next node.
133		///
134		/// This is only `None` for HTLCs received prior to LDK Node v0.5 or for events serialized by
135		/// versions prior to v0.5.
136		next_node_id: Option<PublicKey>,
137		/// The total fee, in milli-satoshis, which was earned as a result of the payment.
138		///
139		/// Note that if we force-closed the channel over which we forwarded an HTLC while the HTLC
140		/// was pending, the amount the next hop claimed will have been rounded down to the nearest
141		/// whole satoshi. Thus, the fee calculated here may be higher than expected as we still
142		/// claimed the full value in millisatoshis from the source. In this case,
143		/// `claim_from_onchain_tx` will be set.
144		///
145		/// If the channel which sent us the payment has been force-closed, we will claim the funds
146		/// via an on-chain transaction. In that case we do not yet know the on-chain transaction
147		/// fees which we will spend and will instead set this to `None`.
148		total_fee_earned_msat: Option<u64>,
149		/// The share of the total fee, in milli-satoshis, which was withheld in addition to the
150		/// forwarding fee.
151		///
152		/// This will only be `Some` if we forwarded an intercepted HTLC with less than the
153		/// expected amount. This means our counterparty agreed to receive less than the invoice
154		/// amount.
155		///
156		/// The caveat described above the `total_fee_earned_msat` field applies here as well.
157		skimmed_fee_msat: Option<u64>,
158		/// If this is `true`, the forwarded HTLC was claimed by our counterparty via an on-chain
159		/// transaction.
160		claim_from_onchain_tx: bool,
161		/// The final amount forwarded, in milli-satoshis, after the fee is deducted.
162		///
163		/// The caveat described above the `total_fee_earned_msat` field applies here as well.
164		outbound_amount_forwarded_msat: Option<u64>,
165	},
166	/// A payment for a previously-registered payment hash has been received.
167	///
168	/// This needs to be manually claimed by supplying the correct preimage to [`claim_for_hash`].
169	///
170	/// If the provided parameters don't match expectations or the preimage can't be retrieved
171	/// in time, the payment should be failed back via [`fail_for_hash`].
172	///
173	/// Note that claiming will necessarily fail after the `claim_deadline` has been reached.
174	///
175	/// [`claim_for_hash`]: crate::payment::Bolt11Payment::claim_for_hash
176	/// [`fail_for_hash`]: crate::payment::Bolt11Payment::fail_for_hash
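	///
	/// # Example
	///
	/// A minimal sketch of manual claiming (hypothetical `node`, `event`, and `lookup_preimage`
	/// helper; exact method signatures may differ between releases):
	///
	/// ```ignore
	/// if let Event::PaymentClaimable { payment_hash, claimable_amount_msat, .. } = event {
	///     // Look up the preimage that was registered for this hash out-of-band.
	///     match lookup_preimage(&payment_hash) {
	///         Some(preimage) => {
	///             node.bolt11_payment().claim_for_hash(payment_hash, claimable_amount_msat, preimage)?
	///         },
	///         None => node.bolt11_payment().fail_for_hash(payment_hash)?,
	///     }
	/// }
	/// ```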
177	PaymentClaimable {
178		/// A local identifier used to track the payment.
179		payment_id: PaymentId,
180		/// The hash of the payment.
181		payment_hash: PaymentHash,
182		/// The value, in thousandths of a satoshi, that is claimable.
183		claimable_amount_msat: u64,
184		/// The block height at which this payment will be failed back and will no longer be
185		/// eligible for claiming.
186		claim_deadline: Option<u32>,
187		/// Custom TLV records attached to the payment.
188		custom_records: Vec<CustomTlvRecord>,
189	},
190	/// A channel has been created and is pending confirmation on-chain.
191	ChannelPending {
192		/// The `channel_id` of the channel.
193		channel_id: ChannelId,
194		/// The `user_channel_id` of the channel.
195		user_channel_id: UserChannelId,
196		/// The `temporary_channel_id` this channel used to be known by during channel establishment.
197		former_temporary_channel_id: ChannelId,
198		/// The `node_id` of the channel counterparty.
199		counterparty_node_id: PublicKey,
200		/// The outpoint of the channel's funding transaction.
201		funding_txo: OutPoint,
202	},
203	/// A channel is ready to be used.
204	ChannelReady {
205		/// The `channel_id` of the channel.
206		channel_id: ChannelId,
207		/// The `user_channel_id` of the channel.
208		user_channel_id: UserChannelId,
209		/// The `node_id` of the channel counterparty.
210		///
211		/// This will be `None` for events serialized by LDK Node v0.1.0 and prior.
212		counterparty_node_id: Option<PublicKey>,
213	},
214	/// A channel has been closed.
215	ChannelClosed {
216		/// The `channel_id` of the channel.
217		channel_id: ChannelId,
218		/// The `user_channel_id` of the channel.
219		user_channel_id: UserChannelId,
220		/// The `node_id` of the channel counterparty.
221		///
222		/// This will be `None` for events serialized by LDK Node v0.1.0 and prior.
223		counterparty_node_id: Option<PublicKey>,
224		/// This will be `None` for events serialized by LDK Node v0.2.1 and prior.
225		reason: Option<ClosureReason>,
226	},
227}
228
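// The event queue is persisted as TLV streams. Per LDK's usual TLV conventions, `option` fields
// are only written when present and unknown odd TLV types are ignored on read, which is how newer
// fields (e.g. `payment_preimage` on `PaymentSuccessful`) stay compatible with older queues.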
229impl_writeable_tlv_based_enum!(Event,
230	(0, PaymentSuccessful) => {
231		(0, payment_hash, required),
232		(1, fee_paid_msat, option),
233		(3, payment_id, option),
234		(5, payment_preimage, option),
235	},
236	(1, PaymentFailed) => {
237		(0, payment_hash, option),
238		(1, reason, upgradable_option),
239		(3, payment_id, option),
240	},
241	(2, PaymentReceived) => {
242		(0, payment_hash, required),
243		(1, payment_id, option),
244		(2, amount_msat, required),
245		(3, custom_records, optional_vec),
246	},
247	(3, ChannelReady) => {
248		(0, channel_id, required),
249		(1, counterparty_node_id, option),
250		(2, user_channel_id, required),
251	},
252	(4, ChannelPending) => {
253		(0, channel_id, required),
254		(2, user_channel_id, required),
255		(4, former_temporary_channel_id, required),
256		(6, counterparty_node_id, required),
257		(8, funding_txo, required),
258	},
259	(5, ChannelClosed) => {
260		(0, channel_id, required),
261		(1, counterparty_node_id, option),
262		(2, user_channel_id, required),
263		(3, reason, upgradable_option),
264	},
265	(6, PaymentClaimable) => {
266		(0, payment_hash, required),
267		(2, payment_id, required),
268		(4, claimable_amount_msat, required),
269		(6, claim_deadline, option),
270		(7, custom_records, optional_vec),
271	},
272	(7, PaymentForwarded) => {
273		(0, prev_channel_id, required),
274		(1, prev_node_id, option),
275		(2, next_channel_id, required),
276		(3, next_node_id, option),
277		(4, prev_user_channel_id, option),
278		(6, next_user_channel_id, option),
279		(8, total_fee_earned_msat, option),
280		(10, skimmed_fee_msat, option),
281		(12, claim_from_onchain_tx, required),
282		(14, outbound_amount_forwarded_msat, option),
283	}
284);
285
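/// A persisted FIFO queue of [`Event`]s awaiting handling by the user.
///
/// The queue is re-persisted to the given `DynStore` on every mutation, so pending events survive
/// restarts. A usage sketch (store/logger construction elided, not a doctest):
///
/// ```ignore
/// let event_queue = EventQueue::new(kv_store, logger);
/// event_queue.add_event(event)?;              // enqueue + persist, then wake any waiters
/// let next = event_queue.wait_next_event();   // blocks until an event is available
/// // ... handle `next` ...
/// event_queue.event_handled()?;               // pop the handled event and persist again
/// ```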
286pub struct EventQueue<L: Deref>
287where
288	L::Target: LdkLogger,
289{
290	queue: Arc<Mutex<VecDeque<Event>>>,
291	waker: Arc<Mutex<Option<Waker>>>,
292	notifier: Condvar,
293	kv_store: Arc<DynStore>,
294	logger: L,
295}
296
297impl<L: Deref> EventQueue<L>
298where
299	L::Target: LdkLogger,
300{
301	pub(crate) fn new(kv_store: Arc<DynStore>, logger: L) -> Self {
302		let queue = Arc::new(Mutex::new(VecDeque::new()));
303		let waker = Arc::new(Mutex::new(None));
304		let notifier = Condvar::new();
305		Self { queue, waker, notifier, kv_store, logger }
306	}
307
308	pub(crate) fn add_event(&self, event: Event) -> Result<(), Error> {
309		{
310			let mut locked_queue = self.queue.lock().unwrap();
311			locked_queue.push_back(event);
312			self.persist_queue(&locked_queue)?;
313		}
314
315		self.notifier.notify_one();
316
317		if let Some(waker) = self.waker.lock().unwrap().take() {
318			waker.wake();
319		}
320		Ok(())
321	}
322
323	pub(crate) fn next_event(&self) -> Option<Event> {
324		let locked_queue = self.queue.lock().unwrap();
325		locked_queue.front().cloned()
326	}
327
328	pub(crate) async fn next_event_async(&self) -> Event {
329		EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
330	}
331
332	pub(crate) fn wait_next_event(&self) -> Event {
333		let locked_queue =
334			self.notifier.wait_while(self.queue.lock().unwrap(), |queue| queue.is_empty()).unwrap();
335		locked_queue.front().unwrap().clone()
336	}
337
338	pub(crate) fn event_handled(&self) -> Result<(), Error> {
339		{
340			let mut locked_queue = self.queue.lock().unwrap();
341			locked_queue.pop_front();
342			self.persist_queue(&locked_queue)?;
343		}
344		self.notifier.notify_one();
345
346		if let Some(waker) = self.waker.lock().unwrap().take() {
347			waker.wake();
348		}
349		Ok(())
350	}
351
352	fn persist_queue(&self, locked_queue: &VecDeque<Event>) -> Result<(), Error> {
353		let data = EventQueueSerWrapper(locked_queue).encode();
354		self.kv_store
355			.write(
356				EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
357				EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
358				EVENT_QUEUE_PERSISTENCE_KEY,
359				&data,
360			)
361			.map_err(|e| {
362				log_error!(
363					self.logger,
364					"Write for key {}/{}/{} failed due to: {}",
365					EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
366					EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
367					EVENT_QUEUE_PERSISTENCE_KEY,
368					e
369				);
370				Error::PersistenceFailed
371			})?;
372		Ok(())
373	}
374}
375
376impl<L: Deref> ReadableArgs<(Arc<DynStore>, L)> for EventQueue<L>
377where
378	L::Target: LdkLogger,
379{
380	#[inline]
381	fn read<R: lightning::io::Read>(
382		reader: &mut R, args: (Arc<DynStore>, L),
383	) -> Result<Self, lightning::ln::msgs::DecodeError> {
384		let (kv_store, logger) = args;
385		let read_queue: EventQueueDeserWrapper = Readable::read(reader)?;
386		let queue = Arc::new(Mutex::new(read_queue.0));
387		let waker = Arc::new(Mutex::new(None));
388		let notifier = Condvar::new();
389		Ok(Self { queue, waker, notifier, kv_store, logger })
390	}
391}
392
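// Ser/deser wrappers for the queue contents: the on-disk format is a `u16` event count followed
// by each `Event` in order (see `EventQueueSerWrapper::write` below).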
393struct EventQueueDeserWrapper(VecDeque<Event>);
394
395impl Readable for EventQueueDeserWrapper {
396	fn read<R: lightning::io::Read>(
397		reader: &mut R,
398	) -> Result<Self, lightning::ln::msgs::DecodeError> {
399		let len: u16 = Readable::read(reader)?;
400		let mut queue = VecDeque::with_capacity(len as usize);
401		for _ in 0..len {
402			queue.push_back(Readable::read(reader)?);
403		}
404		Ok(Self(queue))
405	}
406}
407
408struct EventQueueSerWrapper<'a>(&'a VecDeque<Event>);
409
410impl Writeable for EventQueueSerWrapper<'_> {
411	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), lightning::io::Error> {
412		(self.0.len() as u16).write(writer)?;
413		for e in self.0.iter() {
414			e.write(writer)?;
415		}
416		Ok(())
417	}
418}
419
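// Future backing `next_event_async`: it resolves with a clone of the front event if one is
// queued; otherwise it stores the task's `Waker` so `add_event`/`event_handled` can wake it later.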
420struct EventFuture {
421	event_queue: Arc<Mutex<VecDeque<Event>>>,
422	waker: Arc<Mutex<Option<Waker>>>,
423}
424
425impl Future for EventFuture {
426	type Output = Event;
427
428	fn poll(
429		self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>,
430	) -> core::task::Poll<Self::Output> {
431		if let Some(event) = self.event_queue.lock().unwrap().front() {
432			Poll::Ready(event.clone())
433		} else {
434			*self.waker.lock().unwrap() = Some(cx.waker().clone());
435			Poll::Pending
436		}
437	}
438}
439
440pub(crate) struct EventHandler<L: Deref + Clone + Sync + Send + 'static>
441where
442	L::Target: LdkLogger,
443{
444	event_queue: Arc<EventQueue<L>>,
445	wallet: Arc<Wallet>,
446	bump_tx_event_handler: Arc<BumpTransactionEventHandler>,
447	channel_manager: Arc<ChannelManager>,
448	connection_manager: Arc<ConnectionManager<L>>,
449	output_sweeper: Arc<Sweeper>,
450	network_graph: Arc<Graph>,
451	liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
452	payment_store: Arc<PaymentStore>,
453	peer_store: Arc<PeerStore<L>>,
454	runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
455	logger: L,
456	config: Arc<Config>,
457}
458
459impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
460where
461	L::Target: LdkLogger,
462{
463	pub fn new(
464		event_queue: Arc<EventQueue<L>>, wallet: Arc<Wallet>,
465		bump_tx_event_handler: Arc<BumpTransactionEventHandler>,
466		channel_manager: Arc<ChannelManager>, connection_manager: Arc<ConnectionManager<L>>,
467		output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
468		liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
469		payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
470		runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>, logger: L, config: Arc<Config>,
471	) -> Self {
472		Self {
473			event_queue,
474			wallet,
475			bump_tx_event_handler,
476			channel_manager,
477			connection_manager,
478			output_sweeper,
479			network_graph,
480			liquidity_source,
481			payment_store,
482			peer_store,
483			logger,
484			runtime,
485			config,
486		}
487	}
488
489	pub async fn handle_event(&self, event: LdkEvent) -> Result<(), ReplayEvent> {
490		match event {
491			LdkEvent::FundingGenerationReady {
492				temporary_channel_id,
493				counterparty_node_id,
494				channel_value_satoshis,
495				output_script,
496				..
497			} => {
498				// Construct the raw transaction with the output that pays the full amount of the
499				// channel.
500				let confirmation_target = ConfirmationTarget::ChannelFunding;
501
502				// We set nLockTime to the current height to discourage fee sniping.
503				let cur_height = self.channel_manager.current_best_block().height;
504				let locktime = LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO);
505
506				// Have the wallet create and sign the funding transaction; LDK will broadcast it once the channel negotiation completes.
507				let channel_amount = Amount::from_sat(channel_value_satoshis);
508				match self.wallet.create_funding_transaction(
509					output_script,
510					channel_amount,
511					confirmation_target,
512					locktime,
513				) {
514					Ok(final_tx) => {
515						// Give the funding transaction back to LDK for opening the channel.
516						match self.channel_manager.funding_transaction_generated(
517							temporary_channel_id,
518							counterparty_node_id,
519							final_tx,
520						) {
521							Ok(()) => {},
522							Err(APIError::APIMisuseError { err }) => {
523								log_error!(self.logger, "Panicking due to APIMisuseError: {}", err);
524								panic!("APIMisuseError: {}", err);
525							},
526							Err(APIError::ChannelUnavailable { err }) => {
527								log_error!(
528									self.logger,
529									"Failed to process funding transaction as channel went away before we could fund it: {}",
530									err
531								)
532							},
533							Err(err) => {
534								log_error!(
535									self.logger,
536									"Failed to process funding transaction: {:?}",
537									err
538								)
539							},
540						}
541					},
542					Err(err) => {
543						log_error!(self.logger, "Failed to create funding transaction: {}", err);
544						self.channel_manager
545							.force_close_without_broadcasting_txn(
546								&temporary_channel_id,
547								&counterparty_node_id,
548								"Failed to create funding transaction".to_string(),
549							)
550							.unwrap_or_else(|e| {
551								log_error!(self.logger, "Failed to force close channel after funding generation failed: {:?}", e);
552								panic!(
553									"Failed to force close channel after funding generation failed"
554								);
555							});
556					},
557				}
558			},
559			LdkEvent::FundingTxBroadcastSafe { .. } => {
560				debug_assert!(false, "We currently only support safe funding, so this event should never be emitted.");
561			},
562			LdkEvent::PaymentClaimable {
563				payment_hash,
564				purpose,
565				amount_msat,
566				receiver_node_id: _,
567				via_channel_id: _,
568				via_user_channel_id: _,
569				claim_deadline,
570				onion_fields,
571				counterparty_skimmed_fee_msat,
572				payment_id: _,
573			} => {
574				let payment_id = PaymentId(payment_hash.0);
575				if let Some(info) = self.payment_store.get(&payment_id) {
576					if info.direction == PaymentDirection::Outbound {
577						log_info!(
578							self.logger,
579							"Refused inbound payment with ID {}: circular payments are unsupported.",
580							payment_id
581						);
582						self.channel_manager.fail_htlc_backwards(&payment_hash);
583
584						let update = PaymentDetailsUpdate {
585							status: Some(PaymentStatus::Failed),
586							..PaymentDetailsUpdate::new(payment_id)
587						};
588						match self.payment_store.update(&update) {
589							Ok(_) => return Ok(()),
590							Err(e) => {
591								log_error!(self.logger, "Failed to access payment store: {}", e);
592								return Err(ReplayEvent());
593							},
594						};
595					}
596
597					if info.status == PaymentStatus::Succeeded
598						|| matches!(info.kind, PaymentKind::Spontaneous { .. })
599					{
600						log_info!(
601							self.logger,
602							"Refused duplicate inbound payment from payment hash {} of {}msat",
603							hex_utils::to_string(&payment_hash.0),
604							amount_msat,
605						);
606						self.channel_manager.fail_htlc_backwards(&payment_hash);
607
608						let update = PaymentDetailsUpdate {
609							status: Some(PaymentStatus::Failed),
610							..PaymentDetailsUpdate::new(payment_id)
611						};
612						match self.payment_store.update(&update) {
613							Ok(_) => return Ok(()),
614							Err(e) => {
615								log_error!(self.logger, "Failed to access payment store: {}", e);
616								return Err(ReplayEvent());
617							},
618						};
619					}
620
621					let max_total_opening_fee_msat = match info.kind {
622						PaymentKind::Bolt11Jit { lsp_fee_limits, .. } => {
623							lsp_fee_limits
624								.max_total_opening_fee_msat
625								.or_else(|| {
626									lsp_fee_limits.max_proportional_opening_fee_ppm_msat.and_then(
627										|max_prop_fee| {
628											// If it's a variable amount payment, compute the actual fee.
629											compute_opening_fee(amount_msat, 0, max_prop_fee)
630										},
631									)
632								})
633								.unwrap_or(0)
634						},
635						_ => 0,
636					};
637
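					// Reject the HTLC if the LSP withheld more than the JIT opening-fee limit we
					// agreed to. Rough example with assumed numbers: a 10_000 ppm (1%) proportional
					// limit on a 500_000 msat payment allows at most
					// compute_opening_fee(500_000, 0, 10_000) == Some(5_000) msat to be skimmed.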
638					if counterparty_skimmed_fee_msat > max_total_opening_fee_msat {
639						log_info!(
640							self.logger,
641							"Refusing inbound payment with hash {} as the counterparty-withheld fee of {}msat exceeds our limit of {}msat",
642							hex_utils::to_string(&payment_hash.0),
643							counterparty_skimmed_fee_msat,
644							max_total_opening_fee_msat,
645						);
646						self.channel_manager.fail_htlc_backwards(&payment_hash);
647
648						let update = PaymentDetailsUpdate {
649							hash: Some(Some(payment_hash)),
650							status: Some(PaymentStatus::Failed),
651							..PaymentDetailsUpdate::new(payment_id)
652						};
653						match self.payment_store.update(&update) {
654							Ok(_) => return Ok(()),
655							Err(e) => {
656								log_error!(self.logger, "Failed to access payment store: {}", e);
657								return Err(ReplayEvent());
658							},
659						};
660					}
661
662					// If the LSP skimmed anything, update our stored payment.
663					if counterparty_skimmed_fee_msat > 0 {
664						match info.kind {
665							PaymentKind::Bolt11Jit { .. } => {
666								let update = PaymentDetailsUpdate {
667									counterparty_skimmed_fee_msat: Some(Some(counterparty_skimmed_fee_msat)),
668									..PaymentDetailsUpdate::new(payment_id)
669								};
670								match self.payment_store.update(&update) {
671									Ok(_) => (),
672									Err(e) => {
673										log_error!(self.logger, "Failed to access payment store: {}", e);
674										return Err(ReplayEvent());
675									},
676								};
677							}
678							_ => debug_assert!(false, "We only expect the counterparty to get away with withholding fees for JIT payments."),
679						}
680					}
681
682					// If the payment is known to the store but the ChannelManager doesn't know the
683					// preimage, it was registered via the `_for_hash` variants and needs to be
684					// claimed manually by the user.
685					match info.kind {
686						PaymentKind::Bolt11 { preimage, .. } => {
687							if purpose.preimage().is_none() {
688								debug_assert!(
689									preimage.is_none(),
690									"We would have registered the preimage if we knew"
691								);
692
693								let custom_records = onion_fields
694									.map(|cf| {
695										cf.custom_tlvs().into_iter().map(|tlv| tlv.into()).collect()
696									})
697									.unwrap_or_default();
698								let event = Event::PaymentClaimable {
699									payment_id,
700									payment_hash,
701									claimable_amount_msat: amount_msat,
702									claim_deadline,
703									custom_records,
704								};
705								match self.event_queue.add_event(event) {
706									Ok(_) => return Ok(()),
707									Err(e) => {
708										log_error!(
709											self.logger,
710											"Failed to push to event queue: {}",
711											e
712										);
713										return Err(ReplayEvent());
714									},
715								};
716							}
717						},
718						_ => {},
719					}
720				}
721
722				log_info!(
723					self.logger,
724					"Received payment from payment hash {} of {}msat",
725					hex_utils::to_string(&payment_hash.0),
726					amount_msat,
727				);
728				let payment_preimage = match purpose {
729					PaymentPurpose::Bolt11InvoicePayment { payment_preimage, .. } => {
730						payment_preimage
731					},
732					PaymentPurpose::Bolt12OfferPayment {
733						payment_preimage,
734						payment_secret,
735						payment_context,
736						..
737					} => {
738						let payer_note = payment_context.invoice_request.payer_note_truncated;
739						let offer_id = payment_context.offer_id;
740						let quantity = payment_context.invoice_request.quantity;
741						let kind = PaymentKind::Bolt12Offer {
742							hash: Some(payment_hash),
743							preimage: payment_preimage,
744							secret: Some(payment_secret),
745							offer_id,
746							payer_note,
747							quantity,
748						};
749
750						let payment = PaymentDetails::new(
751							payment_id,
752							kind,
753							Some(amount_msat),
754							None,
755							PaymentDirection::Inbound,
756							PaymentStatus::Pending,
757						);
758
759						match self.payment_store.insert(payment) {
760							Ok(false) => (),
761							Ok(true) => {
762								log_error!(
763									self.logger,
764									"Bolt12OfferPayment with ID {} was previously known",
765									payment_id,
766								);
767								debug_assert!(false);
768							},
769							Err(e) => {
770								log_error!(
771									self.logger,
772									"Failed to insert payment with ID {}: {}",
773									payment_id,
774									e
775								);
776								debug_assert!(false);
777							},
778						}
779						payment_preimage
780					},
781					PaymentPurpose::Bolt12RefundPayment { payment_preimage, .. } => {
782						payment_preimage
783					},
784					PaymentPurpose::SpontaneousPayment(preimage) => {
785						// Since it's spontaneous, we insert it now into our store.
786						let kind = PaymentKind::Spontaneous {
787							hash: payment_hash,
788							preimage: Some(preimage),
789						};
790
791						let payment = PaymentDetails::new(
792							payment_id,
793							kind,
794							Some(amount_msat),
795							None,
796							PaymentDirection::Inbound,
797							PaymentStatus::Pending,
798						);
799
800						match self.payment_store.insert(payment) {
801							Ok(false) => (),
802							Ok(true) => {
803								log_error!(
804									self.logger,
805									"Spontaneous payment with ID {} was previously known",
806									payment_id,
807								);
808								debug_assert!(false);
809							},
810							Err(e) => {
811								log_error!(
812									self.logger,
813									"Failed to insert payment with ID {}: {}",
814									payment_id,
815									e
816								);
817								debug_assert!(false);
818							},
819						}
820
821						Some(preimage)
822					},
823				};
824
825				if let Some(preimage) = payment_preimage {
826					self.channel_manager.claim_funds(preimage);
827				} else {
828					log_error!(
829						self.logger,
830						"Failed to claim payment with ID {}: preimage unknown.",
831						payment_id,
832					);
833					self.channel_manager.fail_htlc_backwards(&payment_hash);
834
835					let update = PaymentDetailsUpdate {
836						hash: Some(Some(payment_hash)),
837						status: Some(PaymentStatus::Failed),
838						..PaymentDetailsUpdate::new(payment_id)
839					};
840					match self.payment_store.update(&update) {
841						Ok(_) => return Ok(()),
842						Err(e) => {
843							log_error!(self.logger, "Failed to access payment store: {}", e);
844							return Err(ReplayEvent());
845						},
846					};
847				}
848			},
849			LdkEvent::PaymentClaimed {
850				payment_hash,
851				purpose,
852				amount_msat,
853				receiver_node_id: _,
854				htlcs: _,
855				sender_intended_total_msat: _,
856				onion_fields,
857				payment_id: _,
858			} => {
859				let payment_id = PaymentId(payment_hash.0);
860				log_info!(
861					self.logger,
862					"Claimed payment with ID {} from payment hash {} of {}msat.",
863					payment_id,
864					hex_utils::to_string(&payment_hash.0),
865					amount_msat,
866				);
867
868				let update = match purpose {
869					PaymentPurpose::Bolt11InvoicePayment {
870						payment_preimage,
871						payment_secret,
872						..
873					} => PaymentDetailsUpdate {
874						preimage: Some(payment_preimage),
875						secret: Some(Some(payment_secret)),
876						amount_msat: Some(Some(amount_msat)),
877						status: Some(PaymentStatus::Succeeded),
878						..PaymentDetailsUpdate::new(payment_id)
879					},
880					PaymentPurpose::Bolt12OfferPayment {
881						payment_preimage, payment_secret, ..
882					} => PaymentDetailsUpdate {
883						preimage: Some(payment_preimage),
884						secret: Some(Some(payment_secret)),
885						amount_msat: Some(Some(amount_msat)),
886						status: Some(PaymentStatus::Succeeded),
887						..PaymentDetailsUpdate::new(payment_id)
888					},
889					PaymentPurpose::Bolt12RefundPayment {
890						payment_preimage,
891						payment_secret,
892						..
893					} => PaymentDetailsUpdate {
894						preimage: Some(payment_preimage),
895						secret: Some(Some(payment_secret)),
896						amount_msat: Some(Some(amount_msat)),
897						status: Some(PaymentStatus::Succeeded),
898						..PaymentDetailsUpdate::new(payment_id)
899					},
900					PaymentPurpose::SpontaneousPayment(preimage) => PaymentDetailsUpdate {
901						preimage: Some(Some(preimage)),
902						amount_msat: Some(Some(amount_msat)),
903						status: Some(PaymentStatus::Succeeded),
904						..PaymentDetailsUpdate::new(payment_id)
905					},
906				};
907
908				match self.payment_store.update(&update) {
909					Ok(DataStoreUpdateResult::Updated) | Ok(DataStoreUpdateResult::Unchanged) => (
910						// No need to do anything if the idempotent update was applied, which might
911						// be the result of a replayed event.
912					),
913					Ok(DataStoreUpdateResult::NotFound) => {
914						log_error!(
915							self.logger,
916							"Claimed payment with ID {} couldn't be found in store",
917							payment_id,
918						);
919					},
920					Err(e) => {
921						log_error!(
922							self.logger,
923							"Failed to update payment with ID {}: {}",
924							payment_id,
925							e
926						);
927						return Err(ReplayEvent());
928					},
929				}
930
931				let event = Event::PaymentReceived {
932					payment_id: Some(payment_id),
933					payment_hash,
934					amount_msat,
935					custom_records: onion_fields
936						.map(|cf| cf.custom_tlvs().into_iter().map(|tlv| tlv.into()).collect())
937						.unwrap_or_default(),
938				};
939				match self.event_queue.add_event(event) {
940					Ok(_) => return Ok(()),
941					Err(e) => {
942						log_error!(self.logger, "Failed to push to event queue: {}", e);
943						return Err(ReplayEvent());
944					},
945				};
946			},
947			LdkEvent::PaymentSent {
948				payment_id,
949				payment_preimage,
950				payment_hash,
951				fee_paid_msat,
952				..
953			} => {
954				let payment_id = if let Some(id) = payment_id {
955					id
956				} else {
957					debug_assert!(false, "payment_id should always be set.");
958					return Ok(());
959				};
960
961				let update = PaymentDetailsUpdate {
962					hash: Some(Some(payment_hash)),
963					preimage: Some(Some(payment_preimage)),
964					fee_paid_msat: Some(fee_paid_msat),
965					status: Some(PaymentStatus::Succeeded),
966					..PaymentDetailsUpdate::new(payment_id)
967				};
968
969				match self.payment_store.update(&update) {
970					Ok(_) => {},
971					Err(e) => {
972						log_error!(self.logger, "Failed to access payment store: {}", e);
973						return Err(ReplayEvent());
974					},
975				};
976
977				self.payment_store.get(&payment_id).map(|payment| {
978					log_info!(
979						self.logger,
980						"Successfully sent payment of {}msat{} from \
981						payment hash {:?} with preimage {:?}",
982						payment.amount_msat.unwrap(),
983						if let Some(fee) = fee_paid_msat {
984							format!(" (fee {} msat)", fee)
985						} else {
986							"".to_string()
987						},
988						hex_utils::to_string(&payment_hash.0),
989						hex_utils::to_string(&payment_preimage.0)
990					);
991				});
992				let event = Event::PaymentSuccessful {
993					payment_id: Some(payment_id),
994					payment_hash,
995					payment_preimage: Some(payment_preimage),
996					fee_paid_msat,
997				};
998
999				match self.event_queue.add_event(event) {
1000					Ok(_) => return Ok(()),
1001					Err(e) => {
1002						log_error!(self.logger, "Failed to push to event queue: {}", e);
1003						return Err(ReplayEvent());
1004					},
1005				};
1006			},
1007			LdkEvent::PaymentFailed { payment_id, payment_hash, reason, .. } => {
1008				log_info!(
1009					self.logger,
1010					"Failed to send payment with ID {} due to {:?}.",
1011					payment_id,
1012					reason
1013				);
1014
1015				let update = PaymentDetailsUpdate {
1016					hash: Some(payment_hash),
1017					status: Some(PaymentStatus::Failed),
1018					..PaymentDetailsUpdate::new(payment_id)
1019				};
1020				match self.payment_store.update(&update) {
1021					Ok(_) => {},
1022					Err(e) => {
1023						log_error!(self.logger, "Failed to access payment store: {}", e);
1024						return Err(ReplayEvent());
1025					},
1026				};
1027
1028				let event =
1029					Event::PaymentFailed { payment_id: Some(payment_id), payment_hash, reason };
1030				match self.event_queue.add_event(event) {
1031					Ok(_) => return Ok(()),
1032					Err(e) => {
1033						log_error!(self.logger, "Failed to push to event queue: {}", e);
1034						return Err(ReplayEvent());
1035					},
1036				};
1037			},
1038
1039			LdkEvent::PaymentPathSuccessful { .. } => {},
1040			LdkEvent::PaymentPathFailed { .. } => {},
1041			LdkEvent::ProbeSuccessful { .. } => {},
1042			LdkEvent::ProbeFailed { .. } => {},
1043			LdkEvent::HTLCHandlingFailed { failed_next_destination, .. } => {
1044				if let Some(liquidity_source) = self.liquidity_source.as_ref() {
1045					liquidity_source.handle_htlc_handling_failed(failed_next_destination);
1046				}
1047			},
1048			LdkEvent::PendingHTLCsForwardable { time_forwardable } => {
1049				let forwarding_channel_manager = self.channel_manager.clone();
1050				let min = time_forwardable.as_millis() as u64;
1051
1052				let runtime_lock = self.runtime.read().unwrap();
1053				debug_assert!(runtime_lock.is_some());
1054
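				// Process the forwards after a randomized delay within roughly
				// [`time_forwardable`, 5 * `time_forwardable`) so that pending HTLCs are batched
				// and forwarding times leak less about individual payments.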
1055				if let Some(runtime) = runtime_lock.as_ref() {
1056					runtime.spawn(async move {
1057						let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
1058						tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
1059
1060						forwarding_channel_manager.process_pending_htlc_forwards();
1061					});
1062				}
1063			},
1064			LdkEvent::SpendableOutputs { outputs, channel_id } => {
1065				match self.output_sweeper.track_spendable_outputs(outputs, channel_id, true, None) {
1066					Ok(_) => return Ok(()),
1067					Err(_) => {
1068						log_error!(self.logger, "Failed to track spendable outputs");
1069						return Err(ReplayEvent());
1070					},
1071				};
1072			},
1073			LdkEvent::OpenChannelRequest {
1074				temporary_channel_id,
1075				counterparty_node_id,
1076				funding_satoshis,
1077				channel_type,
1078				channel_negotiation_type: _,
1079				is_announced,
1080				params: _,
1081			} => {
1082				if is_announced {
1083					if let Err(err) = may_announce_channel(&*self.config) {
1084						log_error!(self.logger, "Rejecting inbound announced channel from peer {} due to missing configuration: {}", counterparty_node_id, err);
1085
1086						self.channel_manager
1087							.force_close_without_broadcasting_txn(
1088								&temporary_channel_id,
1089								&counterparty_node_id,
1090								"Channel request rejected".to_string(),
1091							)
1092							.unwrap_or_else(|e| {
1093								log_error!(self.logger, "Failed to reject channel: {:?}", e)
1094							});
1095						return Ok(());
1096					}
1097				}
1098
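				// Anchor channels need an on-chain reserve we can later spend to fee-bump
				// force-closure transactions. Sketch with assumed numbers: with
				// `per_channel_reserve_sats = 25_000` and only 20_000 sats spendable on-chain, the
				// request is rejected below unless the peer is in `trusted_peers_no_reserve`.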
1099				let anchor_channel = channel_type.requires_anchors_zero_fee_htlc_tx();
1100				if anchor_channel {
1101					if let Some(anchor_channels_config) =
1102						self.config.anchor_channels_config.as_ref()
1103					{
1104						let cur_anchor_reserve_sats = crate::total_anchor_channels_reserve_sats(
1105							&self.channel_manager,
1106							&self.config,
1107						);
1108						let spendable_amount_sats = self
1109							.wallet
1110							.get_spendable_amount_sats(cur_anchor_reserve_sats)
1111							.unwrap_or(0);
1112
1113						let required_amount_sats = if anchor_channels_config
1114							.trusted_peers_no_reserve
1115							.contains(&counterparty_node_id)
1116						{
1117							0
1118						} else {
1119							anchor_channels_config.per_channel_reserve_sats
1120						};
1121
1122						if spendable_amount_sats < required_amount_sats {
1123							log_error!(
1124								self.logger,
1125								"Rejecting inbound Anchor channel from peer {} due to insufficient available on-chain reserves. Available: {}/{}sats",
1126								counterparty_node_id,
1127								spendable_amount_sats,
1128								required_amount_sats,
1129							);
1130							self.channel_manager
1131								.force_close_without_broadcasting_txn(
1132									&temporary_channel_id,
1133									&counterparty_node_id,
1134									"Channel request rejected".to_string(),
1135								)
1136								.unwrap_or_else(|e| {
1137									log_error!(self.logger, "Failed to reject channel: {:?}", e)
1138								});
1139							return Ok(());
1140						}
1141					} else {
1142						log_error!(
1143							self.logger,
1144							"Rejecting inbound channel from peer {} due to Anchor channels being disabled.",
1145							counterparty_node_id,
1146						);
1147						self.channel_manager
1148							.force_close_without_broadcasting_txn(
1149								&temporary_channel_id,
1150								&counterparty_node_id,
1151								"Channel request rejected".to_string(),
1152							)
1153							.unwrap_or_else(|e| {
1154								log_error!(self.logger, "Failed to reject channel: {:?}", e)
1155							});
1156						return Ok(());
1157					}
1158				}
1159
1160				let user_channel_id: u128 = rand::thread_rng().gen::<u128>();
1161				let allow_0conf = self.config.trusted_peers_0conf.contains(&counterparty_node_id);
1162				let res = if allow_0conf {
1163					self.channel_manager.accept_inbound_channel_from_trusted_peer_0conf(
1164						&temporary_channel_id,
1165						&counterparty_node_id,
1166						user_channel_id,
1167					)
1168				} else {
1169					self.channel_manager.accept_inbound_channel(
1170						&temporary_channel_id,
1171						&counterparty_node_id,
1172						user_channel_id,
1173					)
1174				};
1175
1176				match res {
1177					Ok(()) => {
1178						log_info!(
1179							self.logger,
1180							"Accepting inbound{}{} channel of {}sats from{} peer {}",
1181							if allow_0conf { " 0conf" } else { "" },
1182							if anchor_channel { " Anchor" } else { "" },
1183							funding_satoshis,
1184							if allow_0conf { " trusted" } else { "" },
1185							counterparty_node_id,
1186						);
1187					},
1188					Err(e) => {
1189						log_error!(
1190							self.logger,
1191							"Error while accepting inbound{}{} channel from{} peer {}: {:?}",
1192							if allow_0conf { " 0conf" } else { "" },
1193							if anchor_channel { " Anchor" } else { "" },
1194							if allow_0conf { " trusted" } else { "" },
1195							counterparty_node_id,
1196							e,
1197						);
1198					},
1199				}
1200			},
1201			LdkEvent::PaymentForwarded {
1202				prev_channel_id,
1203				next_channel_id,
1204				prev_user_channel_id,
1205				next_user_channel_id,
1206				prev_node_id,
1207				next_node_id,
1208				total_fee_earned_msat,
1209				skimmed_fee_msat,
1210				claim_from_onchain_tx,
1211				outbound_amount_forwarded_msat,
1212			} => {
1213				let read_only_network_graph = self.network_graph.read_only();
1214				let nodes = read_only_network_graph.nodes();
1215				let channels = self.channel_manager.list_channels();
1216
1217				let node_str = |channel_id: &Option<ChannelId>| {
1218					channel_id
1219						.and_then(|channel_id| channels.iter().find(|c| c.channel_id == channel_id))
1220						.and_then(|channel| {
1221							nodes.get(&NodeId::from_pubkey(&channel.counterparty.node_id))
1222						})
1223						.map_or("private_node".to_string(), |node| {
1224							node.announcement_info
1225								.as_ref()
1226								.map_or("unnamed node".to_string(), |ann| {
1227									format!("node {}", ann.alias())
1228								})
1229						})
1230				};
1231				let channel_str = |channel_id: &Option<ChannelId>| {
1232					channel_id
1233						.map(|channel_id| format!(" with channel {}", channel_id))
1234						.unwrap_or_default()
1235				};
1236				let from_prev_str = format!(
1237					" from {}{}",
1238					node_str(&prev_channel_id),
1239					channel_str(&prev_channel_id)
1240				);
1241				let to_next_str =
1242					format!(" to {}{}", node_str(&next_channel_id), channel_str(&next_channel_id));
1243
1244				let fee_earned = total_fee_earned_msat.unwrap_or(0);
1245				if claim_from_onchain_tx {
1246					log_info!(
1247						self.logger,
1248						"Forwarded payment{}{} of {}msat, earning {}msat in fees from claiming onchain.",
1249						from_prev_str,
1250						to_next_str,
1251						outbound_amount_forwarded_msat.unwrap_or(0),
1252						fee_earned,
1253					);
1254				} else {
1255					log_info!(
1256						self.logger,
1257						"Forwarded payment{}{} of {}msat, earning {}msat in fees.",
1258						from_prev_str,
1259						to_next_str,
1260						outbound_amount_forwarded_msat.unwrap_or(0),
1261						fee_earned,
1262					);
1263				}
1264
1265				if let Some(liquidity_source) = self.liquidity_source.as_ref() {
1266					liquidity_source.handle_payment_forwarded(next_channel_id);
1267				}
1268
1269				let event = Event::PaymentForwarded {
1270					prev_channel_id: prev_channel_id.expect("prev_channel_id expected for events generated by LDK versions greater than 0.0.107."),
1271					next_channel_id: next_channel_id.expect("next_channel_id expected for events generated by LDK versions greater than 0.0.107."),
1272					prev_user_channel_id: prev_user_channel_id.map(UserChannelId),
1273					next_user_channel_id: next_user_channel_id.map(UserChannelId),
1274					prev_node_id,
1275					next_node_id,
1276					total_fee_earned_msat,
1277					skimmed_fee_msat,
1278					claim_from_onchain_tx,
1279					outbound_amount_forwarded_msat,
1280				};
1281				self.event_queue.add_event(event).map_err(|e| {
1282					log_error!(self.logger, "Failed to push to event queue: {}", e);
1283					ReplayEvent()
1284				})?;
1285			},
1286			LdkEvent::ChannelPending {
1287				channel_id,
1288				user_channel_id,
1289				former_temporary_channel_id,
1290				counterparty_node_id,
1291				funding_txo,
1292				..
1293			} => {
1294				log_info!(
1295					self.logger,
1296					"New channel {} with counterparty {} has been created and is pending confirmation on chain.",
1297					channel_id,
1298					counterparty_node_id,
1299				);
1300
1301				let event = Event::ChannelPending {
1302					channel_id,
1303					user_channel_id: UserChannelId(user_channel_id),
1304					former_temporary_channel_id: former_temporary_channel_id.unwrap(),
1305					counterparty_node_id,
1306					funding_txo,
1307				};
1308				match self.event_queue.add_event(event) {
1309					Ok(_) => {},
1310					Err(e) => {
1311						log_error!(self.logger, "Failed to push to event queue: {}", e);
1312						return Err(ReplayEvent());
1313					},
1314				};
1315
1316				let network_graph = self.network_graph.read_only();
1317				let channels =
1318					self.channel_manager.list_channels_with_counterparty(&counterparty_node_id);
1319				if let Some(pending_channel) =
1320					channels.into_iter().find(|c| c.channel_id == channel_id)
1321				{
1322					if !pending_channel.is_outbound
1323						&& self.peer_store.get_peer(&counterparty_node_id).is_none()
1324					{
1325						if let Some(address) = network_graph
1326							.nodes()
1327							.get(&NodeId::from_pubkey(&counterparty_node_id))
1328							.and_then(|node_info| node_info.announcement_info.as_ref())
1329							.and_then(|ann_info| ann_info.addresses().first())
1330						{
1331							let peer = PeerInfo {
1332								node_id: counterparty_node_id,
1333								address: address.clone(),
1334							};
1335
1336							self.peer_store.add_peer(peer).unwrap_or_else(|e| {
1337								log_error!(
1338									self.logger,
1339									"Failed to add peer {} to peer store: {}",
1340									counterparty_node_id,
1341									e
1342								);
1343							});
1344						}
1345					}
1346				}
1347			},
1348			LdkEvent::ChannelReady {
1349				channel_id, user_channel_id, counterparty_node_id, ..
1350			} => {
1351				log_info!(
1352					self.logger,
1353					"Channel {} with counterparty {} ready to be used.",
1354					channel_id,
1355					counterparty_node_id,
1356				);
1357
1358				if let Some(liquidity_source) = self.liquidity_source.as_ref() {
1359					liquidity_source.handle_channel_ready(
1360						user_channel_id,
1361						&channel_id,
1362						&counterparty_node_id,
1363					);
1364				}
1365
1366				let event = Event::ChannelReady {
1367					channel_id,
1368					user_channel_id: UserChannelId(user_channel_id),
1369					counterparty_node_id: Some(counterparty_node_id),
1370				};
1371				match self.event_queue.add_event(event) {
1372					Ok(_) => {},
1373					Err(e) => {
1374						log_error!(self.logger, "Failed to push to event queue: {}", e);
1375						return Err(ReplayEvent());
1376					},
1377				};
1378			},
1379			LdkEvent::ChannelClosed {
1380				channel_id,
1381				reason,
1382				user_channel_id,
1383				counterparty_node_id,
1384				..
1385			} => {
1386				log_info!(self.logger, "Channel {} closed due to: {}", channel_id, reason);
1387
1388				let event = Event::ChannelClosed {
1389					channel_id,
1390					user_channel_id: UserChannelId(user_channel_id),
1391					counterparty_node_id,
1392					reason: Some(reason),
1393				};
1394
1395				match self.event_queue.add_event(event) {
1396					Ok(_) => {},
1397					Err(e) => {
1398						log_error!(self.logger, "Failed to push to event queue: {}", e);
1399						return Err(ReplayEvent());
1400					},
1401				};
1402			},
1403			LdkEvent::DiscardFunding { .. } => {},
1404			LdkEvent::HTLCIntercepted {
1405				requested_next_hop_scid,
1406				intercept_id,
1407				expected_outbound_amount_msat,
1408				payment_hash,
1409				..
1410			} => {
1411				if let Some(liquidity_source) = self.liquidity_source.as_ref() {
1412					liquidity_source.handle_htlc_intercepted(
1413						requested_next_hop_scid,
1414						intercept_id,
1415						expected_outbound_amount_msat,
1416						payment_hash,
1417					);
1418				}
1419			},
1420			LdkEvent::InvoiceReceived { .. } => {
1421				debug_assert!(false, "We currently don't handle BOLT12 invoices manually, so this event should never be emitted.");
1422			},
1423			LdkEvent::ConnectionNeeded { node_id, addresses } => {
1424				let runtime_lock = self.runtime.read().unwrap();
1425				debug_assert!(runtime_lock.is_some());
1426
1427				if let Some(runtime) = runtime_lock.as_ref() {
1428					let spawn_logger = self.logger.clone();
1429					let spawn_cm = Arc::clone(&self.connection_manager);
1430					runtime.spawn(async move {
1431						for addr in &addresses {
1432							match spawn_cm.connect_peer_if_necessary(node_id, addr.clone()).await {
1433								Ok(()) => {
1434									return;
1435								},
1436								Err(e) => {
1437									log_error!(
1438										spawn_logger,
1439										"Failed to establish connection to peer {}@{}: {}",
1440										node_id,
1441										addr,
1442										e
1443									);
1444								},
1445							}
1446						}
1447					});
1448				}
1449			},
1450			LdkEvent::BumpTransaction(bte) => {
1451				match bte {
1452					BumpTransactionEvent::ChannelClose {
1453						ref channel_id,
1454						ref counterparty_node_id,
1455						..
1456					} => {
1457						// Skip bumping channel closes if our counterparty is trusted.
1458						if let Some(anchor_channels_config) =
1459							self.config.anchor_channels_config.as_ref()
1460						{
1461							if anchor_channels_config
1462								.trusted_peers_no_reserve
1463								.contains(counterparty_node_id)
1464							{
1465								log_debug!(self.logger,
1466									"Ignoring BumpTransactionEvent::ChannelClose for channel {} due to trusted counterparty {}",
1467									channel_id, counterparty_node_id
1468								);
1469								return Ok(());
1470							}
1471						}
1472					},
1473					BumpTransactionEvent::HTLCResolution { .. } => {},
1474				}
1475
1476				self.bump_tx_event_handler.handle_event(&bte);
1477			},
1478			LdkEvent::OnionMessageIntercepted { .. } => {
1479				debug_assert!(false, "We currently don't support onion message interception, so this event should never be emitted.");
1480			},
1481			LdkEvent::OnionMessagePeerConnected { .. } => {
1482				debug_assert!(false, "We currently don't support onion message interception, so this event should never be emitted.");
1483			},
1484		}
1485		Ok(())
1486	}
1487}
1488
1489#[cfg(test)]
1490mod tests {
1491	use super::*;
1492	use lightning::util::test_utils::{TestLogger, TestStore};
1493	use std::sync::atomic::{AtomicU16, Ordering};
1494	use std::time::Duration;
1495
1496	#[tokio::test]
1497	async fn event_queue_persistence() {
1498		let store: Arc<DynStore> = Arc::new(TestStore::new(false));
1499		let logger = Arc::new(TestLogger::new());
1500		let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger)));
1501		assert_eq!(event_queue.next_event(), None);
1502
1503		let expected_event = Event::ChannelReady {
1504			channel_id: ChannelId([23u8; 32]),
1505			user_channel_id: UserChannelId(2323),
1506			counterparty_node_id: None,
1507		};
1508		event_queue.add_event(expected_event.clone()).unwrap();
1509
1510		// Check we get the expected event and that it is returned until we mark it handled.
1511		for _ in 0..5 {
1512			assert_eq!(event_queue.wait_next_event(), expected_event);
1513			assert_eq!(event_queue.next_event_async().await, expected_event);
1514			assert_eq!(event_queue.next_event(), Some(expected_event.clone()));
1515		}
1516
1517		// Check we can read back what we persisted.
1518		let persisted_bytes = store
1519			.read(
1520				EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
1521				EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
1522				EVENT_QUEUE_PERSISTENCE_KEY,
1523			)
1524			.unwrap();
1525		let deser_event_queue =
1526			EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap();
1527		assert_eq!(deser_event_queue.wait_next_event(), expected_event);
1528
1529		event_queue.event_handled().unwrap();
1530		assert_eq!(event_queue.next_event(), None);
1531	}
1532
1533	#[tokio::test]
1534	async fn event_queue_concurrency() {
1535		let store: Arc<DynStore> = Arc::new(TestStore::new(false));
1536		let logger = Arc::new(TestLogger::new());
1537		let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger)));
1538		assert_eq!(event_queue.next_event(), None);
1539
1540		let expected_event = Event::ChannelReady {
1541			channel_id: ChannelId([23u8; 32]),
1542			user_channel_id: UserChannelId(2323),
1543			counterparty_node_id: None,
1544		};
1545
1546		// Check `next_event_async` won't return while the queue is empty, i.e., the timeout branch should always win.
1547		tokio::select! {
1548			_ = tokio::time::sleep(Duration::from_secs(1)) => {
1549				// Timeout
1550			}
1551			_ = event_queue.next_event_async() => {
1552				panic!();
1553			}
1554		}
1555
1556		assert_eq!(event_queue.next_event(), None);
1557		// Check we get the expected number of events when polling/enqueuing concurrently.
1558		let enqueued_events = AtomicU16::new(0);
1559		let received_events = AtomicU16::new(0);
1560		let mut delayed_enqueue = false;
1561
1562		for _ in 0..25 {
1563			event_queue.add_event(expected_event.clone()).unwrap();
1564			enqueued_events.fetch_add(1, Ordering::SeqCst);
1565		}
1566
1567		loop {
1568			tokio::select! {
1569				_ = tokio::time::sleep(Duration::from_millis(10)), if !delayed_enqueue => {
1570					event_queue.add_event(expected_event.clone()).unwrap();
1571					enqueued_events.fetch_add(1, Ordering::SeqCst);
1572					delayed_enqueue = true;
1573				}
1574				e = event_queue.next_event_async() => {
1575					assert_eq!(e, expected_event);
1576					event_queue.event_handled().unwrap();
1577					received_events.fetch_add(1, Ordering::SeqCst);
1578
1579					event_queue.add_event(expected_event.clone()).unwrap();
1580					enqueued_events.fetch_add(1, Ordering::SeqCst);
1581				}
1582				e = event_queue.next_event_async() => {
1583					assert_eq!(e, expected_event);
1584					event_queue.event_handled().unwrap();
1585					received_events.fetch_add(1, Ordering::SeqCst);
1586				}
1587			}
1588
1589			if delayed_enqueue
1590				&& received_events.load(Ordering::SeqCst) == enqueued_events.load(Ordering::SeqCst)
1591			{
1592				break;
1593			}
1594		}
1595		assert_eq!(event_queue.next_event(), None);
1596
1597		// Check we operate correctly, even when mixing and matching blocking and async API calls.
1598		let (tx, mut rx) = tokio::sync::watch::channel(());
1599		let thread_queue = Arc::clone(&event_queue);
1600		let thread_event = expected_event.clone();
1601		std::thread::spawn(move || {
1602			let e = thread_queue.wait_next_event();
1603			assert_eq!(e, thread_event);
1604			thread_queue.event_handled().unwrap();
1605			tx.send(()).unwrap();
1606		});
1607
1608		let thread_queue = Arc::clone(&event_queue);
1609		let thread_event = expected_event.clone();
1610		std::thread::spawn(move || {
1611			// Sleep a bit before we enqueue the events everybody is waiting for.
1612			std::thread::sleep(Duration::from_millis(20));
1613			thread_queue.add_event(thread_event.clone()).unwrap();
1614			thread_queue.add_event(thread_event.clone()).unwrap();
1615		});
1616
1617		let e = event_queue.next_event_async().await;
1618		assert_eq!(e, expected_event.clone());
1619		event_queue.event_handled().unwrap();
1620
1621		rx.changed().await.unwrap();
1622		assert_eq!(event_queue.next_event(), None);
1623	}
1624}