lightning_background_processor/
lib.rs

//! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
//! running properly, and (2) either can or should be run in the background.
#![cfg_attr(feature = "std", doc = "See docs for [`BackgroundProcessor`] for more details.")]
#![deny(rustdoc::broken_intra_doc_links)]
#![deny(rustdoc::private_intra_doc_links)]
#![deny(missing_docs)]
#![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![cfg_attr(all(not(feature = "std"), not(test)), no_std)]

#[cfg(any(test, feature = "std"))]
extern crate core;

#[cfg(not(feature = "std"))]
extern crate alloc;

#[macro_use]
extern crate lightning;
extern crate lightning_rapid_gossip_sync;

use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
#[cfg(feature = "std")]
use lightning::events::EventHandler;
#[cfg(feature = "std")]
use lightning::events::EventsProvider;
#[cfg(feature = "futures")]
use lightning::events::ReplayEvent;
use lightning::events::{Event, PathFailure};

use lightning::ln::channelmanager::AChannelManager;
use lightning::ln::msgs::OnionMessageHandler;
use lightning::ln::peer_handler::APeerManager;
use lightning::onion_message::messenger::AOnionMessenger;
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
use lightning::routing::utxo::UtxoLookup;
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
#[cfg(feature = "std")]
use lightning::util::wakers::Sleeper;
use lightning_rapid_gossip_sync::RapidGossipSync;

use core::ops::Deref;
use core::time::Duration;

#[cfg(feature = "std")]
use core::sync::atomic::{AtomicBool, Ordering};
#[cfg(feature = "std")]
use std::sync::Arc;
#[cfg(feature = "std")]
use std::thread::{self, JoinHandle};
#[cfg(feature = "std")]
use std::time::Instant;

#[cfg(not(feature = "std"))]
use alloc::boxed::Box;

/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
///   and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically, though this shouldn't be
/// relied upon as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
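/// # Example
///
/// A minimal lifecycle sketch (marked `ignore` as it won't compile standalone; `persister`,
/// `event_handler`, and the other arguments are assumed to be fully-initialized LDK components
/// and are illustrative only):
///
/// ```ignore
/// let processor = BackgroundProcessor::start(
/// 	persister, event_handler, chain_monitor, channel_manager, Some(onion_messenger),
/// 	gossip_sync, peer_manager, logger, Some(scorer),
/// );
/// // ... node runs ...
/// // On shutdown, stop the background thread, surfacing any final persistence error.
/// processor.stop().expect("Failed to persist on shutdown");
/// ```
///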
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelManager::timer_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::timer_tick_occurred
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::events::Event
/// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	stop_thread: Arc<AtomicBool>,
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}

#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

#[cfg(not(test))]
const ONION_MESSAGE_HANDLER_TIMER: u64 = 10;
#[cfg(test)]
const ONION_MESSAGE_HANDLER_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 60 * 5;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;

#[cfg(not(test))]
const REBROADCAST_TIMER: u64 = 30;
#[cfg(test)]
const REBROADCAST_TIMER: u64 = 1;

#[cfg(feature = "futures")]
/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
const fn min_u64(a: u64, b: u64) -> u64 {
	if a < b {
		a
	} else {
		b
	}
}
#[cfg(feature = "futures")]
const FASTEST_TIMER: u64 = min_u64(
	min_u64(FRESHNESS_TIMER, PING_TIMER),
	min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)),
);

/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
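///
/// A sketch of constructing each variant via the helpers below (`p2p_sync` and `rapid_sync` are
/// assumed, pre-built instances; `ignore`d as a doc-test):
///
/// ```ignore
/// // Sync gossip over the lightning peer-to-peer network:
/// let gossip = GossipSync::p2p(p2p_sync);
/// // Or fetch snapshots from a rapid gossip sync server:
/// let gossip = GossipSync::rapid(rapid_sync);
/// // Or run without gossip sync entirely:
/// let gossip = GossipSync::none();
/// ```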
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, U, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	U: Deref,
	L: Deref,
> where
	U::Target: UtxoLookup,
	L::Target: Logger,
{
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}

impl<
		P: Deref<Target = P2PGossipSync<G, U, L>>,
		R: Deref<Target = RapidGossipSync<G, L>>,
		G: Deref<Target = NetworkGraph<L>>,
		U: Deref,
		L: Deref,
	> GossipSync<P, R, G, U, L>
where
	U::Target: UtxoLookup,
	L::Target: Logger,
{
	fn network_graph(&self) -> Option<&G> {
		match self {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::None => None,
		}
	}

	fn prunable_network_graph(&self) -> Option<&G> {
		match self {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => {
				if gossip_sync.is_initial_sync_complete() {
					Some(gossip_sync.network_graph())
				} else {
					None
				}
			},
			GossipSync::None => None,
		}
	}
}

/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
impl<
		P: Deref<Target = P2PGossipSync<G, U, L>>,
		G: Deref<Target = NetworkGraph<L>>,
		U: Deref,
		L: Deref,
	> GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
where
	U::Target: UtxoLookup,
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::P2P`] variant.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
	}
}

/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
impl<
		'a,
		R: Deref<Target = RapidGossipSync<G, L>>,
		G: Deref<Target = NetworkGraph<L>>,
		L: Deref,
	>
	GossipSync<
		&P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
		R,
		G,
		&'a (dyn UtxoLookup + Send + Sync),
		L,
	> where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::Rapid`] variant.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
	}
}

/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
impl<'a, L: Deref>
	GossipSync<
		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
		&RapidGossipSync<&'a NetworkGraph<L>, L>,
		&'a NetworkGraph<L>,
		&'a (dyn UtxoLookup + Send + Sync),
		L,
	> where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::None`] variant.
	pub fn none() -> Self {
		GossipSync::None
	}
}

fn handle_network_graph_update<L: Deref>(network_graph: &NetworkGraph<L>, event: &Event)
where
	L::Target: Logger,
{
	if let Event::PaymentPathFailed {
		failure: PathFailure::OnPath { network_update: Some(ref upd) },
		..
	} = event
	{
		network_graph.handle_network_update(upd);
	}
}

/// Updates scorer based on event and returns whether an update occurred so we can decide whether
/// to persist.
fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
	scorer: &'a S, event: &Event, duration_since_epoch: Duration,
) -> bool {
	match event {
		Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
			let mut score = scorer.write_lock();
			score.payment_path_failed(path, *scid, duration_since_epoch);
		},
		Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
			// Reached if the destination explicitly failed it back. We treat this as a successful probe
			// because the payment made it all the way to the destination with sufficient liquidity.
			let mut score = scorer.write_lock();
			score.probe_successful(path, duration_since_epoch);
		},
		Event::PaymentPathSuccessful { path, .. } => {
			let mut score = scorer.write_lock();
			score.payment_path_successful(path, duration_since_epoch);
		},
		Event::ProbeSuccessful { path, .. } => {
			let mut score = scorer.write_lock();
			score.probe_successful(path, duration_since_epoch);
		},
		Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
			let mut score = scorer.write_lock();
			score.probe_failed(path, *scid, duration_since_epoch);
		},
		_ => return false,
	}
	true
}

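// Expands to the body of the main background-processing loop. The macro is generic over how we
// process events, await, and measure elapsed time, so the same loop body serves both the
// blocking `BackgroundProcessor::start` thread and the async `process_events_async` future.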
macro_rules! define_run_body {
	(
		$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
		$channel_manager: ident, $process_channel_manager_events: expr,
		$onion_messenger: ident, $process_onion_message_handler_events: expr,
		$peer_manager: ident, $gossip_sync: ident,
		$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
		$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
	) => { {
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.get_cm().timer_tick_occurred();
		log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
		$chain_monitor.rebroadcast_pending_claims();

		let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
		let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
		let mut last_ping_call = $get_timer(PING_TIMER);
		let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
		let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
		let mut have_pruned = false;
		let mut have_decayed_scorer = false;

		loop {
			$process_channel_manager_events;
			$process_chain_monitor_events;
			$process_onion_message_handler_events;

			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			//
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			// persistence.
			$peer_manager.as_ref().process_events();

			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}

			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let mut await_start = None;
			if $check_slow_await { await_start = Some($get_timer(1)); }
			$await;
			let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };

			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}

			if $channel_manager.get_cm().get_and_clear_needs_persistence() {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.get_cm().timer_tick_occurred();
				last_freshness_call = $get_timer(FRESHNESS_TIMER);
			}
			if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) {
				if let Some(om) = &$onion_messenger {
					log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred");
					om.get_om().timer_tick_occurred();
				}
				last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
			}
			if await_slow {
				// On various platforms, we may be starved of CPU cycles for several reasons.
				// E.g. on iOS, if we've been in the background, we will be entirely paused.
				// Similarly, if we're on a desktop platform and the device has been asleep, we
				// may not get any cycles.
				// We detect this by checking if our max-100ms-sleep, above, ran longer than a
				// full second, at which point we assume sockets may have been killed (they
				// appear to be at least on some platforms, even if it has only been a second).
				// Note that we have to take care to not get here just because user event
				// processing was slow at the top of the loop. For example, the sample client
				// may call Bitcoin Core RPCs during event handling, which very often takes
				// more than a handful of seconds to complete, and shouldn't disconnect all our
				// peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.as_ref().disconnect_all_peers();
				last_ping_call = $get_timer(PING_TIMER);
			} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.as_ref().timer_tick_occurred();
				last_ping_call = $get_timer(PING_TIMER);
			}

			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence. For RGS, since 60 seconds is likely too long,
			// we prune after an initial sync completes.
			let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
			let prune_timer_elapsed = $timer_elapsed(&mut last_prune_call, prune_timer);
			let should_prune = match $gossip_sync {
				GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
				_ => prune_timer_elapsed,
			};
			if should_prune {
				// The network graph must not be pruned while rapid sync completion is pending
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					if let Some(duration_since_epoch) = $time_fetch() {
						log_trace!($logger, "Pruning and persisting network graph.");
						network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs());
					} else {
						log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
						log_trace!($logger, "Persisting network graph.");
					}

					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
					}

					have_pruned = true;
				}
				let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
				last_prune_call = $get_timer(prune_timer);
			}

			if !have_decayed_scorer {
				if let Some(ref scorer) = $scorer {
					if let Some(duration_since_epoch) = $time_fetch() {
						log_trace!($logger, "Calling time_passed on scorer at startup");
						scorer.write_lock().time_passed(duration_since_epoch);
					}
				}
				have_decayed_scorer = true;
			}

			if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
				if let Some(ref scorer) = $scorer {
					if let Some(duration_since_epoch) = $time_fetch() {
						log_trace!($logger, "Calling time_passed and persisting scorer");
						scorer.write_lock().time_passed(duration_since_epoch);
					} else {
						log_trace!($logger, "Persisting scorer");
					}
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
					}
				}
				last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
			}

			if $timer_elapsed(&mut last_rebroadcast_call, REBROADCAST_TIMER) {
				log_trace!($logger, "Rebroadcasting monitor's pending claims");
				$chain_monitor.rebroadcast_pending_claims();
				last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
			}
		}

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		}

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
		}

		Ok(())
	} }
}

#[cfg(feature = "futures")]
pub(crate) mod futures_util {
	use core::future::Future;
	use core::marker::Unpin;
	use core::pin::Pin;
	use core::task::{Poll, RawWaker, RawWakerVTable, Waker};
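	/// A future which polls four sub-futures concurrently, completing as soon as the first of
	/// them completes and reporting which one fired via [`SelectorOutput`].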
	pub(crate) struct Selector<
		A: Future<Output = ()> + Unpin,
		B: Future<Output = ()> + Unpin,
		C: Future<Output = ()> + Unpin,
		D: Future<Output = bool> + Unpin,
	> {
		pub a: A,
		pub b: B,
		pub c: C,
		pub d: D,
	}

	pub(crate) enum SelectorOutput {
		A,
		B,
		C,
		D(bool),
	}

	impl<
			A: Future<Output = ()> + Unpin,
			B: Future<Output = ()> + Unpin,
			C: Future<Output = ()> + Unpin,
			D: Future<Output = bool> + Unpin,
		> Future for Selector<A, B, C, D>
	{
		type Output = SelectorOutput;
		fn poll(
			mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>,
		) -> Poll<SelectorOutput> {
			match Pin::new(&mut self.a).poll(ctx) {
				Poll::Ready(()) => {
					return Poll::Ready(SelectorOutput::A);
				},
				Poll::Pending => {},
			}
			match Pin::new(&mut self.b).poll(ctx) {
				Poll::Ready(()) => {
					return Poll::Ready(SelectorOutput::B);
				},
				Poll::Pending => {},
			}
			match Pin::new(&mut self.c).poll(ctx) {
				Poll::Ready(()) => {
					return Poll::Ready(SelectorOutput::C);
				},
				Poll::Pending => {},
			}
			match Pin::new(&mut self.d).poll(ctx) {
				Poll::Ready(res) => {
					return Poll::Ready(SelectorOutput::D(res));
				},
				Poll::Pending => {},
			}
			Poll::Pending
		}
	}

	/// A selector over an optional future: the inner future is polled if it is `Some`, and the
	/// selector is always pending otherwise.
	pub(crate) struct OptionalSelector<F: Future<Output = ()> + Unpin> {
		pub optional_future: Option<F>,
	}

	impl<F: Future<Output = ()> + Unpin> Future for OptionalSelector<F> {
		type Output = ();
		fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
			match self.optional_future.as_mut() {
				Some(f) => match Pin::new(f).poll(ctx) {
					Poll::Ready(()) => {
						self.optional_future.take();
						Poll::Ready(())
					},
					Poll::Pending => Poll::Pending,
				},
				None => Poll::Pending,
			}
		}
	}

	// If we want to poll a future without an async context to figure out if it has completed or
	// not without awaiting, we need a Waker, which needs a vtable... we fill it with dummy values
	// but sadly there's a good bit of boilerplate here.
	fn dummy_waker_clone(_: *const ()) -> RawWaker {
		RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)
	}
	fn dummy_waker_action(_: *const ()) {}

	const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
		dummy_waker_clone,
		dummy_waker_action,
		dummy_waker_action,
		dummy_waker_action,
	);
	pub(crate) fn dummy_waker() -> Waker {
		unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) }
	}
}
#[cfg(feature = "futures")]
use core::task;
#[cfg(feature = "futures")]
use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};

/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs `true`, the loop will exit and this function's future will complete.
/// The `sleeper` future is free to return early after it has triggered the exit condition.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
///
/// Requires the `futures` feature. Note that while this method is available without the `std`
/// feature, using it without `std` will skip calling
/// [`NetworkGraph::remove_stale_channels_and_tracking`]; in that case you should call
/// [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] yourself at regular intervals.
///
/// The `mobile_interruptable_platform` flag should be set if we're currently running on a
/// mobile device, where we may need to check for interruption of the application regularly. If you
/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
/// are hundreds or thousands of simultaneous process calls running.
///
/// The `fetch_time` parameter should return the current wall clock time, if one is available. If
/// no time is available, some features may be disabled, however the node will still operate fine.
///
/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
/// could set up `process_events_async` like this:
/// ```
/// # use lightning::io;
/// # use lightning::events::ReplayEvent;
/// # use std::sync::{Arc, RwLock};
/// # use std::sync::atomic::{AtomicBool, Ordering};
/// # use std::time::SystemTime;
/// # use lightning_background_processor::{process_events_async, GossipSync};
/// # struct Logger {}
/// # impl lightning::util::logger::Logger for Logger {
/// #     fn log(&self, _record: lightning::util::logger::Record) {}
/// # }
/// # struct Store {}
/// # impl lightning::util::persist::KVStore for Store {
/// #     fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
/// #     fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
/// #     fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
/// #     fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
/// # }
/// # struct EventHandler {}
/// # impl EventHandler {
/// #     async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
/// # }
/// # #[derive(Eq, PartialEq, Clone, Hash)]
/// # struct SocketDescriptor {}
/// # impl lightning::ln::peer_handler::SocketDescriptor for SocketDescriptor {
/// #     fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
/// #     fn disconnect_socket(&mut self) {}
/// # }
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<Store>>;
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger>;
/// #
/// # struct Node<
/// #     B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
/// #     F: lightning::chain::Filter + Send + Sync + 'static,
/// #     FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
/// #     UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
/// # > {
/// #     peer_manager: Arc<PeerManager<B, F, FE, UL>>,
/// #     event_handler: Arc<EventHandler>,
/// #     channel_manager: Arc<ChannelManager<B, F, FE>>,
/// #     onion_messenger: Arc<OnionMessenger<B, F, FE>>,
/// #     chain_monitor: Arc<ChainMonitor<B, F, FE>>,
/// #     gossip_sync: Arc<P2PGossipSync<UL>>,
/// #     persister: Arc<Store>,
/// #     logger: Arc<Logger>,
/// #     scorer: Arc<Scorer>,
/// # }
/// #
/// # async fn setup_background_processing<
/// #     B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
/// #     F: lightning::chain::Filter + Send + Sync + 'static,
/// #     FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
/// #     UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
/// # >(node: Node<B, F, FE, UL>) {
///	let background_persister = Arc::clone(&node.persister);
///	let background_event_handler = Arc::clone(&node.event_handler);
///	let background_chain_mon = Arc::clone(&node.chain_monitor);
///	let background_chan_man = Arc::clone(&node.channel_manager);
///	let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync));
///	let background_peer_man = Arc::clone(&node.peer_manager);
///	let background_onion_messenger = Arc::clone(&node.onion_messenger);
///	let background_logger = Arc::clone(&node.logger);
///	let background_scorer = Arc::clone(&node.scorer);
///
///	// Setup the sleeper.
///	let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
///
///	let sleeper = move |d| {
///		let mut receiver = stop_receiver.clone();
///		Box::pin(async move {
///			tokio::select!{
///				_ = tokio::time::sleep(d) => false,
///				_ = receiver.changed() => true,
///			}
///		})
///	};
///
///	let mobile_interruptable_platform = false;
///
///	let handle = tokio::spawn(async move {
///		process_events_async(
///			background_persister,
///			|e| background_event_handler.handle_event(e),
///			background_chain_mon,
///			background_chan_man,
///			Some(background_onion_messenger),
///			background_gossip_sync,
///			background_peer_man,
///			background_logger,
///			Some(background_scorer),
///			sleeper,
///			mobile_interruptable_platform,
///			|| Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
///		)
///		.await
///		.expect("Failed to process events");
///	});
///
///	// Stop the background processing.
///	stop_sender.send(()).unwrap();
///	handle.await.unwrap();
///	# }
/// ```
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	UL: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static
		+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
		+ Send
		+ Sync,
	CM: 'static + Deref + Send + Sync,
	OM: 'static + Deref + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	PM: 'static + Deref + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: for<'b> WriteableScore<'b>,
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
	Sleeper: Fn(Duration) -> SleepFuture,
	FetchTime: Fn() -> Option<Duration>,
>(
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
	logger: L, scorer: Option<S>, sleeper: Sleeper, mobile_interruptable_platform: bool,
	fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
	UL::Target: 'static + UtxoLookup,
	CF::Target: 'static + chain::Filter,
	T::Target: 'static + BroadcasterInterface,
	F::Target: 'static + FeeEstimator,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
	PS::Target: 'static + Persister<'a, CM, L, S>,
	CM::Target: AChannelManager + Send + Sync,
	OM::Target: AOnionMessenger + Send + Sync,
	PM::Target: APeerManager + Send + Sync,
{
	let mut should_break = false;
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		let scorer = &scorer;
		let logger = &logger;
		let persister = &persister;
		let fetch_time = &fetch_time;
		// We should be able to drop the Box once our MSRV is 1.68
		Box::pin(async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			if let Some(ref scorer) = scorer {
				if let Some(duration_since_epoch) = fetch_time() {
					if update_scorer(scorer, &event, duration_since_epoch) {
						log_trace!(logger, "Persisting scorer after update");
						if let Err(e) = persister.persist_scorer(&*scorer) {
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
							// We opt not to abort early on persistence failure here as persisting
							// the scorer is non-critical and we still hope that it will have
							// resolved itself when it is potentially critical in event handling
							// below.
						}
					}
				}
			}
			event_handler(event).await
		})
	};
	define_run_body!(
		persister,
		chain_monitor,
		chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager,
		channel_manager.get_cm().process_pending_events_async(async_event_handler).await,
		onion_messenger,
		if let Some(om) = &onion_messenger {
			om.get_om().process_pending_events_async(async_event_handler).await
		},
		peer_manager,
		gossip_sync,
		logger,
		scorer,
		should_break,
		{
			let om_fut = if let Some(om) = onion_messenger.as_ref() {
				let fut = om.get_om().get_update_future();
				OptionalSelector { optional_future: Some(fut) }
			} else {
				OptionalSelector { optional_future: None }
			};
			let fut = Selector {
				a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
				b: chain_monitor.get_update_future(),
				c: om_fut,
				d: sleeper(if mobile_interruptable_platform {
					Duration::from_millis(100)
				} else {
					Duration::from_secs(FASTEST_TIMER)
				}),
			};
			match fut.await {
				SelectorOutput::A | SelectorOutput::B | SelectorOutput::C => {},
				SelectorOutput::D(exit) => {
					should_break = exit;
				},
			}
		},
		|t| sleeper(Duration::from_secs(t)),
		|fut: &mut SleepFuture, _| {
			let mut waker = dummy_waker();
			let mut ctx = task::Context::from_waker(&mut waker);
			match core::pin::Pin::new(fut).poll(&mut ctx) {
				task::Poll::Ready(exit) => {
					should_break = exit;
					true
				},
				task::Poll::Pending => false,
			}
		},
		mobile_interruptable_platform,
		fetch_time,
	)
}

#[cfg(feature = "std")]
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	///
	/// # Data Persistence
	///
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	///
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	///
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	///
	/// # Event Handling
	///
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	///
	/// # Rapid Gossip Sync
	///
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	///
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
	pub fn start<
		'a,
		UL: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static
			+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
			+ Send
			+ Sync,
		CM: 'static + Deref + Send + Sync,
		OM: 'static + Deref + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		PM: 'static + Deref + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: for<'b> WriteableScore<'b>,
	>(
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
		logger: L, scorer: Option<S>,
	) -> Self
	where
		UL::Target: 'static + UtxoLookup,
		CF::Target: 'static + chain::Filter,
		T::Target: 'static + BroadcasterInterface,
		F::Target: 'static + FeeEstimator,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
		PS::Target: 'static + Persister<'a, CM, L, S>,
		CM::Target: AChannelManager + Send + Sync,
		OM::Target: AOnionMessenger + Send + Sync,
		PM::Target: APeerManager + Send + Sync,
	{
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				}
				if let Some(ref scorer) = scorer {
					use std::time::SystemTime;
					let duration_since_epoch = SystemTime::now()
						.duration_since(SystemTime::UNIX_EPOCH)
						.expect("Time should be sometime after 1970");
					if update_scorer(scorer, &event, duration_since_epoch) {
						log_trace!(logger, "Persisting scorer after update");
						if let Err(e) = persister.persist_scorer(&scorer) {
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
						}
					}
				}
				event_handler.handle_event(event)
			};
			define_run_body!(
				persister,
				chain_monitor,
				chain_monitor.process_pending_events(&event_handler),
				channel_manager,
				channel_manager.get_cm().process_pending_events(&event_handler),
				onion_messenger,
				if let Some(om) = &onion_messenger {
					om.get_om().process_pending_events(&event_handler)
				},
				peer_manager,
				gossip_sync,
				logger,
				scorer,
				stop_thread.load(Ordering::Acquire),
				{
					let sleeper = if let Some(om) = onion_messenger.as_ref() {
						Sleeper::from_three_futures(
							&channel_manager.get_cm().get_event_or_persistence_needed_future(),
							&chain_monitor.get_update_future(),
							&om.get_om().get_update_future(),
						)
					} else {
						Sleeper::from_two_futures(
							&channel_manager.get_cm().get_event_or_persistence_needed_future(),
							&chain_monitor.get_update_future(),
						)
					};
					sleeper.wait_timeout(Duration::from_millis(100));
				},
				|_| Instant::now(),
				|time: &Instant, dur| time.elapsed().as_secs() > dur,
				false,
				|| {
					use std::time::SystemTime;
					Some(
						SystemTime::now()
							.duration_since(SystemTime::UNIX_EPOCH)
							.expect("Time should be sometime after 1970"),
					)
				},
			)
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}

#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		self.stop_and_join_thread().unwrap();
	}
}

#[cfg(all(feature = "std", test))]
mod tests {
	use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
	use bitcoin::constants::{genesis_block, ChainHash};
	use bitcoin::hashes::Hash;
	use bitcoin::locktime::absolute::LockTime;
	use bitcoin::network::Network;
	use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
	use bitcoin::transaction::Version;
	use bitcoin::transaction::{Transaction, TxOut};
	use bitcoin::{Amount, ScriptBuf, Txid};
	use core::sync::atomic::{AtomicBool, Ordering};
	use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
	use lightning::chain::transaction::OutPoint;
	use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
	use lightning::events::{
		Event, MessageSendEvent, MessageSendEventsProvider, PathFailure, ReplayEvent,
	};
	use lightning::ln::channelmanager;
	use lightning::ln::channelmanager::{
		ChainParameters, PaymentId, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA,
	};
	use lightning::ln::functional_test_utils::*;
	use lightning::ln::msgs::{ChannelMessageHandler, Init};
	use lightning::ln::peer_handler::{
		IgnoringMessageHandler, MessageHandler, PeerManager, SocketDescriptor,
	};
	use lightning::ln::types::ChannelId;
	use lightning::onion_message::messenger::{DefaultMessageRouter, OnionMessenger};
	use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
	use lightning::routing::router::{CandidateRouteHop, DefaultRouter, Path, RouteHop};
	use lightning::routing::scoring::{ChannelUsage, LockableScore, ScoreLookUp, ScoreUpdate};
	use lightning::sign::{ChangeDestinationSource, InMemorySigner, KeysManager};
	use lightning::types::features::{ChannelFeatures, NodeFeatures};
	use lightning::types::payment::PaymentHash;
	use lightning::util::config::UserConfig;
	use lightning::util::persist::{
		KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
		CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
		NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
		SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
		SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
	};
	use lightning::util::ser::Writeable;
	use lightning::util::sweep::{OutputSpendStatus, OutputSweeper};
	use lightning::util::test_utils;
	use lightning::{get_event, get_event_msg};
	use lightning_persister::fs_store::FilesystemStore;
	use lightning_rapid_gossip_sync::RapidGossipSync;
	use std::collections::VecDeque;
	use std::path::PathBuf;
	use std::sync::mpsc::SyncSender;
	use std::sync::Arc;
	use std::time::Duration;
	use std::{env, fs};

	const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;

	#[derive(Clone, Hash, PartialEq, Eq)]
	struct TestDescriptor {}
	impl SocketDescriptor for TestDescriptor {
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
			0
		}

		fn disconnect_socket(&mut self) {}
	}

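	// When built with `c_bindings`, the scorer is wrapped in LDK's multi-threaded lockable
	// score; otherwise a plain `Mutex` serves as the `LockableScore` for these tests.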
1126	#[cfg(c_bindings)]
1127	type LockingWrapper<T> = lightning::routing::scoring::MultiThreadedLockableScore<T>;
1128	#[cfg(not(c_bindings))]
1129	type LockingWrapper<T> = std::sync::Mutex<T>;
1130
1131	type ChannelManager = channelmanager::ChannelManager<
1132		Arc<ChainMonitor>,
1133		Arc<test_utils::TestBroadcaster>,
1134		Arc<KeysManager>,
1135		Arc<KeysManager>,
1136		Arc<KeysManager>,
1137		Arc<test_utils::TestFeeEstimator>,
1138		Arc<
1139			DefaultRouter<
1140				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1141				Arc<test_utils::TestLogger>,
1142				Arc<KeysManager>,
1143				Arc<LockingWrapper<TestScorer>>,
1144				(),
1145				TestScorer,
1146			>,
1147		>,
1148		Arc<
1149			DefaultMessageRouter<
1150				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1151				Arc<test_utils::TestLogger>,
1152				Arc<KeysManager>,
1153			>,
1154		>,
1155		Arc<test_utils::TestLogger>,
1156	>;
1157
1158	type ChainMonitor = chainmonitor::ChainMonitor<
1159		InMemorySigner,
1160		Arc<test_utils::TestChainSource>,
1161		Arc<test_utils::TestBroadcaster>,
1162		Arc<test_utils::TestFeeEstimator>,
1163		Arc<test_utils::TestLogger>,
1164		Arc<FilesystemStore>,
1165	>;
1166
1167	type PGS = Arc<
1168		P2PGossipSync<
1169			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1170			Arc<test_utils::TestChainSource>,
1171			Arc<test_utils::TestLogger>,
1172		>,
1173	>;
1174	type RGS = Arc<
1175		RapidGossipSync<
1176			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1177			Arc<test_utils::TestLogger>,
1178		>,
1179	>;
1180
1181	type OM = OnionMessenger<
1182		Arc<KeysManager>,
1183		Arc<KeysManager>,
1184		Arc<test_utils::TestLogger>,
1185		Arc<ChannelManager>,
1186		Arc<
1187			DefaultMessageRouter<
1188				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1189				Arc<test_utils::TestLogger>,
1190				Arc<KeysManager>,
1191			>,
1192		>,
1193		IgnoringMessageHandler,
1194		Arc<ChannelManager>,
1195		IgnoringMessageHandler,
1196		IgnoringMessageHandler,
1197	>;
1198
1199	struct Node {
1200		node: Arc<ChannelManager>,
1201		messenger: Arc<OM>,
1202		p2p_gossip_sync: PGS,
1203		rapid_gossip_sync: RGS,
1204		peer_manager: Arc<
1205			PeerManager<
1206				TestDescriptor,
1207				Arc<test_utils::TestChannelMessageHandler>,
1208				Arc<test_utils::TestRoutingMessageHandler>,
1209				Arc<OM>,
1210				Arc<test_utils::TestLogger>,
1211				IgnoringMessageHandler,
1212				Arc<KeysManager>,
1213			>,
1214		>,
1215		chain_monitor: Arc<ChainMonitor>,
1216		kv_store: Arc<FilesystemStore>,
1217		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
1218		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1219		logger: Arc<test_utils::TestLogger>,
1220		best_block: BestBlock,
1221		scorer: Arc<LockingWrapper<TestScorer>>,
1222		sweeper: Arc<
1223			OutputSweeper<
1224				Arc<test_utils::TestBroadcaster>,
1225				Arc<TestWallet>,
1226				Arc<test_utils::TestFeeEstimator>,
1227				Arc<dyn Filter + Sync + Send>,
1228				Arc<FilesystemStore>,
1229				Arc<test_utils::TestLogger>,
1230				Arc<KeysManager>,
1231			>,
1232		>,
1233	}
1234
1235	impl Node {
1236		fn p2p_gossip_sync(
1237			&self,
1238		) -> GossipSync<
1239			PGS,
1240			RGS,
1241			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1242			Arc<test_utils::TestChainSource>,
1243			Arc<test_utils::TestLogger>,
1244		> {
1245			GossipSync::P2P(self.p2p_gossip_sync.clone())
1246		}
1247
1248		fn rapid_gossip_sync(
1249			&self,
1250		) -> GossipSync<
1251			PGS,
1252			RGS,
1253			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1254			Arc<test_utils::TestChainSource>,
1255			Arc<test_utils::TestLogger>,
1256		> {
1257			GossipSync::Rapid(self.rapid_gossip_sync.clone())
1258		}
1259
1260		fn no_gossip_sync(
1261			&self,
1262		) -> GossipSync<
1263			PGS,
1264			RGS,
1265			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1266			Arc<test_utils::TestChainSource>,
1267			Arc<test_utils::TestLogger>,
1268		> {
1269			GossipSync::None
1270		}
1271	}
1272
1273	impl Drop for Node {
1274		fn drop(&mut self) {
1275			let data_dir = self.kv_store.get_data_dir();
1276			match fs::remove_dir_all(data_dir.clone()) {
1277				Err(e) => {
1278					println!("Failed to remove test store directory {}: {}", data_dir.display(), e)
1279				},
1280				_ => {},
1281			}
1282		}
1283	}
1284
1285	struct Persister {
1286		graph_error: Option<(std::io::ErrorKind, &'static str)>,
1287		graph_persistence_notifier: Option<SyncSender<()>>,
1288		manager_error: Option<(std::io::ErrorKind, &'static str)>,
1289		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
1290		kv_store: FilesystemStore,
1291	}
1292
1293	impl Persister {
1294		fn new(data_dir: PathBuf) -> Self {
1295			let kv_store = FilesystemStore::new(data_dir);
1296			Self {
1297				graph_error: None,
1298				graph_persistence_notifier: None,
1299				manager_error: None,
1300				scorer_error: None,
1301				kv_store,
1302			}
1303		}
1304
1305		fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1306			Self { graph_error: Some((error, message)), ..self }
1307		}
1308
1309		fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
1310			Self { graph_persistence_notifier: Some(sender), ..self }
1311		}
1312
1313		fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1314			Self { manager_error: Some((error, message)), ..self }
1315		}
1316
1317		fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1318			Self { scorer_error: Some((error, message)), ..self }
1319		}
1320	}
1321
1322	impl KVStore for Persister {
1323		fn read(
1324			&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
1325		) -> lightning::io::Result<Vec<u8>> {
1326			self.kv_store.read(primary_namespace, secondary_namespace, key)
1327		}
1328
1329		fn write(
1330			&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
1331		) -> lightning::io::Result<()> {
1332			if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE
1333				&& secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE
1334				&& key == CHANNEL_MANAGER_PERSISTENCE_KEY
1335			{
1336				if let Some((error, message)) = self.manager_error {
1337					return Err(std::io::Error::new(error, message).into());
1338				}
1339			}
1340
1341			if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE
1342				&& secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE
1343				&& key == NETWORK_GRAPH_PERSISTENCE_KEY
1344			{
1345				if let Some(sender) = &self.graph_persistence_notifier {
1346					match sender.send(()) {
1347						Ok(()) => {},
1348						Err(std::sync::mpsc::SendError(())) => {
1349							println!("Persister failed to notify as receiver went away.")
1350						},
1351					}
1352				};
1353
1354				if let Some((error, message)) = self.graph_error {
1355					return Err(std::io::Error::new(error, message).into());
1356				}
1357			}
1358
1359			if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE
1360				&& secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE
1361				&& key == SCORER_PERSISTENCE_KEY
1362			{
1363				if let Some((error, message)) = self.scorer_error {
1364					return Err(std::io::Error::new(error, message).into());
1365				}
1366			}
1367
1368			self.kv_store.write(primary_namespace, secondary_namespace, key, buf)
1369		}
1370
1371		fn remove(
1372			&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
1373		) -> lightning::io::Result<()> {
1374			self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
1375		}
1376
1377		fn list(
1378			&self, primary_namespace: &str, secondary_namespace: &str,
1379		) -> lightning::io::Result<Vec<String>> {
1380			self.kv_store.list(primary_namespace, secondary_namespace)
1381		}
1382	}
1383
1384	struct TestScorer {
1385		event_expectations: Option<VecDeque<TestResult>>,
1386	}
1387
1388	#[derive(Debug)]
1389	enum TestResult {
1390		PaymentFailure { path: Path, short_channel_id: u64 },
1391		PaymentSuccess { path: Path },
1392		ProbeFailure { path: Path },
1393		ProbeSuccess { path: Path },
1394	}
1395
1396	impl TestScorer {
1397		fn new() -> Self {
1398			Self { event_expectations: None }
1399		}
1400
1401		fn expect(&mut self, expectation: TestResult) {
1402			self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
1403		}
1404	}
1405
1406	impl lightning::util::ser::Writeable for TestScorer {
1407		fn write<W: lightning::util::ser::Writer>(
1408			&self, _: &mut W,
1409		) -> Result<(), lightning::io::Error> {
1410			Ok(())
1411		}
1412	}
1413
1414	impl ScoreLookUp for TestScorer {
1415		type ScoreParams = ();
1416		fn channel_penalty_msat(
1417			&self, _candidate: &CandidateRouteHop, _usage: ChannelUsage,
1418			_score_params: &Self::ScoreParams,
1419		) -> u64 {
1420			unimplemented!();
1421		}
1422	}
1423
1424	impl ScoreUpdate for TestScorer {
1425		fn payment_path_failed(
1426			&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration,
1427		) {
1428			if let Some(expectations) = &mut self.event_expectations {
1429				match expectations.pop_front().unwrap() {
1430					TestResult::PaymentFailure { path, short_channel_id } => {
1431						assert_eq!(actual_path, &path);
1432						assert_eq!(actual_short_channel_id, short_channel_id);
1433					},
1434					TestResult::PaymentSuccess { path } => {
1435						panic!("Unexpected successful payment path: {:?}", path)
1436					},
1437					TestResult::ProbeFailure { path } => {
1438						panic!("Unexpected probe failure: {:?}", path)
1439					},
1440					TestResult::ProbeSuccess { path } => {
1441						panic!("Unexpected probe success: {:?}", path)
1442					},
1443				}
1444			}
1445		}
1446
1447		fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
1448			if let Some(expectations) = &mut self.event_expectations {
1449				match expectations.pop_front().unwrap() {
1450					TestResult::PaymentFailure { path, .. } => {
1451						panic!("Unexpected payment path failure: {:?}", path)
1452					},
1453					TestResult::PaymentSuccess { path } => {
1454						assert_eq!(actual_path, &path);
1455					},
1456					TestResult::ProbeFailure { path } => {
1457						panic!("Unexpected probe failure: {:?}", path)
1458					},
1459					TestResult::ProbeSuccess { path } => {
1460						panic!("Unexpected probe success: {:?}", path)
1461					},
1462				}
1463			}
1464		}
1465
1466		fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
1467			if let Some(expectations) = &mut self.event_expectations {
1468				match expectations.pop_front().unwrap() {
1469					TestResult::PaymentFailure { path, .. } => {
1470						panic!("Unexpected payment path failure: {:?}", path)
1471					},
1472					TestResult::PaymentSuccess { path } => {
1473						panic!("Unexpected payment path success: {:?}", path)
1474					},
1475					TestResult::ProbeFailure { path } => {
1476						assert_eq!(actual_path, &path);
1477					},
1478					TestResult::ProbeSuccess { path } => {
1479						panic!("Unexpected probe success: {:?}", path)
1480					},
1481				}
1482			}
1483		}
1484		fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
1485			if let Some(expectations) = &mut self.event_expectations {
1486				match expectations.pop_front().unwrap() {
1487					TestResult::PaymentFailure { path, .. } => {
1488						panic!("Unexpected payment path failure: {:?}", path)
1489					},
1490					TestResult::PaymentSuccess { path } => {
1491						panic!("Unexpected payment path success: {:?}", path)
1492					},
1493					TestResult::ProbeFailure { path } => {
1494						panic!("Unexpected probe failure: {:?}", path)
1495					},
1496					TestResult::ProbeSuccess { path } => {
1497						assert_eq!(actual_path, &path);
1498					},
1499				}
1500			}
1501		}
1502		fn time_passed(&mut self, _: Duration) {}
1503	}
1504
1505	#[cfg(c_bindings)]
1506	impl lightning::routing::scoring::Score for TestScorer {}
1507
1508	impl Drop for TestScorer {
1509		fn drop(&mut self) {
1510			if std::thread::panicking() {
1511				return;
1512			}
1513
1514			if let Some(event_expectations) = &self.event_expectations {
1515				if !event_expectations.is_empty() {
1516					panic!("Unsatisfied event expectations: {:?}", event_expectations);
1517				}
1518			}
1519		}
1520	}
1521
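	/// A stub change-destination source for the sweeper that always hands out an empty script.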
1522	struct TestWallet {}
1523
1524	impl ChangeDestinationSource for TestWallet {
1525		fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
1526			Ok(ScriptBuf::new())
1527		}
1528	}
1529
1530	fn get_full_filepath(filepath: String, filename: String) -> String {
1531		let mut path = PathBuf::from(filepath);
1532		path.push(filename);
1533		path.to_str().unwrap().to_string()
1534	}
1535
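	// Builds `num_nodes` full node stacks (channel manager, chain monitor, peer manager, onion
	// messenger, sweeper, and gossip sync), each persisting to its own directory under
	// `persist_dir`, and then connects every pair of nodes as peers.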
1536	fn create_nodes(num_nodes: usize, persist_dir: &str) -> (String, Vec<Node>) {
1537		let persist_temp_path = env::temp_dir().join(persist_dir);
1538		let persist_dir = persist_temp_path.to_string_lossy().to_string();
1539		let network = Network::Bitcoin;
1540		let mut nodes = Vec::new();
1541		for i in 0..num_nodes {
1542			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
1543			let fee_estimator = Arc::new(test_utils::TestFeeEstimator::new(253));
1544			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
1545			let genesis_block = genesis_block(network);
1546			let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
1547			let scorer = Arc::new(LockingWrapper::new(TestScorer::new()));
1548			let now = Duration::from_secs(genesis_block.header.time as u64);
1549			let seed = [i as u8; 32];
1550			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
1551			let router = Arc::new(DefaultRouter::new(
1552				network_graph.clone(),
1553				logger.clone(),
1554				Arc::clone(&keys_manager),
1555				scorer.clone(),
1556				Default::default(),
1557			));
1558			let msg_router = Arc::new(DefaultMessageRouter::new(
1559				network_graph.clone(),
1560				Arc::clone(&keys_manager),
1561			));
1562			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
1563			let kv_store =
1564				Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
1567			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
1568				Some(chain_source.clone()),
1569				tx_broadcaster.clone(),
1570				logger.clone(),
1571				fee_estimator.clone(),
1572				kv_store.clone(),
1573			));
1574			let best_block = BestBlock::from_network(network);
1575			let params = ChainParameters { network, best_block };
1576			let manager = Arc::new(ChannelManager::new(
1577				fee_estimator.clone(),
1578				chain_monitor.clone(),
1579				tx_broadcaster.clone(),
1580				router.clone(),
1581				msg_router.clone(),
1582				logger.clone(),
1583				keys_manager.clone(),
1584				keys_manager.clone(),
1585				keys_manager.clone(),
1586				UserConfig::default(),
1587				params,
1588				genesis_block.header.time,
1589			));
1590			let messenger = Arc::new(OnionMessenger::new(
1591				keys_manager.clone(),
1592				keys_manager.clone(),
1593				logger.clone(),
1594				manager.clone(),
1595				msg_router.clone(),
1596				IgnoringMessageHandler {},
1597				manager.clone(),
1598				IgnoringMessageHandler {},
1599				IgnoringMessageHandler {},
1600			));
1601			let wallet = Arc::new(TestWallet {});
1602			let sweeper = Arc::new(OutputSweeper::new(
1603				best_block,
1604				Arc::clone(&tx_broadcaster),
1605				Arc::clone(&fee_estimator),
1606				None::<Arc<dyn Filter + Sync + Send>>,
1607				Arc::clone(&keys_manager),
1608				wallet,
1609				Arc::clone(&kv_store),
1610				Arc::clone(&logger),
1611			));
1612			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(
1613				network_graph.clone(),
1614				Some(chain_source.clone()),
1615				logger.clone(),
1616			));
1617			let rapid_gossip_sync =
1618				Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
1619			let msg_handler = MessageHandler {
1620				chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(
1621					ChainHash::using_genesis_block(Network::Testnet),
1622				)),
1623				route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
1624				onion_message_handler: messenger.clone(),
1625				custom_message_handler: IgnoringMessageHandler {},
1626			};
1627			let peer_manager = Arc::new(PeerManager::new(
1628				msg_handler,
1629				0,
1630				&seed,
1631				logger.clone(),
1632				keys_manager.clone(),
1633			));
1634			let node = Node {
1635				node: manager,
1636				p2p_gossip_sync,
1637				rapid_gossip_sync,
1638				peer_manager,
1639				chain_monitor,
1640				kv_store,
1641				tx_broadcaster,
1642				network_graph,
1643				logger,
1644				best_block,
1645				scorer,
1646				sweeper,
1647				messenger,
1648			};
1649			nodes.push(node);
1650		}
1651
1652		for i in 0..num_nodes {
1653			for j in (i + 1)..num_nodes {
1654				let init_i = Init {
1655					features: nodes[j].node.init_features(),
1656					networks: None,
1657					remote_network_address: None,
1658				};
1659				nodes[i]
1660					.node
1661					.peer_connected(nodes[j].node.get_our_node_id(), &init_i, true)
1662					.unwrap();
1663				let init_j = Init {
1664					features: nodes[i].node.init_features(),
1665					networks: None,
1666					remote_network_address: None,
1667				};
1668				nodes[j]
1669					.node
1670					.peer_connected(nodes[i].node.get_our_node_id(), &init_j, false)
1671					.unwrap();
1672			}
1673		}
1674
1675		(persist_dir, nodes)
1676	}
1677
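	// Drives the full channel-open and funding flow between `$node_a` and `$node_b`, returning the
	// funding transaction. This consumes the resulting `ChannelPending` events, so it must finish
	// before a `BackgroundProcessor` starts processing events for these nodes.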
1678	macro_rules! open_channel {
1679		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1680			begin_open_channel!($node_a, $node_b, $channel_value);
1681			let events = $node_a.node.get_and_clear_pending_events();
1682			assert_eq!(events.len(), 1);
1683			let (temporary_channel_id, tx) =
1684				handle_funding_generation_ready!(events[0], $channel_value);
1685			$node_a
1686				.node
1687				.funding_transaction_generated(
1688					temporary_channel_id,
1689					$node_b.node.get_our_node_id(),
1690					tx.clone(),
1691				)
1692				.unwrap();
1693			let msg_a = get_event_msg!(
1694				$node_a,
1695				MessageSendEvent::SendFundingCreated,
1696				$node_b.node.get_our_node_id()
1697			);
1698			$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
1699			get_event!($node_b, Event::ChannelPending);
1700			let msg_b = get_event_msg!(
1701				$node_b,
1702				MessageSendEvent::SendFundingSigned,
1703				$node_a.node.get_our_node_id()
1704			);
1705			$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
1706			get_event!($node_a, Event::ChannelPending);
1707			tx
1708		}};
1709	}
1710
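	// Performs only the initial `open_channel`/`accept_channel` handshake, leaving the resulting
	// `FundingGenerationReady` event pending for the caller (or a background event handler).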
1711	macro_rules! begin_open_channel {
1712		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1713			$node_a
1714				.node
1715				.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None)
1716				.unwrap();
1717			let msg_a = get_event_msg!(
1718				$node_a,
1719				MessageSendEvent::SendOpenChannel,
1720				$node_b.node.get_our_node_id()
1721			);
1722			$node_b.node.handle_open_channel($node_a.node.get_our_node_id(), &msg_a);
1723			let msg_b = get_event_msg!(
1724				$node_b,
1725				MessageSendEvent::SendAcceptChannel,
1726				$node_a.node.get_our_node_id()
1727			);
1728			$node_a.node.handle_accept_channel($node_b.node.get_our_node_id(), &msg_b);
1729		}};
1730	}
1731
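	// Matches a `FundingGenerationReady` event and builds a single-output funding transaction
	// paying the requested script, returning it along with the temporary channel id.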
1732	macro_rules! handle_funding_generation_ready {
1733		($event: expr, $channel_value: expr) => {{
1734			match $event {
1735				Event::FundingGenerationReady {
1736					temporary_channel_id,
1737					channel_value_satoshis,
1738					ref output_script,
1739					user_channel_id,
1740					..
1741				} => {
1742					assert_eq!(channel_value_satoshis, $channel_value);
1743					assert_eq!(user_channel_id, 42);
1744
1745					let tx = Transaction {
1746						version: Version::ONE,
1747						lock_time: LockTime::ZERO,
1748						input: Vec::new(),
1749						output: vec![TxOut {
1750							value: Amount::from_sat(channel_value_satoshis),
1751							script_pubkey: output_script.clone(),
1752						}],
1753					};
1754					(temporary_channel_id, tx)
1755				},
1756				_ => panic!("Unexpected event"),
1757			}
1758		}};
1759	}
1760
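	// Connects `depth` blocks on top of the node's current best block, confirming `tx` in the
	// first of them and notifying the channel manager, chain monitor, and sweeper of the final
	// best block.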
1761	fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1762		for i in 1..=depth {
1763			let prev_blockhash = node.best_block.block_hash;
1764			let height = node.best_block.height + 1;
1765			let header = create_dummy_header(prev_blockhash, height);
1766			let txdata = vec![(0, tx)];
1767			node.best_block = BestBlock::new(header.block_hash(), height);
1768			match i {
1769				1 => {
1770					node.node.transactions_confirmed(&header, &txdata, height);
1771					node.chain_monitor.transactions_confirmed(&header, &txdata, height);
1772					node.sweeper.transactions_confirmed(&header, &txdata, height);
1773				},
1774				x if x == depth => {
1775					// We need the TestBroadcaster to know about the new height so that it doesn't think
1776					// we're violating the time lock requirements of transactions broadcasted at that
1777					// point.
1778					let block = (genesis_block(Network::Bitcoin), height);
1779					node.tx_broadcaster.blocks.lock().unwrap().push(block);
1780					node.node.best_block_updated(&header, height);
1781					node.chain_monitor.best_block_updated(&header, height);
1782					node.sweeper.best_block_updated(&header, height);
1783				},
1784				_ => {},
1785			}
1786		}
1787	}
1788
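	// Connects `num_blocks` empty blocks, notifying the listeners only of the final best block.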
1789	fn advance_chain(node: &mut Node, num_blocks: u32) {
1790		for i in 1..=num_blocks {
1791			let prev_blockhash = node.best_block.block_hash;
1792			let height = node.best_block.height + 1;
1793			let header = create_dummy_header(prev_blockhash, height);
1794			node.best_block = BestBlock::new(header.block_hash(), height);
1795			if i == num_blocks {
1796				// We need the TestBroadcaster to know about the new height so that it doesn't think
1797				// we're violating the time lock requirements of transactions broadcasted at that
1798				// point.
1799				let block = (genesis_block(Network::Bitcoin), height);
1800				node.tx_broadcaster.blocks.lock().unwrap().push(block);
1801				node.node.best_block_updated(&header, height);
1802				node.chain_monitor.best_block_updated(&header, height);
1803				node.sweeper.best_block_updated(&header, height);
1804			}
1805		}
1806	}
1807
1808	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
1809		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
1810	}
1811
1812	#[test]
1813	fn test_background_processor() {
1814		// Test that when a new channel is created, the ChannelManager is re-persisted with the new
1815		// channel data. Also test that, whenever updates become available, the manager signals that
1816		// it needs re-persistence and is successfully re-persisted.
1817		let (persist_dir, nodes) = create_nodes(2, "test_background_processor");
1818
1819		// Go through the channel creation process so that each node has something to persist. Since
1820		// open_channel consumes events, it must complete before starting BackgroundProcessor to
1821		// avoid a race with processing events.
1822		let tx = open_channel!(nodes[0], nodes[1], 100000);
1823
1824		// Initiate the background processors to watch each node.
1825		let data_dir = nodes[0].kv_store.get_data_dir();
1826		let persister = Arc::new(Persister::new(data_dir));
1827		let event_handler = |_: _| Ok(());
1828		let bg_processor = BackgroundProcessor::start(
1829			persister,
1830			event_handler,
1831			nodes[0].chain_monitor.clone(),
1832			nodes[0].node.clone(),
1833			Some(nodes[0].messenger.clone()),
1834			nodes[0].p2p_gossip_sync(),
1835			nodes[0].peer_manager.clone(),
1836			nodes[0].logger.clone(),
1837			Some(nodes[0].scorer.clone()),
1838		);
1839
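		// Polls until the bytes on disk match the current serialization of `$node`. Persistence
		// happens asynchronously in the background processor, so we retry until it converges.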
1840		macro_rules! check_persisted_data {
1841			($node: expr, $filepath: expr) => {
1842				let mut expected_bytes = Vec::new();
1843				loop {
1844					expected_bytes.clear();
1845					match $node.write(&mut expected_bytes) {
1846						Ok(()) => match std::fs::read($filepath) {
1847							Ok(bytes) => {
1848								if bytes == expected_bytes {
1849									break;
1850								} else {
1851									continue;
1852								}
1853							},
1854							Err(_) => continue,
1855						},
1856						Err(e) => panic!("Unexpected error: {}", e),
1857					}
1858				}
1859			};
1860		}
1861
1862		// Check that the initial channel manager data is persisted as expected.
1863		let filepath =
1864			get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
1865		check_persisted_data!(nodes[0].node, filepath.clone());
1866
1867		loop {
1868			if !nodes[0].node.get_event_or_persist_condvar_value() {
1869				break;
1870			}
1871		}
1872
1873		// Force-close the channel.
1874		let error_message = "Channel force-closed";
1875		nodes[0]
1876			.node
1877			.force_close_broadcasting_latest_txn(
1878				&ChannelId::v1_from_funding_outpoint(OutPoint {
1879					txid: tx.compute_txid(),
1880					index: 0,
1881				}),
1882				&nodes[1].node.get_our_node_id(),
1883				error_message.to_string(),
1884			)
1885			.unwrap();
1886
1887		// Check that the force-close updates are persisted.
1888		check_persisted_data!(nodes[0].node, filepath.clone());
1889		loop {
1890			if !nodes[0].node.get_event_or_persist_condvar_value() {
1891				break;
1892			}
1893		}
1894
1895		// Check network graph is persisted
1896		let filepath =
1897			get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
1898		check_persisted_data!(nodes[0].network_graph, filepath.clone());
1899
1900		// Check scorer is persisted
1901		let filepath =
1902			get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
1903		check_persisted_data!(nodes[0].scorer, filepath.clone());
1904
1905		if !std::thread::panicking() {
1906			bg_processor.stop().unwrap();
1907		}
1908	}
1909
1910	#[test]
1911	fn test_timer_tick_called() {
1912		// Test that:
1913		// - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
1914		// - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`,
1915		// - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and
1916		// - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`.
1917		let (_, nodes) = create_nodes(1, "test_timer_tick_called");
1918		let data_dir = nodes[0].kv_store.get_data_dir();
1919		let persister = Arc::new(Persister::new(data_dir));
1920		let event_handler = |_: _| Ok(());
1921		let bg_processor = BackgroundProcessor::start(
1922			persister,
1923			event_handler,
1924			nodes[0].chain_monitor.clone(),
1925			nodes[0].node.clone(),
1926			Some(nodes[0].messenger.clone()),
1927			nodes[0].no_gossip_sync(),
1928			nodes[0].peer_manager.clone(),
1929			nodes[0].logger.clone(),
1930			Some(nodes[0].scorer.clone()),
1931		);
1932		loop {
1933			let log_entries = nodes[0].logger.lines.lock().unwrap();
1934			let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
1935			let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
1936			let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
1937			let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string();
1938			if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some()
1939				&& log_entries.get(&("lightning_background_processor", desired_log_2)).is_some()
1940				&& log_entries.get(&("lightning_background_processor", desired_log_3)).is_some()
1941				&& log_entries.get(&("lightning_background_processor", desired_log_4)).is_some()
1942			{
1943				break;
1944			}
1945		}
1946
1947		if !std::thread::panicking() {
1948			bg_processor.stop().unwrap();
1949		}
1950	}
1951
1952	#[test]
1953	fn test_channel_manager_persist_error() {
1954		// Test that an error during manager persistence is returned when joining the background processor.
1955		let (_, nodes) = create_nodes(2, "test_persist_error");
1956		open_channel!(nodes[0], nodes[1], 100000);
1957
1958		let data_dir = nodes[0].kv_store.get_data_dir();
1959		let persister = Arc::new(
1960			Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
1961		);
1962		let event_handler = |_: _| Ok(());
1963		let bg_processor = BackgroundProcessor::start(
1964			persister,
1965			event_handler,
1966			nodes[0].chain_monitor.clone(),
1967			nodes[0].node.clone(),
1968			Some(nodes[0].messenger.clone()),
1969			nodes[0].no_gossip_sync(),
1970			nodes[0].peer_manager.clone(),
1971			nodes[0].logger.clone(),
1972			Some(nodes[0].scorer.clone()),
1973		);
1974		match bg_processor.join() {
1975			Ok(_) => panic!("Expected error persisting manager"),
1976			Err(e) => {
1977				assert_eq!(e.kind(), std::io::ErrorKind::Other);
1978				assert_eq!(e.get_ref().unwrap().to_string(), "test");
1979			},
1980		}
1981	}
1982
1983	#[tokio::test]
1984	#[cfg(feature = "futures")]
1985	async fn test_channel_manager_persist_error_async() {
1986		// Test that an error during manager persistence is returned when awaiting the processing future.
1987		let (_, nodes) = create_nodes(2, "test_persist_error_async");
1988		open_channel!(nodes[0], nodes[1], 100000);
1989
1990		let data_dir = nodes[0].kv_store.get_data_dir();
1991		let persister = Arc::new(
1992			Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
1993		);
1994
1995		let bp_future = super::process_events_async(
1996			persister,
1997			|_: _| async { Ok(()) },
1998			nodes[0].chain_monitor.clone(),
1999			nodes[0].node.clone(),
2000			Some(nodes[0].messenger.clone()),
2001			nodes[0].rapid_gossip_sync(),
2002			nodes[0].peer_manager.clone(),
2003			nodes[0].logger.clone(),
2004			Some(nodes[0].scorer.clone()),
2005			move |dur: Duration| {
2006				Box::pin(async move {
2007					tokio::time::sleep(dur).await;
2008					false // Never exit
2009				})
2010			},
2011			false,
2012			|| Some(Duration::ZERO),
2013		);
2014		match bp_future.await {
2015			Ok(_) => panic!("Expected error persisting manager"),
2016			Err(e) => {
2017				assert_eq!(e.kind(), lightning::io::ErrorKind::Other);
2018				assert_eq!(e.get_ref().unwrap().to_string(), "test");
2019			},
2020		}
2021	}
2022
2023	#[test]
2024	fn test_network_graph_persist_error() {
2025		// Test that if we encounter an error during network graph persistence, an error gets returned.
2026		let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
2027		let data_dir = nodes[0].kv_store.get_data_dir();
2028		let persister =
2029			Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
2030		let event_handler = |_: _| Ok(());
2031		let bg_processor = BackgroundProcessor::start(
2032			persister,
2033			event_handler,
2034			nodes[0].chain_monitor.clone(),
2035			nodes[0].node.clone(),
2036			Some(nodes[0].messenger.clone()),
2037			nodes[0].p2p_gossip_sync(),
2038			nodes[0].peer_manager.clone(),
2039			nodes[0].logger.clone(),
2040			Some(nodes[0].scorer.clone()),
2041		);
2042
2043		match bg_processor.stop() {
2044			Ok(_) => panic!("Expected error persisting network graph"),
2045			Err(e) => {
2046				assert_eq!(e.kind(), std::io::ErrorKind::Other);
2047				assert_eq!(e.get_ref().unwrap().to_string(), "test");
2048			},
2049		}
2050	}
2051
2052	#[test]
2053	fn test_scorer_persist_error() {
2054		// Test that if we encounter an error during scorer persistence, an error gets returned.
2055		let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
2056		let data_dir = nodes[0].kv_store.get_data_dir();
2057		let persister =
2058			Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
2059		let event_handler = |_: _| Ok(());
2060		let bg_processor = BackgroundProcessor::start(
2061			persister,
2062			event_handler,
2063			nodes[0].chain_monitor.clone(),
2064			nodes[0].node.clone(),
2065			Some(nodes[0].messenger.clone()),
2066			nodes[0].no_gossip_sync(),
2067			nodes[0].peer_manager.clone(),
2068			nodes[0].logger.clone(),
2069			Some(nodes[0].scorer.clone()),
2070		);
2071
2072		match bg_processor.stop() {
2073			Ok(_) => panic!("Expected error persisting scorer"),
2074			Err(e) => {
2075				assert_eq!(e.kind(), std::io::ErrorKind::Other);
2076				assert_eq!(e.get_ref().unwrap().to_string(), "test");
2077			},
2078		}
2079	}
2080
2081	#[test]
2082	fn test_background_event_handling() {
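		// Test that events are routed to a handler hooked up to a running background processor:
		// first the FundingGenerationReady/ChannelPending events while opening a channel, then the
		// SpendableOutputs event after a force-close, which we hand off to the sweeper.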
2083		let (_, mut nodes) = create_nodes(2, "test_background_event_handling");
2084		let node_0_id = nodes[0].node.get_our_node_id();
2085		let node_1_id = nodes[1].node.get_our_node_id();
2086
2087		let channel_value = 100000;
2088		let data_dir = nodes[0].kv_store.get_data_dir();
2089		let persister = Arc::new(Persister::new(data_dir.clone()));
2090
2091		// Set up a background event handler for FundingGenerationReady and ChannelPending events.
2092		let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
2093		let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
2094		let event_handler = move |event: Event| {
2095			match event {
2096				Event::FundingGenerationReady { .. } => funding_generation_send
2097					.send(handle_funding_generation_ready!(event, channel_value))
2098					.unwrap(),
2099				Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
2100				Event::ChannelReady { .. } => {},
2101				_ => panic!("Unexpected event: {:?}", event),
2102			}
2103			Ok(())
2104		};
2105
2106		let bg_processor = BackgroundProcessor::start(
2107			persister,
2108			event_handler,
2109			nodes[0].chain_monitor.clone(),
2110			nodes[0].node.clone(),
2111			Some(nodes[0].messenger.clone()),
2112			nodes[0].no_gossip_sync(),
2113			nodes[0].peer_manager.clone(),
2114			nodes[0].logger.clone(),
2115			Some(nodes[0].scorer.clone()),
2116		);
2117
2118		// Open a channel and check that the FundingGenerationReady event was handled.
2119		begin_open_channel!(nodes[0], nodes[1], channel_value);
2120		let (temporary_channel_id, funding_tx) = funding_generation_recv
2121			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
2122			.expect("FundingGenerationReady not handled within deadline");
2123		nodes[0]
2124			.node
2125			.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
2126			.unwrap();
2127		let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
2128		nodes[1].node.handle_funding_created(node_0_id, &msg_0);
2129		get_event!(nodes[1], Event::ChannelPending);
2130		let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
2131		nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
2132		let _ = channel_pending_recv
2133			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
2134			.expect("ChannelPending not handled within deadline");
2135
2136		// Confirm the funding transaction.
2137		confirm_transaction(&mut nodes[0], &funding_tx);
2138		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, node_1_id);
2139		confirm_transaction(&mut nodes[1], &funding_tx);
2140		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, node_0_id);
2141		nodes[0].node.handle_channel_ready(node_1_id, &bs_funding);
2142		let _as_channel_update =
2143			get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, node_1_id);
2144		nodes[1].node.handle_channel_ready(node_0_id, &as_funding);
2145		let _bs_channel_update =
2146			get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, node_0_id);
2147		let broadcast_funding =
2148			nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
2149		assert_eq!(broadcast_funding.compute_txid(), funding_tx.compute_txid());
2150		assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());
2151
2152		if !std::thread::panicking() {
2153			bg_processor.stop().unwrap();
2154		}
2155
2156		// Set up a background event handler for SpendableOutputs events.
2157		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
2158		let event_handler = move |event: Event| {
2159			match event {
2160				Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
2161				Event::ChannelReady { .. } => {},
2162				Event::ChannelClosed { .. } => {},
2163				_ => panic!("Unexpected event: {:?}", event),
2164			}
2165			Ok(())
2166		};
2167		let persister = Arc::new(Persister::new(data_dir));
2168		let bg_processor = BackgroundProcessor::start(
2169			persister,
2170			event_handler,
2171			nodes[0].chain_monitor.clone(),
2172			nodes[0].node.clone(),
2173			Some(nodes[0].messenger.clone()),
2174			nodes[0].no_gossip_sync(),
2175			nodes[0].peer_manager.clone(),
2176			nodes[0].logger.clone(),
2177			Some(nodes[0].scorer.clone()),
2178		);
2179
2180		// Force close the channel and check that the SpendableOutputs event was handled.
2181		let error_message = "Channel force-closed";
2182		nodes[0]
2183			.node
2184			.force_close_broadcasting_latest_txn(
2185				&nodes[0].node.list_channels()[0].channel_id,
2186				&node_1_id,
2187				error_message.to_string(),
2188			)
2189			.unwrap();
2190		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
2191		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
2192
2193		let event = receiver
2194			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
2195			.expect("Events not handled within deadline");
2196		match event {
2197			Event::SpendableOutputs { outputs, channel_id } => {
2198				nodes[0]
2199					.sweeper
2200					.track_spendable_outputs(outputs, channel_id, false, Some(153))
2201					.unwrap();
2202			},
2203			_ => panic!("Unexpected event: {:?}", event),
2204		}
2205
2206		// Check we don't generate an initial sweeping tx until we reach the required height.
2207		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2208		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2209		if let Some(sweep_tx_0) = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop() {
2210			assert!(!tracked_output.is_spent_in(&sweep_tx_0));
2211			match tracked_output.status {
2212				OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => {
2213					assert_eq!(delayed_until_height, Some(153));
2214				},
2215				_ => panic!("Unexpected status"),
2216			}
2217		}
2218
2219		advance_chain(&mut nodes[0], 3);
2220
2221		// Check we generate an initial sweeping tx.
2222		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2223		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2224		let sweep_tx_0 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
2225		match tracked_output.status {
2226			OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
2227				assert_eq!(sweep_tx_0.compute_txid(), latest_spending_tx.compute_txid());
2228			},
2229			_ => panic!("Unexpected status"),
2230		}
2231
2232		// Check we regenerate and rebroadcast the sweeping tx each block.
2233		advance_chain(&mut nodes[0], 1);
2234		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2235		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2236		let sweep_tx_1 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
2237		match tracked_output.status {
2238			OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
2239				assert_eq!(sweep_tx_1.compute_txid(), latest_spending_tx.compute_txid());
2240			},
2241			_ => panic!("Unexpected status"),
2242		}
2243		assert_ne!(sweep_tx_0, sweep_tx_1);
2244
2245		advance_chain(&mut nodes[0], 1);
2246		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2247		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2248		let sweep_tx_2 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
2249		match tracked_output.status {
2250			OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
2251				assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
2252			},
2253			_ => panic!("Unexpected status"),
2254		}
2255		assert_ne!(sweep_tx_0, sweep_tx_2);
2256		assert_ne!(sweep_tx_1, sweep_tx_2);
2257
2258		// Check we still track the spendable outputs up to ANTI_REORG_DELAY confirmations.
2259		confirm_transaction_depth(&mut nodes[0], &sweep_tx_2, 5);
2260		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2261		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2262		match tracked_output.status {
2263			OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
2264				assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
2265			},
2266			_ => panic!("Unexpected status"),
2267		}
2268
2269		// Check we still see the transaction as confirmed if we unconfirm any untracked
2270		// transaction. (We previously had a bug that marked tracked transactions as unconfirmed
2271		// whenever any transaction at an unknown block height was unconfirmed.)
2272		let unconf_txid = Txid::from_slice(&[0; 32]).unwrap();
2273		nodes[0].sweeper.transaction_unconfirmed(&unconf_txid);
2274
2275		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2276		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2277		match tracked_output.status {
2278			OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
2279				assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
2280			},
2281			_ => panic!("Unexpected status"),
2282		}
2283
2284		// Check we stop tracking the spendable outputs when one of the txs reaches
2285		// ANTI_REORG_DELAY confirmations.
2286		confirm_transaction_depth(&mut nodes[0], &sweep_tx_0, ANTI_REORG_DELAY);
2287		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 0);
2288
2289		if !std::thread::panicking() {
2290			bg_processor.stop().unwrap();
2291		}
2292	}
2293
2294	#[test]
2295	fn test_event_handling_failures_are_replayed() {
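		// Test that an event handler returning `Err(ReplayEvent())` causes the same event to be
		// replayed to the handler until it succeeds.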
2296		let (_, nodes) = create_nodes(2, "test_event_handling_failures_are_replayed");
2297		let channel_value = 100000;
2298		let data_dir = nodes[0].kv_store.get_data_dir();
2299		let persister = Arc::new(Persister::new(data_dir.clone()));
2300
2301		let (first_event_send, first_event_recv) = std::sync::mpsc::sync_channel(1);
2302		let (second_event_send, second_event_recv) = std::sync::mpsc::sync_channel(1);
2303		let should_fail_event_handling = Arc::new(AtomicBool::new(true));
2304		let event_handler = move |event: Event| {
2305			if let Ok(true) = should_fail_event_handling.compare_exchange(
2306				true,
2307				false,
2308				Ordering::Acquire,
2309				Ordering::Relaxed,
2310			) {
2311				first_event_send.send(event).unwrap();
2312				return Err(ReplayEvent());
2313			}
2314
2315			second_event_send.send(event).unwrap();
2316			Ok(())
2317		};
2318
2319		let bg_processor = BackgroundProcessor::start(
2320			persister,
2321			event_handler,
2322			nodes[0].chain_monitor.clone(),
2323			nodes[0].node.clone(),
2324			Some(nodes[0].messenger.clone()),
2325			nodes[0].no_gossip_sync(),
2326			nodes[0].peer_manager.clone(),
2327			nodes[0].logger.clone(),
2328			Some(nodes[0].scorer.clone()),
2329		);
2330
2331		begin_open_channel!(nodes[0], nodes[1], channel_value);
2332		assert_eq!(
2333			first_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)).unwrap(),
2334			second_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)).unwrap()
2335		);
2336
2337		if !std::thread::panicking() {
2338			bg_processor.stop().unwrap();
2339		}
2340	}
2341
2342	#[test]
2343	fn test_scorer_persistence() {
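		// Test that the background processor calls `time_passed` on the scorer and persists it on
		// its regular timer, even without any payment events.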
2344		let (_, nodes) = create_nodes(2, "test_scorer_persistence");
2345		let data_dir = nodes[0].kv_store.get_data_dir();
2346		let persister = Arc::new(Persister::new(data_dir));
2347		let event_handler = |_: _| Ok(());
2348		let bg_processor = BackgroundProcessor::start(
2349			persister,
2350			event_handler,
2351			nodes[0].chain_monitor.clone(),
2352			nodes[0].node.clone(),
2353			Some(nodes[0].messenger.clone()),
2354			nodes[0].no_gossip_sync(),
2355			nodes[0].peer_manager.clone(),
2356			nodes[0].logger.clone(),
2357			Some(nodes[0].scorer.clone()),
2358		);
2359
2360		loop {
2361			let log_entries = nodes[0].logger.lines.lock().unwrap();
2362			let expected_log = "Calling time_passed and persisting scorer".to_string();
2363			if log_entries.get(&("lightning_background_processor", expected_log)).is_some() {
2364				break;
2365			}
2366		}
2367
2368		if !std::thread::panicking() {
2369			bg_processor.stop().unwrap();
2370		}
2371	}
2372
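	// Shared body of the sync and async tests below: seeds the graph with a stale partial
	// announcement, waits for the background processor to loop a couple of times, applies a rapid
	// gossip sync update, and checks that stale channels are only pruned once the sync completes.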
2373	macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
2374		($nodes: expr, $receive: expr, $sleep: expr) => {
2375			let features = ChannelFeatures::empty();
2376			$nodes[0]
2377				.network_graph
2378				.add_channel_from_partial_announcement(
2379					42,
2380					53,
2381					features,
2382					$nodes[0].node.get_our_node_id(),
2383					$nodes[1].node.get_our_node_id(),
2384				)
2385				.expect("Failed to update channel from partial announcement");
2386			let original_graph_description = $nodes[0].network_graph.to_string();
2387			assert!(original_graph_description.contains("42: features: 0000, node_one:"));
2388			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);
2389
2390			loop {
2391				$sleep;
2392				let log_entries = $nodes[0].logger.lines.lock().unwrap();
2393				let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
2394				if *log_entries.get(&("lightning_background_processor", loop_counter)).unwrap_or(&0)
2395					> 1
2396				{
2397					// Wait until the loop has gone around at least twice.
2398					break;
2399				}
2400			}
2401
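			// A canned rapid gossip sync snapshot (timestamped 1642291930) announcing two channels,
			// which marks the sync as complete and lets the stale announcement above be pruned.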
2402			let initialization_input = vec![
2403				76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99,
2404				247, 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227,
2405				98, 218, 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61,
2406				250, 251, 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6,
2407				67, 2, 36, 125, 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47,
2408				115, 172, 63, 136, 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158,
2409				1, 242, 121, 152, 106, 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95,
2410				65, 3, 83, 185, 58, 138, 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136,
2411				149, 185, 226, 156, 137, 175, 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219,
2412				175, 168, 77, 4, 143, 38, 128, 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2,
2413				27, 0, 0, 0, 1, 0, 0, 255, 2, 68, 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0,
2414				0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232, 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255,
2415				8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0, 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6,
2416				11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
2417			];
2418			$nodes[0]
2419				.rapid_gossip_sync
2420				.update_network_graph_no_std(&initialization_input[..], Some(1642291930))
2421				.unwrap();
2422
2423			// This should have added two channels and pruned the previous one.
2424			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);
2425
2426			$receive.expect("Network graph not pruned within deadline");
2427
2428			// All channels should now be pruned.
2429			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
2430		};
2431	}
2432
2433	#[test]
2434	fn test_not_pruning_network_graph_until_graph_sync_completion() {
2435		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
2436
2437		let (_, nodes) =
2438			create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");
2439		let data_dir = nodes[0].kv_store.get_data_dir();
2440		let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
2441
2442		let event_handler = |_: _| Ok(());
2443		let background_processor = BackgroundProcessor::start(
2444			persister,
2445			event_handler,
2446			nodes[0].chain_monitor.clone(),
2447			nodes[0].node.clone(),
2448			Some(nodes[0].messenger.clone()),
2449			nodes[0].rapid_gossip_sync(),
2450			nodes[0].peer_manager.clone(),
2451			nodes[0].logger.clone(),
2452			Some(nodes[0].scorer.clone()),
2453		);
2454
2455		do_test_not_pruning_network_graph_until_graph_sync_completion!(
2456			nodes,
2457			receiver.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5)),
2458			std::thread::sleep(Duration::from_millis(1))
2459		);
2460
2461		background_processor.stop().unwrap();
2462	}
2463
2464	#[tokio::test]
2465	#[cfg(feature = "futures")]
2466	async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
2467		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
2468
2469		let (_, nodes) =
2470			create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
2471		let data_dir = nodes[0].kv_store.get_data_dir();
2472		let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
2473
2474		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
2475		let bp_future = super::process_events_async(
2476			persister,
2477			|_: _| async { Ok(()) },
2478			nodes[0].chain_monitor.clone(),
2479			nodes[0].node.clone(),
2480			Some(nodes[0].messenger.clone()),
2481			nodes[0].rapid_gossip_sync(),
2482			nodes[0].peer_manager.clone(),
2483			nodes[0].logger.clone(),
2484			Some(nodes[0].scorer.clone()),
2485			move |dur: Duration| {
2486				let mut exit_receiver = exit_receiver.clone();
2487				Box::pin(async move {
2488					tokio::select! {
2489						_ = tokio::time::sleep(dur) => false,
2490						_ = exit_receiver.changed() => true,
2491					}
2492				})
2493			},
2494			false,
2495			|| Some(Duration::from_secs(1696300000)),
2496		);
2497
2498		let t1 = tokio::spawn(bp_future);
2499		let t2 = tokio::spawn(async move {
2500			do_test_not_pruning_network_graph_until_graph_sync_completion!(
2501				nodes,
2502				{
2503					let mut i = 0;
2504					loop {
2505						tokio::time::sleep(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER))
2506							.await;
2507						if let Ok(()) = receiver.try_recv() {
2508							break Ok::<(), ()>(());
2509						}
2510						assert!(i < 5);
2511						i += 1;
2512					}
2513				},
2514				tokio::time::sleep(Duration::from_millis(1)).await
2515			);
2516			exit_sender.send(()).unwrap();
2517		});
2518		let (r1, r2) = tokio::join!(t1, t2);
2519		r1.unwrap().unwrap();
2520		r2.unwrap()
2521	}
2522
2523	macro_rules! do_test_payment_path_scoring {
2524		($nodes: expr, $receive: expr) => {
2525			// Ensure that we update the scorer when relevant payment and probe events are processed
2526			// (note that the channel must be public or else we won't score it). The event handler
2527			// hooked up to the background processor must send each event back to us via `$receive`.
2530			let scored_scid = 4242;
2531			let secp_ctx = Secp256k1::new();
2532			let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
2533			let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
2534
2535			let path = Path { hops: vec![RouteHop {
2536				pubkey: node_1_id,
2537				node_features: NodeFeatures::empty(),
2538				short_channel_id: scored_scid,
2539				channel_features: ChannelFeatures::empty(),
2540				fee_msat: 0,
2541				cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
2542				maybe_announced_channel: true,
2543			}], blinded_tail: None };
2544
2545			$nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
2546			$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
2547				payment_id: None,
2548				payment_hash: PaymentHash([42; 32]),
2549				payment_failed_permanently: false,
2550				failure: PathFailure::OnPath { network_update: None },
2551				path: path.clone(),
2552				short_channel_id: Some(scored_scid),
2553			});
2554			let event = $receive.expect("PaymentPathFailed not handled within deadline");
2555			match event {
2556				Event::PaymentPathFailed { .. } => {},
2557				_ => panic!("Unexpected event"),
2558			}
2559
2560			// Ensure we'll score payments that were explicitly failed back by the destination as
2561			// ProbeSuccess, since such a failure proves the path reached the destination.
2562			$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
2563			$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
2564				payment_id: None,
2565				payment_hash: PaymentHash([42; 32]),
2566				payment_failed_permanently: true,
2567				failure: PathFailure::OnPath { network_update: None },
2568				path: path.clone(),
2569				short_channel_id: None,
2570			});
2571			let event = $receive.expect("PaymentPathFailed not handled within deadline");
2572			match event {
2573				Event::PaymentPathFailed { .. } => {},
2574				_ => panic!("Unexpected event"),
2575			}
2576
2577			$nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() });
2578			$nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
2579				payment_id: PaymentId([42; 32]),
2580				payment_hash: None,
2581				path: path.clone(),
2582			});
2583			let event = $receive.expect("PaymentPathSuccessful not handled within deadline");
2584			match event {
2585				Event::PaymentPathSuccessful { .. } => {},
2586				_ => panic!("Unexpected event"),
2587			}
2588
2589			$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
2590			$nodes[0].node.push_pending_event(Event::ProbeSuccessful {
2591				payment_id: PaymentId([42; 32]),
2592				payment_hash: PaymentHash([42; 32]),
2593				path: path.clone(),
2594			});
2595			let event = $receive.expect("ProbeSuccessful not handled within deadline");
2596			match event {
2597				Event::ProbeSuccessful { .. } => {},
2598				_ => panic!("Unexpected event"),
2599			}
2600
2601			$nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() });
2602			$nodes[0].node.push_pending_event(Event::ProbeFailed {
2603				payment_id: PaymentId([42; 32]),
2604				payment_hash: PaymentHash([42; 32]),
2605				path,
2606				short_channel_id: Some(scored_scid),
2607			});
2608			let event = $receive.expect("ProbeFailure not handled within deadline");
2609			match event {
2610				Event::ProbeFailed { .. } => {},
2611				_ => panic!("Unexpected event"),
2612			}
2613		}
2614	}
2615
2616	#[test]
2617	fn test_payment_path_scoring() {
2618		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
2619		let event_handler = move |event: Event| {
2620			match event {
2621				Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
2622				Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
2623				Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
2624				Event::ProbeFailed { .. } => sender.send(event).unwrap(),
2625				_ => panic!("Unexpected event: {:?}", event),
2626			}
2627			Ok(())
2628		};
2629
2630		let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
2631		let data_dir = nodes[0].kv_store.get_data_dir();
2632		let persister = Arc::new(Persister::new(data_dir));
2633		let bg_processor = BackgroundProcessor::start(
2634			persister,
2635			event_handler,
2636			nodes[0].chain_monitor.clone(),
2637			nodes[0].node.clone(),
2638			Some(nodes[0].messenger.clone()),
2639			nodes[0].no_gossip_sync(),
2640			nodes[0].peer_manager.clone(),
2641			nodes[0].logger.clone(),
2642			Some(nodes[0].scorer.clone()),
2643		);
2644
2645		do_test_payment_path_scoring!(
2646			nodes,
2647			receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
2648		);
2649
2650		if !std::thread::panicking() {
2651			bg_processor.stop().unwrap();
2652		}
2653
2654		let log_entries = nodes[0].logger.lines.lock().unwrap();
2655		let expected_log = "Persisting scorer after update".to_string();
2656		assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5);
2657	}
2658
2659	#[tokio::test]
2660	#[cfg(feature = "futures")]
2661	async fn test_payment_path_scoring_async() {
2662		let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
2663		let event_handler = move |event: Event| {
2664			let sender_ref = sender.clone();
2665			async move {
2666				match event {
2667					Event::PaymentPathFailed { .. } => sender_ref.send(event).await.unwrap(),
2668					Event::PaymentPathSuccessful { .. } => sender_ref.send(event).await.unwrap(),
2669					Event::ProbeSuccessful { .. } => sender_ref.send(event).await.unwrap(),
2670					Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
2671					_ => panic!("Unexpected event: {:?}", event),
2672				}
2673				Ok(())
2674			}
2675		};
2676
2677		let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
2678		let data_dir = nodes[0].kv_store.get_data_dir();
2679		let persister = Arc::new(Persister::new(data_dir));
2680
2681		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
2682
2683		let bp_future = super::process_events_async(
2684			persister,
2685			event_handler,
2686			nodes[0].chain_monitor.clone(),
2687			nodes[0].node.clone(),
2688			Some(nodes[0].messenger.clone()),
2689			nodes[0].no_gossip_sync(),
2690			nodes[0].peer_manager.clone(),
2691			nodes[0].logger.clone(),
2692			Some(nodes[0].scorer.clone()),
2693			move |dur: Duration| {
2694				let mut exit_receiver = exit_receiver.clone();
2695				Box::pin(async move {
2696					tokio::select! {
2697						_ = tokio::time::sleep(dur) => false,
2698						_ = exit_receiver.changed() => true,
2699					}
2700				})
2701			},
2702			false,
2703			|| Some(Duration::ZERO),
2704		);
2705		let t1 = tokio::spawn(bp_future);
2706		let t2 = tokio::spawn(async move {
2707			do_test_payment_path_scoring!(nodes, receiver.recv().await);
2708			exit_sender.send(()).unwrap();
2709
2710			let log_entries = nodes[0].logger.lines.lock().unwrap();
2711			let expected_log = "Persisting scorer after update".to_string();
2712			assert_eq!(
2713				*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(),
2714				5
2715			);
2716		});
2717
2718		let (r1, r2) = tokio::join!(t1, t2);
2719		r1.unwrap().unwrap();
2720		r2.unwrap()
2721	}
2722}