Skip to main content

lightning_background_processor/
lib.rs

1// This file is Copyright its original authors, visible in version control
2// history.
3//
4// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7// You may not use this file except in accordance with one or both of these
8// licenses.
9
10//! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
11//! running properly, and (2) either can or should be run in the background.
12#![cfg_attr(feature = "std", doc = "See docs for [`BackgroundProcessor`] for more details.")]
13#![deny(rustdoc::broken_intra_doc_links)]
14#![deny(rustdoc::private_intra_doc_links)]
15#![deny(missing_docs)]
16#![cfg_attr(docsrs, feature(doc_cfg))]
17#![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
18
19#[cfg(any(test, feature = "std"))]
20extern crate core;
21
22#[cfg(not(feature = "std"))]
23extern crate alloc;
24
25#[macro_use]
26extern crate lightning;
27extern crate lightning_rapid_gossip_sync;
28
29mod fwd_batch;
30
31use fwd_batch::BatchDelay;
32
33use lightning::chain;
34use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
35use lightning::chain::chainmonitor::{ChainMonitor, Persist};
36#[cfg(feature = "std")]
37use lightning::events::EventHandler;
38#[cfg(feature = "std")]
39use lightning::events::EventsProvider;
40use lightning::events::ReplayEvent;
41use lightning::events::{Event, PathFailure};
42use lightning::util::ser::Writeable;
43
44use lightning::ln::channelmanager::AChannelManager;
45use lightning::ln::msgs::OnionMessageHandler;
46use lightning::ln::peer_handler::APeerManager;
47use lightning::onion_message::messenger::AOnionMessenger;
48use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
49use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
50use lightning::routing::utxo::UtxoLookup;
51use lightning::sign::{
52	ChangeDestinationSource, ChangeDestinationSourceSync, EntropySource, OutputSpender,
53};
54use lightning::util::logger::Logger;
55use lightning::util::persist::{
56	KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
57	CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
58	NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
59	NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
60	SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
61};
62use lightning::util::sweep::{OutputSweeper, OutputSweeperSync};
63#[cfg(feature = "std")]
64use lightning::util::wakers::Sleeper;
65use lightning_rapid_gossip_sync::RapidGossipSync;
66
67use lightning_liquidity::ALiquidityManager;
68#[cfg(feature = "std")]
69use lightning_liquidity::ALiquidityManagerSync;
70
71use core::ops::Deref;
72use core::time::Duration;
73
74#[cfg(feature = "std")]
75use core::sync::atomic::{AtomicBool, Ordering};
76#[cfg(feature = "std")]
77use std::sync::Arc;
78#[cfg(feature = "std")]
79use std::thread::{self, JoinHandle};
80#[cfg(feature = "std")]
81use std::time::Instant;
82
83#[cfg(not(feature = "std"))]
84use alloc::boxed::Box;
85#[cfg(all(not(c_bindings), not(feature = "std")))]
86use alloc::sync::Arc;
87
88/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
89/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
90/// responsibilities are:
91/// * Processing [`Event`]s with a user-provided [`EventHandler`].
92/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
93///   writing it to disk/backups by invoking the callback given to it at startup.
94///   [`ChannelManager`] persistence should be done in the background.
95/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
96///   and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
97/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
98///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
99///
100/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
101/// upon as doing so may result in high latency.
102///
103/// # Note
104///
105/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
106/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
107/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
108/// unilateral chain closure fees are at risk.
109///
110/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
111/// [`ChannelManager::timer_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::timer_tick_occurred
112/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
113/// [`Event`]: lightning::events::Event
114/// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
115/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
116#[cfg(feature = "std")]
117#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
118pub struct BackgroundProcessor {
119	stop_thread: Arc<AtomicBool>,
120	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
121}
122
123#[cfg(not(test))]
124const FRESHNESS_TIMER: Duration = Duration::from_secs(60);
125#[cfg(test)]
126const FRESHNESS_TIMER: Duration = Duration::from_secs(1);
127
128#[cfg(all(not(test), not(debug_assertions)))]
129const PING_TIMER: Duration = Duration::from_secs(10);
130/// Signature operations take a lot longer without compiler optimisations.
131/// Increasing the ping timer allows for this but slower devices will be disconnected if the
132/// timeout is reached.
133#[cfg(all(not(test), debug_assertions))]
134const PING_TIMER: Duration = Duration::from_secs(30);
135#[cfg(test)]
136const PING_TIMER: Duration = Duration::from_secs(1);
137
138#[cfg(not(test))]
139const ONION_MESSAGE_HANDLER_TIMER: Duration = Duration::from_secs(10);
140#[cfg(test)]
141const ONION_MESSAGE_HANDLER_TIMER: Duration = Duration::from_secs(1);
142
143/// Prune the network graph of stale entries hourly.
144const NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(60 * 60);
145
146#[cfg(not(test))]
147const SCORER_PERSIST_TIMER: Duration = Duration::from_secs(60 * 5);
148#[cfg(test)]
149const SCORER_PERSIST_TIMER: Duration = Duration::from_secs(1);
150
151#[cfg(not(test))]
152const FIRST_NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(60);
153#[cfg(test)]
154const FIRST_NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(1);
155
156#[cfg(not(test))]
157const REBROADCAST_TIMER: Duration = Duration::from_secs(30);
158#[cfg(test)]
159const REBROADCAST_TIMER: Duration = Duration::from_secs(1);
160
161#[cfg(not(test))]
162const SWEEPER_TIMER: Duration = Duration::from_secs(30);
163#[cfg(test)]
164const SWEEPER_TIMER: Duration = Duration::from_secs(1);
165
166/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
167const fn min_duration(a: Duration, b: Duration) -> Duration {
168	if a.as_nanos() < b.as_nanos() {
169		a
170	} else {
171		b
172	}
173}
174const FASTEST_TIMER: Duration = min_duration(
175	min_duration(FRESHNESS_TIMER, PING_TIMER),
176	min_duration(SCORER_PERSIST_TIMER, min_duration(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)),
177);
178
179/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
180pub enum GossipSync<
181	P: Deref<Target = P2PGossipSync<G, U, L>>,
182	R: Deref<Target = RapidGossipSync<G, L>>,
183	G: Deref<Target = NetworkGraph<L>>,
184	U: Deref,
185	L: Deref,
186> where
187	U::Target: UtxoLookup,
188	L::Target: Logger,
189{
190	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
191	P2P(P),
192	/// Rapid gossip sync from a trusted server.
193	Rapid(R),
194	/// No gossip sync.
195	None,
196}
197
198impl<
199		P: Deref<Target = P2PGossipSync<G, U, L>>,
200		R: Deref<Target = RapidGossipSync<G, L>>,
201		G: Deref<Target = NetworkGraph<L>>,
202		U: Deref,
203		L: Deref,
204	> GossipSync<P, R, G, U, L>
205where
206	U::Target: UtxoLookup,
207	L::Target: Logger,
208{
209	fn network_graph(&self) -> Option<&G> {
210		match self {
211			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
212			GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
213			GossipSync::None => None,
214		}
215	}
216
217	fn prunable_network_graph(&self) -> Option<&G> {
218		match self {
219			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
220			GossipSync::Rapid(gossip_sync) => {
221				if gossip_sync.is_initial_sync_complete() {
222					Some(gossip_sync.network_graph())
223				} else {
224					None
225				}
226			},
227			GossipSync::None => None,
228		}
229	}
230}
231
232/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
233impl<
234		P: Deref<Target = P2PGossipSync<G, U, L>>,
235		G: Deref<Target = NetworkGraph<L>>,
236		U: Deref,
237		L: Deref,
238	> GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
239where
240	U::Target: UtxoLookup,
241	L::Target: Logger,
242{
243	/// Initializes a new [`GossipSync::P2P`] variant.
244	pub fn p2p(gossip_sync: P) -> Self {
245		GossipSync::P2P(gossip_sync)
246	}
247}
248
249/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
250impl<
251		'a,
252		R: Deref<Target = RapidGossipSync<G, L>>,
253		G: Deref<Target = NetworkGraph<L>>,
254		L: Deref,
255	>
256	GossipSync<
257		&P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
258		R,
259		G,
260		&'a (dyn UtxoLookup + Send + Sync),
261		L,
262	> where
263	L::Target: Logger,
264{
265	/// Initializes a new [`GossipSync::Rapid`] variant.
266	pub fn rapid(gossip_sync: R) -> Self {
267		GossipSync::Rapid(gossip_sync)
268	}
269}
270
271/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
272impl<'a, L: Deref>
273	GossipSync<
274		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
275		&RapidGossipSync<&'a NetworkGraph<L>, L>,
276		&'a NetworkGraph<L>,
277		&'a (dyn UtxoLookup + Send + Sync),
278		L,
279	> where
280	L::Target: Logger,
281{
282	/// Initializes a new [`GossipSync::None`] variant.
283	pub fn none() -> Self {
284		GossipSync::None
285	}
286}
287
288fn handle_network_graph_update<L: Deref>(network_graph: &NetworkGraph<L>, event: &Event)
289where
290	L::Target: Logger,
291{
292	if let Event::PaymentPathFailed {
293		failure: PathFailure::OnPath { network_update: Some(ref upd) },
294		..
295	} = event
296	{
297		network_graph.handle_network_update(upd);
298	}
299}
300
301/// Updates scorer based on event and returns whether an update occurred so we can decide whether
302/// to persist.
303fn update_scorer<'a, S: Deref<Target = SC>, SC: 'a + WriteableScore<'a>>(
304	scorer: &'a S, event: &Event, duration_since_epoch: Duration,
305) -> bool {
306	match event {
307		Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
308			let mut score = scorer.write_lock();
309			score.payment_path_failed(path, *scid, duration_since_epoch);
310		},
311		Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
312			// Reached if the destination explicitly failed it back. We treat this as a successful probe
313			// because the payment made it all the way to the destination with sufficient liquidity.
314			let mut score = scorer.write_lock();
315			score.probe_successful(path, duration_since_epoch);
316		},
317		Event::PaymentPathSuccessful { path, .. } => {
318			let mut score = scorer.write_lock();
319			score.payment_path_successful(path, duration_since_epoch);
320		},
321		Event::ProbeSuccessful { path, .. } => {
322			let mut score = scorer.write_lock();
323			score.probe_successful(path, duration_since_epoch);
324		},
325		Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
326			let mut score = scorer.write_lock();
327			score.probe_failed(path, *scid, duration_since_epoch);
328		},
329		_ => return false,
330	}
331	true
332}
333
334#[cfg(all(not(c_bindings), feature = "std"))]
335type ScorerWrapper<T> = std::sync::RwLock<T>;
336
337#[cfg(all(not(c_bindings), not(feature = "std")))]
338type ScorerWrapper<T> = core::cell::RefCell<T>;
339
340#[cfg(not(c_bindings))]
341type DynRouter = lightning::routing::router::DefaultRouter<
342	&'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
343	&'static (dyn Logger + Send + Sync),
344	&'static (dyn EntropySource + Send + Sync),
345	&'static ScorerWrapper<
346		lightning::routing::scoring::ProbabilisticScorer<
347			&'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
348			&'static (dyn Logger + Send + Sync),
349		>,
350	>,
351	lightning::routing::scoring::ProbabilisticScoringFeeParameters,
352	lightning::routing::scoring::ProbabilisticScorer<
353		&'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
354		&'static (dyn Logger + Send + Sync),
355	>,
356>;
357
358#[cfg(not(c_bindings))]
359type DynMessageRouter = lightning::onion_message::messenger::DefaultMessageRouter<
360	&'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
361	&'static (dyn Logger + Send + Sync),
362	&'static (dyn EntropySource + Send + Sync),
363>;
364
365#[cfg(all(not(c_bindings), not(taproot)))]
366type DynSignerProvider = dyn lightning::sign::SignerProvider<EcdsaSigner = lightning::sign::InMemorySigner>
367	+ Send
368	+ Sync;
369
370#[cfg(all(not(c_bindings), taproot))]
371type DynSignerProvider = (dyn lightning::sign::SignerProvider<
372	EcdsaSigner = lightning::sign::InMemorySigner,
373	TaprootSigner = lightning::sign::InMemorySigner,
374> + Send
375     + Sync);
376
377#[cfg(not(c_bindings))]
378type DynChannelManager = lightning::ln::channelmanager::ChannelManager<
379	&'static (dyn chain::Watch<lightning::sign::InMemorySigner> + Send + Sync),
380	&'static (dyn BroadcasterInterface + Send + Sync),
381	&'static (dyn EntropySource + Send + Sync),
382	&'static (dyn lightning::sign::NodeSigner + Send + Sync),
383	&'static DynSignerProvider,
384	&'static (dyn FeeEstimator + Send + Sync),
385	&'static DynRouter,
386	&'static DynMessageRouter,
387	&'static (dyn Logger + Send + Sync),
388>;
389
390/// When initializing a background processor without an onion messenger, this can be used to avoid
391/// specifying a concrete `OnionMessenger` type.
392#[cfg(not(c_bindings))]
393pub const NO_ONION_MESSENGER: Option<
394	Arc<
395		dyn AOnionMessenger<
396				EntropySource = dyn EntropySource + Send + Sync,
397				ES = &(dyn EntropySource + Send + Sync),
398				NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
399				NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
400				Logger = dyn Logger + Send + Sync,
401				L = &'static (dyn Logger + Send + Sync),
402				NodeIdLookUp = DynChannelManager,
403				NL = &'static DynChannelManager,
404				MessageRouter = DynMessageRouter,
405				MR = &'static DynMessageRouter,
406				OffersMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
407				OMH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
408				AsyncPaymentsMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
409				APH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
410				DNSResolverMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
411				DRH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
412				CustomOnionMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
413				CMH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
414			> + Send
415			+ Sync,
416	>,
417> = None;
418
419/// When initializing a background processor without a liquidity manager, this can be used to avoid
420/// specifying a concrete `LiquidityManager` type.
421#[cfg(not(c_bindings))]
422pub const NO_LIQUIDITY_MANAGER: Option<
423	Arc<
424		dyn ALiquidityManager<
425				EntropySource = dyn EntropySource + Send + Sync,
426				ES = &(dyn EntropySource + Send + Sync),
427				NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
428				NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
429				AChannelManager = DynChannelManager,
430				CM = &DynChannelManager,
431				Filter = dyn chain::Filter + Send + Sync,
432				C = &(dyn chain::Filter + Send + Sync),
433				KVStore = dyn lightning::util::persist::KVStore + Send + Sync,
434				K = &(dyn lightning::util::persist::KVStore + Send + Sync),
435				TimeProvider = dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync,
436				TP = &(dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync),
437				BroadcasterInterface = dyn lightning::chain::chaininterface::BroadcasterInterface
438				                           + Send
439				                           + Sync,
440				T = &(dyn BroadcasterInterface + Send + Sync),
441			> + Send
442			+ Sync,
443	>,
444> = None;
445
446/// When initializing a background processor without a liquidity manager, this can be used to avoid
447/// specifying a concrete `LiquidityManagerSync` type.
448#[cfg(all(not(c_bindings), feature = "std"))]
449pub const NO_LIQUIDITY_MANAGER_SYNC: Option<
450	Arc<
451		dyn ALiquidityManagerSync<
452				EntropySource = dyn EntropySource + Send + Sync,
453				ES = &(dyn EntropySource + Send + Sync),
454				NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
455				NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
456				AChannelManager = DynChannelManager,
457				CM = &DynChannelManager,
458				Filter = dyn chain::Filter + Send + Sync,
459				C = &(dyn chain::Filter + Send + Sync),
460				KVStoreSync = dyn lightning::util::persist::KVStoreSync + Send + Sync,
461				KS = &(dyn lightning::util::persist::KVStoreSync + Send + Sync),
462				TimeProvider = dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync,
463				TP = &(dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync),
464				BroadcasterInterface = dyn lightning::chain::chaininterface::BroadcasterInterface
465				                           + Send
466				                           + Sync,
467				T = &(dyn BroadcasterInterface + Send + Sync),
468			> + Send
469			+ Sync,
470	>,
471> = None;
472
473pub(crate) mod futures_util {
474	use core::future::Future;
475	use core::marker::Unpin;
476	use core::pin::Pin;
477	use core::task::{Poll, RawWaker, RawWakerVTable, Waker};
478	pub(crate) struct Selector<
479		A: Future<Output = bool> + Unpin,
480		B: Future<Output = ()> + Unpin,
481		C: Future<Output = ()> + Unpin,
482		D: Future<Output = ()> + Unpin,
483		E: Future<Output = ()> + Unpin,
484	> {
485		pub a: A,
486		pub b: B,
487		pub c: C,
488		pub d: D,
489		pub e: E,
490	}
491
492	pub(crate) enum SelectorOutput {
493		A(bool),
494		B,
495		C,
496		D,
497		E,
498	}
499
500	impl<
501			A: Future<Output = bool> + Unpin,
502			B: Future<Output = ()> + Unpin,
503			C: Future<Output = ()> + Unpin,
504			D: Future<Output = ()> + Unpin,
505			E: Future<Output = ()> + Unpin,
506		> Future for Selector<A, B, C, D, E>
507	{
508		type Output = SelectorOutput;
509		fn poll(
510			mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>,
511		) -> Poll<SelectorOutput> {
512			// Bias the selector so it first polls the sleeper future, allowing to exit immediately
513			// if the flag is set.
514			match Pin::new(&mut self.a).poll(ctx) {
515				Poll::Ready(res) => {
516					return Poll::Ready(SelectorOutput::A(res));
517				},
518				Poll::Pending => {},
519			}
520			match Pin::new(&mut self.b).poll(ctx) {
521				Poll::Ready(()) => {
522					return Poll::Ready(SelectorOutput::B);
523				},
524				Poll::Pending => {},
525			}
526			match Pin::new(&mut self.c).poll(ctx) {
527				Poll::Ready(()) => {
528					return Poll::Ready(SelectorOutput::C);
529				},
530				Poll::Pending => {},
531			}
532			match Pin::new(&mut self.d).poll(ctx) {
533				Poll::Ready(()) => {
534					return Poll::Ready(SelectorOutput::D);
535				},
536				Poll::Pending => {},
537			}
538			match Pin::new(&mut self.e).poll(ctx) {
539				Poll::Ready(()) => {
540					return Poll::Ready(SelectorOutput::E);
541				},
542				Poll::Pending => {},
543			}
544			Poll::Pending
545		}
546	}
547
548	/// A selector that takes a future wrapped in an option that will be polled if it is `Some` and
549	/// will always be pending otherwise.
550	pub(crate) struct OptionalSelector<F: Future<Output = ()> + Unpin> {
551		pub optional_future: Option<F>,
552	}
553
554	impl<F: Future<Output = ()> + Unpin> Future for OptionalSelector<F> {
555		type Output = ();
556		fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
557			match self.optional_future.as_mut() {
558				Some(f) => match Pin::new(f).poll(ctx) {
559					Poll::Ready(()) => {
560						self.optional_future.take();
561						Poll::Ready(())
562					},
563					Poll::Pending => Poll::Pending,
564				},
565				None => Poll::Pending,
566			}
567		}
568	}
569
570	// If we want to poll a future without an async context to figure out if it has completed or
571	// not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
572	// but sadly there's a good bit of boilerplate here.
573	fn dummy_waker_clone(_: *const ()) -> RawWaker {
574		RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)
575	}
576	fn dummy_waker_action(_: *const ()) {}
577
578	const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
579		dummy_waker_clone,
580		dummy_waker_action,
581		dummy_waker_action,
582		dummy_waker_action,
583	);
584	pub(crate) fn dummy_waker() -> Waker {
585		unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) }
586	}
587
588	enum JoinerResult<ERR, F: Future<Output = Result<(), ERR>> + Unpin> {
589		Pending(Option<F>),
590		Ready(Result<(), ERR>),
591	}
592
593	pub(crate) struct Joiner<
594		ERR,
595		A: Future<Output = Result<(), ERR>> + Unpin,
596		B: Future<Output = Result<(), ERR>> + Unpin,
597		C: Future<Output = Result<(), ERR>> + Unpin,
598		D: Future<Output = Result<(), ERR>> + Unpin,
599		E: Future<Output = Result<(), ERR>> + Unpin,
600	> {
601		a: JoinerResult<ERR, A>,
602		b: JoinerResult<ERR, B>,
603		c: JoinerResult<ERR, C>,
604		d: JoinerResult<ERR, D>,
605		e: JoinerResult<ERR, E>,
606	}
607
608	impl<
609			ERR,
610			A: Future<Output = Result<(), ERR>> + Unpin,
611			B: Future<Output = Result<(), ERR>> + Unpin,
612			C: Future<Output = Result<(), ERR>> + Unpin,
613			D: Future<Output = Result<(), ERR>> + Unpin,
614			E: Future<Output = Result<(), ERR>> + Unpin,
615		> Joiner<ERR, A, B, C, D, E>
616	{
617		pub(crate) fn new() -> Self {
618			Self {
619				a: JoinerResult::Pending(None),
620				b: JoinerResult::Pending(None),
621				c: JoinerResult::Pending(None),
622				d: JoinerResult::Pending(None),
623				e: JoinerResult::Pending(None),
624			}
625		}
626
627		pub(crate) fn set_a(&mut self, fut: A) {
628			self.a = JoinerResult::Pending(Some(fut));
629		}
630		pub(crate) fn set_a_res(&mut self, res: Result<(), ERR>) {
631			self.a = JoinerResult::Ready(res);
632		}
633		pub(crate) fn set_b(&mut self, fut: B) {
634			self.b = JoinerResult::Pending(Some(fut));
635		}
636		pub(crate) fn set_c(&mut self, fut: C) {
637			self.c = JoinerResult::Pending(Some(fut));
638		}
639		pub(crate) fn set_d(&mut self, fut: D) {
640			self.d = JoinerResult::Pending(Some(fut));
641		}
642		pub(crate) fn set_e(&mut self, fut: E) {
643			self.e = JoinerResult::Pending(Some(fut));
644		}
645	}
646
647	impl<
648			ERR,
649			A: Future<Output = Result<(), ERR>> + Unpin,
650			B: Future<Output = Result<(), ERR>> + Unpin,
651			C: Future<Output = Result<(), ERR>> + Unpin,
652			D: Future<Output = Result<(), ERR>> + Unpin,
653			E: Future<Output = Result<(), ERR>> + Unpin,
654		> Future for Joiner<ERR, A, B, C, D, E>
655	where
656		Joiner<ERR, A, B, C, D, E>: Unpin,
657	{
658		type Output = [Result<(), ERR>; 5];
659		fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
660			let mut all_complete = true;
661			macro_rules! handle {
662				($val: ident) => {
663					match &mut (self.$val) {
664						JoinerResult::Pending(None) => {
665							self.$val = JoinerResult::Ready(Ok(()));
666						},
667						JoinerResult::<ERR, _>::Pending(Some(ref mut val)) => {
668							match Pin::new(val).poll(ctx) {
669								Poll::Ready(res) => {
670									self.$val = JoinerResult::Ready(res);
671								},
672								Poll::Pending => {
673									all_complete = false;
674								},
675							}
676						},
677						JoinerResult::Ready(_) => {},
678					}
679				};
680			}
681			handle!(a);
682			handle!(b);
683			handle!(c);
684			handle!(d);
685			handle!(e);
686
687			if all_complete {
688				let mut res = [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())];
689				if let JoinerResult::Ready(ref mut val) = &mut self.a {
690					core::mem::swap(&mut res[0], val);
691				}
692				if let JoinerResult::Ready(ref mut val) = &mut self.b {
693					core::mem::swap(&mut res[1], val);
694				}
695				if let JoinerResult::Ready(ref mut val) = &mut self.c {
696					core::mem::swap(&mut res[2], val);
697				}
698				if let JoinerResult::Ready(ref mut val) = &mut self.d {
699					core::mem::swap(&mut res[3], val);
700				}
701				if let JoinerResult::Ready(ref mut val) = &mut self.e {
702					core::mem::swap(&mut res[4], val);
703				}
704				Poll::Ready(res)
705			} else {
706				Poll::Pending
707			}
708		}
709	}
710}
711use core::task;
712use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutput};
713
714/// Processes background events in a future.
715///
716/// `sleeper` should return a future which completes in the given amount of time and returns a
717/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
718/// future which outputs `true`, the loop will exit and this function's future will complete.
719/// The `sleeper` future is free to return early after it has triggered the exit condition.
720///
721#[cfg_attr(
722	feature = "std",
723	doc = " See [`BackgroundProcessor::start`] for information on which actions this handles.\n"
724)]
725/// The `mobile_interruptable_platform` flag should be set if we're currently running on a
726/// mobile device, where we may need to check for interruption of the application regularly. If you
727/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
728/// are hundreds or thousands of simultaneous process calls running.
729///
730/// The `fetch_time` parameter should return the current wall clock time, if one is available. If
731/// no time is available, some features may be disabled, however the node will still operate fine.
732///
733/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
734/// could setup `process_events_async` like this:
735/// ```
736/// # use lightning::io;
737/// # use lightning::events::ReplayEvent;
738/// # use std::sync::{Arc, RwLock};
739/// # use std::sync::atomic::{AtomicBool, Ordering};
740/// # use std::time::SystemTime;
741/// # use lightning_background_processor::{process_events_async, GossipSync};
742/// # use core::future::Future;
743/// # use core::pin::Pin;
744/// # use lightning_liquidity::utils::time::TimeProvider;
745/// # struct Logger {}
746/// # impl lightning::util::logger::Logger for Logger {
747/// #     fn log(&self, _record: lightning::util::logger::Record) {}
748/// # }
749/// # struct StoreSync {}
750/// # impl lightning::util::persist::KVStoreSync for StoreSync {
751/// #     fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
752/// #     fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>) -> io::Result<()> { Ok(()) }
753/// #     fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
754/// #     fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
755/// # }
756/// # struct Store {}
757/// # impl lightning::util::persist::KVStore for Store {
758/// #     fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
759/// #     fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
760/// #     fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
761/// #     fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
762/// # }
763/// # use core::time::Duration;
764/// # struct DefaultTimeProvider;
765/// #
766/// # impl TimeProvider for DefaultTimeProvider {
767/// #    fn duration_since_epoch(&self) -> Duration {
768/// #        use std::time::{SystemTime, UNIX_EPOCH};
769/// #        SystemTime::now().duration_since(UNIX_EPOCH).expect("system time before Unix epoch")
770/// #    }
771/// # }
772/// # struct EventHandler {}
773/// # impl EventHandler {
774/// #     async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
775/// # }
776/// # #[derive(Eq, PartialEq, Clone, Hash)]
777/// # struct SocketDescriptor {}
778/// # impl lightning::ln::peer_handler::SocketDescriptor for SocketDescriptor {
779/// #     fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize { 0 }
780/// #     fn disconnect_socket(&mut self) {}
781/// # }
782/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
783/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
784/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
785/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
786/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
787/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>, Arc<Store>, Arc<DefaultTimeProvider>, Arc<B>>;
788/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
789/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
790/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store>, Arc<Logger>, Arc<O>>;
791///
792/// # struct Node<
793/// #     B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
794/// #     F: lightning::chain::Filter + Send + Sync + 'static,
795/// #     FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
796/// #     UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
797/// #     D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
798/// #     O: lightning::sign::OutputSpender + Send + Sync + 'static,
799/// # > {
800/// #     peer_manager: Arc<PeerManager<B, F, FE, UL>>,
801/// #     event_handler: Arc<EventHandler>,
802/// #     channel_manager: Arc<ChannelManager<B, F, FE>>,
803/// #     onion_messenger: Arc<OnionMessenger<B, F, FE>>,
804/// #     liquidity_manager: Arc<LiquidityManager<B, F, FE>>,
805/// #     chain_monitor: Arc<ChainMonitor<B, F, FE>>,
806/// #     gossip_sync: Arc<P2PGossipSync<UL>>,
807/// #     persister: Arc<Store>,
808/// #     logger: Arc<Logger>,
809/// #     scorer: Arc<Scorer>,
810/// #     sweeper: Arc<OutputSweeper<B, D, FE, F, O>>,
811/// # }
812/// #
813/// # async fn setup_background_processing<
814/// #     B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
815/// #     F: lightning::chain::Filter + Send + Sync + 'static,
816/// #     FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
817/// #     UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
818/// #     D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
819/// #     O: lightning::sign::OutputSpender + Send + Sync + 'static,
820/// # >(node: Node<B, F, FE, UL, D, O>) {
821///	let background_persister = Arc::clone(&node.persister);
822///	let background_event_handler = Arc::clone(&node.event_handler);
823///	let background_chain_mon = Arc::clone(&node.chain_monitor);
824///	let background_chan_man = Arc::clone(&node.channel_manager);
825///	let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync));
826///	let background_peer_man = Arc::clone(&node.peer_manager);
827///	let background_onion_messenger = Arc::clone(&node.onion_messenger);
828///	let background_liquidity_manager = Arc::clone(&node.liquidity_manager);
829///	let background_logger = Arc::clone(&node.logger);
830///	let background_scorer = Arc::clone(&node.scorer);
831///	let background_sweeper = Arc::clone(&node.sweeper);
832///	// Setup the sleeper.
833#[cfg_attr(
834	feature = "std",
835	doc = "	let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());"
836)]
837#[cfg_attr(feature = "std", doc = "")]
838///	let sleeper = move |d| {
839#[cfg_attr(feature = "std", doc = "		let mut receiver = stop_receiver.clone();")]
840///		Box::pin(async move {
841///			tokio::select!{
842///				_ = tokio::time::sleep(d) => false,
843#[cfg_attr(feature = "std", doc = "				_ = receiver.changed() => true,")]
844///			}
845///		})
846///	};
847///
848///	let mobile_interruptable_platform = false;
849///
850#[cfg_attr(feature = "std", doc = "	let handle = tokio::spawn(async move {")]
851#[cfg_attr(
852	not(feature = "std"),
853	doc = "	let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();"
854)]
855#[cfg_attr(not(feature = "std"), doc = "	rt.block_on(async move {")]
856///		process_events_async(
857///			background_persister,
858///			|e| background_event_handler.handle_event(e),
859///			background_chain_mon,
860///			background_chan_man,
861///			Some(background_onion_messenger),
862///			background_gossip_sync,
863///			background_peer_man,
864///			Some(background_liquidity_manager),
865///			Some(background_sweeper),
866///			background_logger,
867///			Some(background_scorer),
868///			sleeper,
869///			mobile_interruptable_platform,
870///			|| Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
871///		)
872///		.await
873///		.expect("Failed to process events");
874///	});
875///
876///	// Stop the background processing.
877#[cfg_attr(feature = "std", doc = "	stop_sender.send(()).unwrap();")]
878#[cfg_attr(feature = "std", doc = "	handle.await.unwrap()")]
879///	# }
880///```
881pub async fn process_events_async<
882	'a,
883	UL: Deref,
884	CF: Deref,
885	T: Deref,
886	F: Deref,
887	G: Deref<Target = NetworkGraph<L>>,
888	L: Deref,
889	P: Deref,
890	EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
891	EventHandler: Fn(Event) -> EventHandlerFuture,
892	ES: Deref,
893	M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
894	CM: Deref,
895	OM: Deref,
896	PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
897	RGS: Deref<Target = RapidGossipSync<G, L>>,
898	PM: Deref,
899	LM: Deref,
900	D: Deref,
901	O: Deref,
902	K: Deref,
903	OS: Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
904	S: Deref<Target = SC>,
905	SC: for<'b> WriteableScore<'b>,
906	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
907	Sleeper: Fn(Duration) -> SleepFuture,
908	FetchTime: Fn() -> Option<Duration>,
909>(
910	kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
911	onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
912	liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
913	sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
914) -> Result<(), lightning::io::Error>
915where
916	UL::Target: UtxoLookup,
917	CF::Target: chain::Filter,
918	T::Target: BroadcasterInterface,
919	F::Target: FeeEstimator,
920	L::Target: Logger,
921	P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
922	ES::Target: EntropySource,
923	CM::Target: AChannelManager,
924	OM::Target: AOnionMessenger,
925	PM::Target: APeerManager,
926	LM::Target: ALiquidityManager,
927	O::Target: OutputSpender,
928	D::Target: ChangeDestinationSource,
929	K::Target: KVStore,
930{
931	let async_event_handler = |event| {
932		let network_graph = gossip_sync.network_graph();
933		let event_handler = &event_handler;
934		let scorer = &scorer;
935		let logger = &logger;
936		let kv_store = &kv_store;
937		let fetch_time = &fetch_time;
938		// We should be able to drop the Box once our MSRV is 1.68
939		Box::pin(async move {
940			if let Some(network_graph) = network_graph {
941				handle_network_graph_update(network_graph, &event)
942			}
943			if let Some(ref scorer) = scorer {
944				if let Some(duration_since_epoch) = fetch_time() {
945					if update_scorer(scorer, &event, duration_since_epoch) {
946						log_trace!(logger, "Persisting scorer after update");
947						if let Err(e) = kv_store
948							.write(
949								SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
950								SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
951								SCORER_PERSISTENCE_KEY,
952								scorer.encode(),
953							)
954							.await
955						{
956							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
957							// We opt not to abort early on persistence failure here as persisting
958							// the scorer is non-critical and we still hope that it will have
959							// resolved itself when it is potentially critical in event handling
960							// below.
961						}
962					}
963				}
964			}
965			event_handler(event).await
966		})
967	};
968	let mut batch_delay = BatchDelay::new();
969
970	log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
971	channel_manager.get_cm().timer_tick_occurred();
972	log_trace!(logger, "Rebroadcasting monitor's pending claims on startup");
973	chain_monitor.rebroadcast_pending_claims();
974
975	let mut last_freshness_call = sleeper(FRESHNESS_TIMER);
976	let mut last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
977	let mut last_ping_call = sleeper(PING_TIMER);
978	let mut last_prune_call = sleeper(FIRST_NETWORK_PRUNE_TIMER);
979	let mut last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
980	let mut last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
981	let mut last_sweeper_call = sleeper(SWEEPER_TIMER);
982	let mut have_pruned = false;
983	let mut have_decayed_scorer = false;
984
985	let mut last_forwards_processing_call = sleeper(batch_delay.get());
986
987	loop {
988		channel_manager.get_cm().process_pending_events_async(async_event_handler).await;
989		chain_monitor.process_pending_events_async(async_event_handler).await;
990		if let Some(om) = &onion_messenger {
991			om.get_om().process_pending_events_async(async_event_handler).await
992		}
993
994		// Note that the PeerManager::process_events may block on ChannelManager's locks,
995		// hence it comes last here. When the ChannelManager finishes whatever it's doing,
996		// we want to ensure we get into `persist_manager` as quickly as we can, especially
997		// without running the normal event processing above and handing events to users.
998		//
999		// Specifically, on an *extremely* slow machine, we may see ChannelManager start
1000		// processing a message effectively at any point during this loop. In order to
1001		// minimize the time between such processing completing and persisting the updated
1002		// ChannelManager, we want to minimize methods blocking on a ChannelManager
1003		// generally, and as a fallback place such blocking only immediately before
1004		// persistence.
1005		peer_manager.as_ref().process_events();
1006		match check_and_reset_sleeper(&mut last_forwards_processing_call, || {
1007			sleeper(batch_delay.next())
1008		}) {
1009			Some(false) => {
1010				channel_manager.get_cm().process_pending_htlc_forwards();
1011			},
1012			Some(true) => break,
1013			None => {},
1014		}
1015
1016		// We wait up to 100ms, but track how long it takes to detect being put to sleep,
1017		// see `await_start`'s use below.
1018		let mut await_start = None;
1019		if mobile_interruptable_platform {
1020			await_start = Some(sleeper(Duration::from_secs(1)));
1021		}
1022		let om_fut = if let Some(om) = onion_messenger.as_ref() {
1023			let fut = om.get_om().get_update_future();
1024			OptionalSelector { optional_future: Some(fut) }
1025		} else {
1026			OptionalSelector { optional_future: None }
1027		};
1028		let lm_fut = if let Some(lm) = liquidity_manager.as_ref() {
1029			let fut = lm.get_lm().get_pending_msgs_or_needs_persist_future();
1030			OptionalSelector { optional_future: Some(fut) }
1031		} else {
1032			OptionalSelector { optional_future: None }
1033		};
1034		let needs_processing = channel_manager.get_cm().needs_pending_htlc_processing();
1035		let sleep_delay = match (needs_processing, mobile_interruptable_platform) {
1036			(true, true) => batch_delay.get().min(Duration::from_millis(100)),
1037			(true, false) => batch_delay.get().min(FASTEST_TIMER),
1038			(false, true) => Duration::from_millis(100),
1039			(false, false) => FASTEST_TIMER,
1040		};
1041		let fut = Selector {
1042			a: sleeper(sleep_delay),
1043			b: channel_manager.get_cm().get_event_or_persistence_needed_future(),
1044			c: chain_monitor.get_update_future(),
1045			d: om_fut,
1046			e: lm_fut,
1047		};
1048		match fut.await {
1049			SelectorOutput::B | SelectorOutput::C | SelectorOutput::D | SelectorOutput::E => {},
1050			SelectorOutput::A(exit) => {
1051				if exit {
1052					break;
1053				}
1054			},
1055		}
1056
1057		let await_slow = if mobile_interruptable_platform {
1058			// Specify a zero new sleeper timeout because we won't use the new sleeper. It is re-initialized in the next
1059			// loop iteration.
1060			match check_and_reset_sleeper(&mut await_start.unwrap(), || sleeper(Duration::ZERO)) {
1061				Some(true) => break,
1062				Some(false) => true,
1063				None => false,
1064			}
1065		} else {
1066			false
1067		};
1068		match check_and_reset_sleeper(&mut last_freshness_call, || sleeper(FRESHNESS_TIMER)) {
1069			Some(false) => {
1070				log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
1071				channel_manager.get_cm().timer_tick_occurred();
1072			},
1073			Some(true) => break,
1074			None => {},
1075		}
1076
1077		let mut futures = Joiner::new();
1078
1079		if channel_manager.get_cm().get_and_clear_needs_persistence() {
1080			log_trace!(logger, "Persisting ChannelManager...");
1081
1082			let fut = async {
1083				kv_store
1084					.write(
1085						CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1086						CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
1087						CHANNEL_MANAGER_PERSISTENCE_KEY,
1088						channel_manager.get_cm().encode(),
1089					)
1090					.await
1091			};
1092			// TODO: Once our MSRV is 1.68 we should be able to drop the Box
1093			let mut fut = Box::pin(fut);
1094
1095			// Because persisting the ChannelManager is important to avoid accidental
1096			// force-closures, go ahead and poll the future once before we do slightly more
1097			// CPU-intensive tasks in the form of NetworkGraph pruning or scorer time-stepping
1098			// below. This will get it moving but won't block us for too long if the underlying
1099			// future is actually async.
1100			use core::future::Future;
1101			let mut waker = dummy_waker();
1102			let mut ctx = task::Context::from_waker(&mut waker);
1103			match core::pin::Pin::new(&mut fut).poll(&mut ctx) {
1104				task::Poll::Ready(res) => futures.set_a_res(res),
1105				task::Poll::Pending => futures.set_a(fut),
1106			}
1107
1108			log_trace!(logger, "Done persisting ChannelManager.");
1109		}
1110
1111		// Note that we want to run a graph prune once not long after startup before
1112		// falling back to our usual hourly prunes. This avoids short-lived clients never
1113		// pruning their network graph. We run once 60 seconds after startup before
1114		// continuing our normal cadence. For RGS, since 60 seconds is likely too long,
1115		// we prune after an initial sync completes.
1116		let prune_timer = if gossip_sync.prunable_network_graph().is_some() {
1117			NETWORK_PRUNE_TIMER
1118		} else {
1119			FIRST_NETWORK_PRUNE_TIMER
1120		};
1121		let prune_timer_elapsed = {
1122			match check_and_reset_sleeper(&mut last_prune_call, || sleeper(prune_timer)) {
1123				Some(false) => true,
1124				Some(true) => break,
1125				None => false,
1126			}
1127		};
1128
1129		let should_prune = match gossip_sync {
1130			GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
1131			_ => prune_timer_elapsed,
1132		};
1133		if should_prune {
1134			// The network graph must not be pruned while rapid sync completion is pending
1135			if let Some(network_graph) = gossip_sync.prunable_network_graph() {
1136				if let Some(duration_since_epoch) = fetch_time() {
1137					log_trace!(logger, "Pruning and persisting network graph.");
1138					network_graph.remove_stale_channels_and_tracking_with_time(
1139						duration_since_epoch.as_secs(),
1140					);
1141				} else {
1142					log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually.");
1143					log_trace!(logger, "Persisting network graph.");
1144				}
1145				let fut = async {
1146					if let Err(e) = kv_store
1147						.write(
1148							NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
1149							NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1150							NETWORK_GRAPH_PERSISTENCE_KEY,
1151							network_graph.encode(),
1152						)
1153						.await
1154					{
1155						log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
1156					}
1157
1158					Ok(())
1159				};
1160
1161				// TODO: Once our MSRV is 1.68 we should be able to drop the Box
1162				futures.set_b(Box::pin(fut));
1163
1164				have_pruned = true;
1165			}
1166		}
1167		if !have_decayed_scorer {
1168			if let Some(ref scorer) = scorer {
1169				if let Some(duration_since_epoch) = fetch_time() {
1170					log_trace!(logger, "Calling time_passed on scorer at startup");
1171					scorer.write_lock().time_passed(duration_since_epoch);
1172				}
1173			}
1174			have_decayed_scorer = true;
1175		}
1176		match check_and_reset_sleeper(&mut last_scorer_persist_call, || {
1177			sleeper(SCORER_PERSIST_TIMER)
1178		}) {
1179			Some(false) => {
1180				if let Some(ref scorer) = scorer {
1181					if let Some(duration_since_epoch) = fetch_time() {
1182						log_trace!(logger, "Calling time_passed and persisting scorer");
1183						scorer.write_lock().time_passed(duration_since_epoch);
1184					} else {
1185						log_trace!(logger, "Persisting scorer");
1186					}
1187					let fut = async {
1188						if let Err(e) = kv_store
1189							.write(
1190								SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1191								SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1192								SCORER_PERSISTENCE_KEY,
1193								scorer.encode(),
1194							)
1195							.await
1196						{
1197							log_error!(
1198							logger,
1199							"Error: Failed to persist scorer, check your disk and permissions {}",
1200							e
1201						);
1202						}
1203
1204						Ok(())
1205					};
1206
1207					// TODO: Once our MSRV is 1.68 we should be able to drop the Box
1208					futures.set_c(Box::pin(fut));
1209				}
1210			},
1211			Some(true) => break,
1212			None => {},
1213		}
1214		match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) {
1215			Some(false) => {
1216				log_trace!(logger, "Regenerating sweeper spends if necessary");
1217				if let Some(ref sweeper) = sweeper {
1218					let fut = async {
1219						let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
1220
1221						Ok(())
1222					};
1223
1224					// TODO: Once our MSRV is 1.68 we should be able to drop the Box
1225					futures.set_d(Box::pin(fut));
1226				}
1227			},
1228			Some(true) => break,
1229			None => {},
1230		}
1231
1232		if let Some(liquidity_manager) = liquidity_manager.as_ref() {
1233			let fut = async {
1234				liquidity_manager
1235					.get_lm()
1236					.persist()
1237					.await
1238					.map(|did_persist| {
1239						if did_persist {
1240							log_trace!(logger, "Persisted LiquidityManager.");
1241						}
1242					})
1243					.map_err(|e| {
1244						log_error!(logger, "Persisting LiquidityManager failed: {}", e);
1245						e
1246					})
1247			};
1248			futures.set_e(Box::pin(fut));
1249		}
1250
1251		// Run persistence tasks in parallel and exit if any of them returns an error.
1252		for res in futures.await {
1253			res?;
1254		}
1255
1256		match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
1257			sleeper(ONION_MESSAGE_HANDLER_TIMER)
1258		}) {
1259			Some(false) => {
1260				if let Some(om) = &onion_messenger {
1261					log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
1262					om.get_om().timer_tick_occurred();
1263				}
1264			},
1265			Some(true) => break,
1266			None => {},
1267		}
1268
1269		// Peer manager timer tick. If we were interrupted on a mobile platform, we disconnect all peers.
1270		if await_slow {
1271			// On various platforms, we may be starved of CPU cycles for several reasons.
1272			// E.g. on iOS, if we've been in the background, we will be entirely paused.
1273			// Similarly, if we're on a desktop platform and the device has been asleep, we
1274			// may not get any cycles.
1275			// We detect this by checking if our max-100ms-sleep, above, ran longer than a
1276			// full second, at which point we assume sockets may have been killed (they
1277			// appear to be at least on some platforms, even if it has only been a second).
1278			// Note that we have to take care to not get here just because user event
1279			// processing was slow at the top of the loop. For example, the sample client
1280			// may call Bitcoin Core RPCs during event handling, which very often takes
1281			// more than a handful of seconds to complete, and shouldn't disconnect all our
1282			// peers.
1283			log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
1284			peer_manager.as_ref().disconnect_all_peers();
1285			last_ping_call = sleeper(PING_TIMER);
1286		} else {
1287			match check_and_reset_sleeper(&mut last_ping_call, || sleeper(PING_TIMER)) {
1288				Some(false) => {
1289					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
1290					peer_manager.as_ref().timer_tick_occurred();
1291				},
1292				Some(true) => break,
1293				_ => {},
1294			}
1295		}
1296
1297		// Rebroadcast pending claims.
1298		match check_and_reset_sleeper(&mut last_rebroadcast_call, || sleeper(REBROADCAST_TIMER)) {
1299			Some(false) => {
1300				log_trace!(logger, "Rebroadcasting monitor's pending claims");
1301				chain_monitor.rebroadcast_pending_claims();
1302			},
1303			Some(true) => break,
1304			None => {},
1305		}
1306	}
1307	log_trace!(logger, "Terminating background processor.");
1308
1309	// After we exit, ensure we persist the ChannelManager one final time - this avoids
1310	// some races where users quit while channel updates were in-flight, with
1311	// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
1312	kv_store
1313		.write(
1314			CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1315			CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
1316			CHANNEL_MANAGER_PERSISTENCE_KEY,
1317			channel_manager.get_cm().encode(),
1318		)
1319		.await?;
1320	if let Some(ref scorer) = scorer {
1321		kv_store
1322			.write(
1323				SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1324				SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1325				SCORER_PERSISTENCE_KEY,
1326				scorer.encode(),
1327			)
1328			.await?;
1329	}
1330	if let Some(network_graph) = gossip_sync.network_graph() {
1331		kv_store
1332			.write(
1333				NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
1334				NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1335				NETWORK_GRAPH_PERSISTENCE_KEY,
1336				network_graph.encode(),
1337			)
1338			.await?;
1339	}
1340	Ok(())
1341}
1342
1343fn check_and_reset_sleeper<
1344	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
1345>(
1346	fut: &mut SleepFuture, mut new_sleeper: impl FnMut() -> SleepFuture,
1347) -> Option<bool> {
1348	let mut waker = dummy_waker();
1349	let mut ctx = task::Context::from_waker(&mut waker);
1350	match core::pin::Pin::new(&mut *fut).poll(&mut ctx) {
1351		task::Poll::Ready(exit) => {
1352			*fut = new_sleeper();
1353			Some(exit)
1354		},
1355		task::Poll::Pending => None,
1356	}
1357}
1358
1359/// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for
1360/// synchronous background persistence.
1361pub async fn process_events_async_with_kv_store_sync<
1362	UL: Deref,
1363	CF: Deref,
1364	T: Deref,
1365	F: Deref,
1366	G: Deref<Target = NetworkGraph<L>>,
1367	L: Deref,
1368	P: Deref,
1369	EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
1370	EventHandler: Fn(Event) -> EventHandlerFuture,
1371	ES: Deref,
1372	M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
1373	CM: Deref,
1374	OM: Deref,
1375	PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
1376	RGS: Deref<Target = RapidGossipSync<G, L>>,
1377	PM: Deref,
1378	LM: Deref,
1379	D: Deref,
1380	O: Deref,
1381	K: Deref,
1382	OS: Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>>,
1383	S: Deref<Target = SC>,
1384	SC: for<'b> WriteableScore<'b>,
1385	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
1386	Sleeper: Fn(Duration) -> SleepFuture,
1387	FetchTime: Fn() -> Option<Duration>,
1388>(
1389	kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
1390	onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
1391	liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
1392	sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
1393) -> Result<(), lightning::io::Error>
1394where
1395	UL::Target: UtxoLookup,
1396	CF::Target: chain::Filter,
1397	T::Target: BroadcasterInterface,
1398	F::Target: FeeEstimator,
1399	L::Target: Logger,
1400	P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
1401	ES::Target: EntropySource,
1402	CM::Target: AChannelManager,
1403	OM::Target: AOnionMessenger,
1404	PM::Target: APeerManager,
1405	LM::Target: ALiquidityManager,
1406	O::Target: OutputSpender,
1407	D::Target: ChangeDestinationSourceSync,
1408	K::Target: KVStoreSync,
1409{
1410	let kv_store = KVStoreSyncWrapper(kv_store);
1411	process_events_async(
1412		kv_store,
1413		event_handler,
1414		chain_monitor,
1415		channel_manager,
1416		onion_messenger,
1417		gossip_sync,
1418		peer_manager,
1419		liquidity_manager,
1420		sweeper.as_ref().map(|os| os.sweeper_async()),
1421		logger,
1422		scorer,
1423		sleeper,
1424		mobile_interruptable_platform,
1425		fetch_time,
1426	)
1427	.await
1428}
1429
1430#[cfg(feature = "std")]
1431impl BackgroundProcessor {
1432	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
1433	/// documentation].
1434	///
1435	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
1436	/// [`KVStoreSync`] returns an error. In case of an error, the error is retrieved by calling
1437	/// either [`join`] or [`stop`].
1438	///
1439	/// # Data Persistence
1440	///
1441	/// [`KVStoreSync`] is responsible for writing out the [`ChannelManager`] to disk, and/or
1442	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
1443	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
1444	/// provided implementation.
1445	///
1446	/// [`KVStoreSync`] is also responsible for writing out the [`NetworkGraph`] to disk, if
1447	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
1448	/// See the `lightning-persister` crate for LDK's provided implementation.
1449	///
1450	/// Typically, users should either implement [`KVStoreSync`] to never return an
1451	/// error or call [`join`] and handle any error that may arise. For the latter case,
1452	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
1453	///
1454	/// # Event Handling
1455	///
1456	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
1457	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
1458	/// functionality implemented by other handlers.
1459	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
1460	///
1461	/// # Rapid Gossip Sync
1462	///
1463	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
1464	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
1465	/// until the [`RapidGossipSync`] instance completes its first sync.
1466	///
1467	/// [top-level documentation]: BackgroundProcessor
1468	/// [`join`]: Self::join
1469	/// [`stop`]: Self::stop
1470	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
1471	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
1472	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
1473	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
1474	pub fn start<
1475		'a,
1476		UL: 'static + Deref,
1477		CF: 'static + Deref,
1478		T: 'static + Deref,
1479		F: 'static + Deref + Send,
1480		G: 'static + Deref<Target = NetworkGraph<L>>,
1481		L: 'static + Deref + Send,
1482		P: 'static + Deref,
1483		EH: 'static + EventHandler + Send,
1484		ES: 'static + Deref + Send,
1485		M: 'static
1486			+ Deref<
1487				Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
1488			>
1489			+ Send
1490			+ Sync,
1491		CM: 'static + Deref + Send,
1492		OM: 'static + Deref + Send,
1493		PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send,
1494		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
1495		PM: 'static + Deref + Send,
1496		LM: 'static + Deref + Send,
1497		S: 'static + Deref<Target = SC> + Send + Sync,
1498		SC: for<'b> WriteableScore<'b>,
1499		D: 'static + Deref,
1500		O: 'static + Deref,
1501		K: 'static + Deref + Send,
1502		OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
1503	>(
1504		kv_store: K, event_handler: EH, chain_monitor: M, channel_manager: CM,
1505		onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
1506		liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
1507	) -> Self
1508	where
1509		UL::Target: 'static + UtxoLookup,
1510		CF::Target: 'static + chain::Filter,
1511		T::Target: 'static + BroadcasterInterface,
1512		F::Target: 'static + FeeEstimator,
1513		L::Target: 'static + Logger,
1514		P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1515		ES::Target: 'static + EntropySource,
1516		CM::Target: AChannelManager,
1517		OM::Target: AOnionMessenger,
1518		PM::Target: APeerManager,
1519		LM::Target: ALiquidityManagerSync,
1520		D::Target: ChangeDestinationSourceSync,
1521		O::Target: 'static + OutputSpender,
1522		K::Target: 'static + KVStoreSync,
1523	{
1524		let stop_thread = Arc::new(AtomicBool::new(false));
1525		let stop_thread_clone = Arc::clone(&stop_thread);
1526		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
1527			let event_handler = |event| {
1528				let network_graph = gossip_sync.network_graph();
1529				if let Some(network_graph) = network_graph {
1530					handle_network_graph_update(network_graph, &event)
1531				}
1532				if let Some(ref scorer) = scorer {
1533					use std::time::SystemTime;
1534					let duration_since_epoch = SystemTime::now()
1535						.duration_since(SystemTime::UNIX_EPOCH)
1536						.expect("Time should be sometime after 1970");
1537					if update_scorer(scorer, &event, duration_since_epoch) {
1538						log_trace!(logger, "Persisting scorer after update");
1539						if let Err(e) = kv_store.write(
1540							SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1541							SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1542							SCORER_PERSISTENCE_KEY,
1543							scorer.encode(),
1544						) {
1545							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
1546						}
1547					}
1548				}
1549				event_handler.handle_event(event)
1550			};
1551			let mut batch_delay = BatchDelay::new();
1552
1553			log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
1554			channel_manager.get_cm().timer_tick_occurred();
1555			log_trace!(logger, "Rebroadcasting monitor's pending claims on startup");
1556			chain_monitor.rebroadcast_pending_claims();
1557
1558			let mut last_freshness_call = Instant::now();
1559			let mut last_onion_message_handler_call = Instant::now();
1560			let mut last_ping_call = Instant::now();
1561			let mut last_prune_call = Instant::now();
1562			let mut last_scorer_persist_call = Instant::now();
1563			let mut last_rebroadcast_call = Instant::now();
1564			let mut last_sweeper_call = Instant::now();
1565			let mut have_pruned = false;
1566			let mut have_decayed_scorer = false;
1567
1568			let mut cur_batch_delay = batch_delay.get();
1569			let mut last_forwards_processing_call = Instant::now();
1570
1571			loop {
1572				channel_manager.get_cm().process_pending_events(&event_handler);
1573				chain_monitor.process_pending_events(&event_handler);
1574				if let Some(om) = &onion_messenger {
1575					om.get_om().process_pending_events(&event_handler)
1576				};
1577
1578				// Note that the PeerManager::process_events may block on ChannelManager's locks,
1579				// hence it comes last here. When the ChannelManager finishes whatever it's doing,
1580				// we want to ensure we get into `persist_manager` as quickly as we can, especially
1581				// without running the normal event processing above and handing events to users.
1582				//
1583				// Specifically, on an *extremely* slow machine, we may see ChannelManager start
1584				// processing a message effectively at any point during this loop. In order to
1585				// minimize the time between such processing completing and persisting the updated
1586				// ChannelManager, we want to minimize methods blocking on a ChannelManager
1587				// generally, and as a fallback place such blocking only immediately before
1588				// persistence.
1589				peer_manager.as_ref().process_events();
1590				if last_forwards_processing_call.elapsed() > cur_batch_delay {
1591					channel_manager.get_cm().process_pending_htlc_forwards();
1592					cur_batch_delay = batch_delay.next();
1593					last_forwards_processing_call = Instant::now();
1594				}
1595				if stop_thread.load(Ordering::Acquire) {
1596					log_trace!(logger, "Terminating background processor.");
1597					break;
1598				}
1599				let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) {
1600					(Some(om), Some(lm)) => Sleeper::from_four_futures(
1601						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1602						&chain_monitor.get_update_future(),
1603						&om.get_om().get_update_future(),
1604						&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
1605					),
1606					(Some(om), None) => Sleeper::from_three_futures(
1607						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1608						&chain_monitor.get_update_future(),
1609						&om.get_om().get_update_future(),
1610					),
1611					(None, Some(lm)) => Sleeper::from_three_futures(
1612						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1613						&chain_monitor.get_update_future(),
1614						&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
1615					),
1616					(None, None) => Sleeper::from_two_futures(
1617						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1618						&chain_monitor.get_update_future(),
1619					),
1620				};
1621				let batch_delay = if channel_manager.get_cm().needs_pending_htlc_processing() {
1622					batch_delay.get()
1623				} else {
1624					Duration::MAX
1625				};
1626				let fastest_timeout = batch_delay.min(Duration::from_millis(100));
1627				sleeper.wait_timeout(fastest_timeout);
1628				if stop_thread.load(Ordering::Acquire) {
1629					log_trace!(logger, "Terminating background processor.");
1630					break;
1631				}
1632				if last_freshness_call.elapsed() > FRESHNESS_TIMER {
1633					log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
1634					channel_manager.get_cm().timer_tick_occurred();
1635					last_freshness_call = Instant::now();
1636				}
1637				if channel_manager.get_cm().get_and_clear_needs_persistence() {
1638					log_trace!(logger, "Persisting ChannelManager...");
1639					(kv_store.write(
1640						CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1641						CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
1642						CHANNEL_MANAGER_PERSISTENCE_KEY,
1643						channel_manager.get_cm().encode(),
1644					))?;
1645					log_trace!(logger, "Done persisting ChannelManager.");
1646				}
1647
1648				if let Some(liquidity_manager) = liquidity_manager.as_ref() {
1649					log_trace!(logger, "Persisting LiquidityManager...");
1650					let _ = liquidity_manager.get_lm().persist().map_err(|e| {
1651						log_error!(logger, "Persisting LiquidityManager failed: {}", e);
1652					});
1653				}
1654
1655				// Note that we want to run a graph prune once not long after startup before
1656				// falling back to our usual hourly prunes. This avoids short-lived clients never
1657				// pruning their network graph. We run once 60 seconds after startup before
1658				// continuing our normal cadence. For RGS, since 60 seconds is likely too long,
1659				// we prune after an initial sync completes.
1660				let prune_timer =
1661					if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
1662				let prune_timer_elapsed = last_prune_call.elapsed() > prune_timer;
1663				let should_prune = match gossip_sync {
1664					GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
1665					_ => prune_timer_elapsed,
1666				};
1667				if should_prune {
1668					// The network graph must not be pruned while rapid sync completion is pending
1669					if let Some(network_graph) = gossip_sync.prunable_network_graph() {
1670						let duration_since_epoch = std::time::SystemTime::now()
1671							.duration_since(std::time::SystemTime::UNIX_EPOCH)
1672							.expect("Time should be sometime after 1970");
1673
1674						log_trace!(logger, "Pruning and persisting network graph.");
1675						network_graph.remove_stale_channels_and_tracking_with_time(
1676							duration_since_epoch.as_secs(),
1677						);
1678						if let Err(e) = kv_store.write(
1679							NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
1680							NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1681							NETWORK_GRAPH_PERSISTENCE_KEY,
1682							network_graph.encode(),
1683						) {
1684							log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e);
1685						}
1686						have_pruned = true;
1687					}
1688					last_prune_call = Instant::now();
1689				}
1690				if !have_decayed_scorer {
1691					if let Some(ref scorer) = scorer {
1692						let duration_since_epoch = std::time::SystemTime::now()
1693							.duration_since(std::time::SystemTime::UNIX_EPOCH)
1694							.expect("Time should be sometime after 1970");
1695						log_trace!(logger, "Calling time_passed on scorer at startup");
1696						scorer.write_lock().time_passed(duration_since_epoch);
1697					}
1698					have_decayed_scorer = true;
1699				}
1700				if last_scorer_persist_call.elapsed() > SCORER_PERSIST_TIMER {
1701					if let Some(ref scorer) = scorer {
1702						let duration_since_epoch = std::time::SystemTime::now()
1703							.duration_since(std::time::SystemTime::UNIX_EPOCH)
1704							.expect("Time should be sometime after 1970");
1705						log_trace!(logger, "Calling time_passed and persisting scorer");
1706						scorer.write_lock().time_passed(duration_since_epoch);
1707						if let Err(e) = kv_store.write(
1708							SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1709							SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1710							SCORER_PERSISTENCE_KEY,
1711							scorer.encode(),
1712						) {
1713							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
1714						}
1715					}
1716					last_scorer_persist_call = Instant::now();
1717				}
1718				if last_sweeper_call.elapsed() > SWEEPER_TIMER {
1719					log_trace!(logger, "Regenerating sweeper spends if necessary");
1720					if let Some(ref sweeper) = sweeper {
1721						let _ = sweeper.regenerate_and_broadcast_spend_if_necessary();
1722					}
1723					last_sweeper_call = Instant::now();
1724				}
1725				if last_onion_message_handler_call.elapsed() > ONION_MESSAGE_HANDLER_TIMER {
1726					if let Some(om) = &onion_messenger {
1727						log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
1728						om.get_om().timer_tick_occurred();
1729					}
1730					last_onion_message_handler_call = Instant::now();
1731				}
1732				if last_ping_call.elapsed() > PING_TIMER {
1733					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
1734					peer_manager.as_ref().timer_tick_occurred();
1735					last_ping_call = Instant::now();
1736				}
1737				if last_rebroadcast_call.elapsed() > REBROADCAST_TIMER {
1738					log_trace!(logger, "Rebroadcasting monitor's pending claims");
1739					chain_monitor.rebroadcast_pending_claims();
1740					last_rebroadcast_call = Instant::now();
1741				}
1742			}
1743
1744			// After we exit, ensure we persist the ChannelManager one final time - this avoids
1745			// some races where users quit while channel updates were in-flight, with
1746			// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
1747			kv_store.write(
1748				CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1749				CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
1750				CHANNEL_MANAGER_PERSISTENCE_KEY,
1751				channel_manager.get_cm().encode(),
1752			)?;
1753			if let Some(ref scorer) = scorer {
1754				kv_store.write(
1755					SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1756					SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1757					SCORER_PERSISTENCE_KEY,
1758					scorer.encode(),
1759				)?;
1760			}
1761			if let Some(network_graph) = gossip_sync.network_graph() {
1762				kv_store.write(
1763					NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
1764					NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1765					NETWORK_GRAPH_PERSISTENCE_KEY,
1766					network_graph.encode(),
1767				)?;
1768			}
1769			Ok(())
1770		});
1771		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
1772	}
1773
1774	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
1775	/// [`ChannelManager`].
1776	///
1777	/// # Panics
1778	///
1779	/// This function panics if the background thread has panicked such as while persisting or
1780	/// handling events.
1781	///
1782	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
1783	pub fn join(mut self) -> Result<(), std::io::Error> {
1784		assert!(self.thread_handle.is_some());
1785		self.join_thread()
1786	}
1787
1788	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
1789	/// [`ChannelManager`].
1790	///
1791	/// # Panics
1792	///
1793	/// This function panics if the background thread has panicked such as while persisting or
1794	/// handling events.
1795	///
1796	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
1797	pub fn stop(mut self) -> Result<(), std::io::Error> {
1798		assert!(self.thread_handle.is_some());
1799		self.stop_and_join_thread()
1800	}
1801
1802	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
1803		self.stop_thread.store(true, Ordering::Release);
1804		self.join_thread()
1805	}
1806
1807	fn join_thread(&mut self) -> Result<(), std::io::Error> {
1808		match self.thread_handle.take() {
1809			Some(handle) => handle.join().unwrap(),
1810			None => Ok(()),
1811		}
1812	}
1813}
1814
1815#[cfg(feature = "std")]
1816impl Drop for BackgroundProcessor {
1817	fn drop(&mut self) {
1818		self.stop_and_join_thread().unwrap();
1819	}
1820}
1821
1822#[cfg(all(feature = "std", test))]
1823mod tests {
1824	use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
1825	use bitcoin::constants::{genesis_block, ChainHash};
1826	use bitcoin::hashes::Hash;
1827	use bitcoin::locktime::absolute::LockTime;
1828	use bitcoin::network::Network;
1829	use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
1830	use bitcoin::transaction::Version;
1831	use bitcoin::transaction::{Transaction, TxOut};
1832	use bitcoin::{Amount, ScriptBuf, Txid};
1833	use core::sync::atomic::{AtomicBool, Ordering};
1834	use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
1835	use lightning::chain::transaction::OutPoint;
1836	use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1837	use lightning::events::{Event, PathFailure, ReplayEvent};
1838	use lightning::ln::channelmanager;
1839	use lightning::ln::channelmanager::{
1840		ChainParameters, PaymentId, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA,
1841	};
1842	use lightning::ln::functional_test_utils::*;
1843	use lightning::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, Init, MessageSendEvent};
1844	use lightning::ln::peer_handler::{
1845		IgnoringMessageHandler, MessageHandler, PeerManager, SocketDescriptor,
1846	};
1847	use lightning::ln::types::ChannelId;
1848	use lightning::onion_message::messenger::{DefaultMessageRouter, OnionMessenger};
1849	use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
1850	use lightning::routing::router::{CandidateRouteHop, DefaultRouter, Path, RouteHop};
1851	use lightning::routing::scoring::{ChannelUsage, LockableScore, ScoreLookUp, ScoreUpdate};
1852	use lightning::sign::{ChangeDestinationSourceSync, InMemorySigner, KeysManager, NodeSigner};
1853	use lightning::types::features::{ChannelFeatures, NodeFeatures};
1854	use lightning::types::payment::PaymentHash;
1855	use lightning::util::config::UserConfig;
1856	use lightning::util::persist::{
1857		KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
1858		CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1859		CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
1860		NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1861		SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1862		SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1863	};
1864	use lightning::util::ser::Writeable;
1865	use lightning::util::sweep::{
1866		OutputSpendStatus, OutputSweeper, OutputSweeperSync, PRUNE_DELAY_BLOCKS,
1867	};
1868	use lightning::util::test_utils;
1869	use lightning::{get_event, get_event_msg};
1870	use lightning_liquidity::utils::time::DefaultTimeProvider;
1871	use lightning_liquidity::{ALiquidityManagerSync, LiquidityManager, LiquidityManagerSync};
1872	use lightning_persister::fs_store::FilesystemStore;
1873	use lightning_rapid_gossip_sync::RapidGossipSync;
1874	use std::collections::VecDeque;
1875	use std::path::PathBuf;
1876	use std::sync::mpsc::SyncSender;
1877	use std::sync::Arc;
1878	use std::time::Duration;
1879	use std::{env, fs};
1880
1881	const EVENT_DEADLINE: Duration =
1882		Duration::from_millis(5 * (FRESHNESS_TIMER.as_millis() as u64));
1883
1884	#[derive(Clone, Hash, PartialEq, Eq)]
1885	struct TestDescriptor {}
1886	impl SocketDescriptor for TestDescriptor {
1887		fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize {
1888			0
1889		}
1890
1891		fn disconnect_socket(&mut self) {}
1892	}
1893
1894	#[cfg(c_bindings)]
1895	type LockingWrapper<T> = lightning::routing::scoring::MultiThreadedLockableScore<T>;
1896	#[cfg(not(c_bindings))]
1897	type LockingWrapper<T> = std::sync::Mutex<T>;
1898
1899	type ChannelManager = channelmanager::ChannelManager<
1900		Arc<ChainMonitor>,
1901		Arc<test_utils::TestBroadcaster>,
1902		Arc<KeysManager>,
1903		Arc<KeysManager>,
1904		Arc<KeysManager>,
1905		Arc<test_utils::TestFeeEstimator>,
1906		Arc<
1907			DefaultRouter<
1908				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1909				Arc<test_utils::TestLogger>,
1910				Arc<KeysManager>,
1911				Arc<LockingWrapper<TestScorer>>,
1912				(),
1913				TestScorer,
1914			>,
1915		>,
1916		Arc<
1917			DefaultMessageRouter<
1918				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1919				Arc<test_utils::TestLogger>,
1920				Arc<KeysManager>,
1921			>,
1922		>,
1923		Arc<test_utils::TestLogger>,
1924	>;
1925
1926	type ChainMonitor = chainmonitor::ChainMonitor<
1927		InMemorySigner,
1928		Arc<test_utils::TestChainSource>,
1929		Arc<test_utils::TestBroadcaster>,
1930		Arc<test_utils::TestFeeEstimator>,
1931		Arc<test_utils::TestLogger>,
1932		Arc<Persister>,
1933		Arc<KeysManager>,
1934	>;
1935
1936	type PGS = Arc<
1937		P2PGossipSync<
1938			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1939			Arc<test_utils::TestChainSource>,
1940			Arc<test_utils::TestLogger>,
1941		>,
1942	>;
1943	type RGS = Arc<
1944		RapidGossipSync<
1945			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1946			Arc<test_utils::TestLogger>,
1947		>,
1948	>;
1949
1950	type OM = OnionMessenger<
1951		Arc<KeysManager>,
1952		Arc<KeysManager>,
1953		Arc<test_utils::TestLogger>,
1954		Arc<ChannelManager>,
1955		Arc<
1956			DefaultMessageRouter<
1957				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1958				Arc<test_utils::TestLogger>,
1959				Arc<KeysManager>,
1960			>,
1961		>,
1962		IgnoringMessageHandler,
1963		Arc<ChannelManager>,
1964		IgnoringMessageHandler,
1965		IgnoringMessageHandler,
1966	>;
1967
1968	type LM = LiquidityManagerSync<
1969		Arc<KeysManager>,
1970		Arc<KeysManager>,
1971		Arc<ChannelManager>,
1972		Arc<dyn Filter + Sync + Send>,
1973		Arc<Persister>,
1974		DefaultTimeProvider,
1975		Arc<test_utils::TestBroadcaster>,
1976	>;
1977
1978	struct Node {
1979		node: Arc<ChannelManager>,
1980		messenger: Arc<OM>,
1981		p2p_gossip_sync: PGS,
1982		rapid_gossip_sync: RGS,
1983		peer_manager: Arc<
1984			PeerManager<
1985				TestDescriptor,
1986				Arc<test_utils::TestChannelMessageHandler>,
1987				Arc<test_utils::TestRoutingMessageHandler>,
1988				Arc<OM>,
1989				Arc<test_utils::TestLogger>,
1990				IgnoringMessageHandler,
1991				Arc<KeysManager>,
1992				IgnoringMessageHandler,
1993			>,
1994		>,
1995		liquidity_manager: Arc<LM>,
1996		chain_monitor: Arc<ChainMonitor>,
1997		kv_store: Arc<Persister>,
1998		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
1999		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2000		logger: Arc<test_utils::TestLogger>,
2001		best_block: BestBlock,
2002		scorer: Arc<LockingWrapper<TestScorer>>,
2003		sweeper: Arc<
2004			OutputSweeperSync<
2005				Arc<test_utils::TestBroadcaster>,
2006				Arc<TestWallet>,
2007				Arc<test_utils::TestFeeEstimator>,
2008				Arc<test_utils::TestChainSource>,
2009				Arc<Persister>,
2010				Arc<test_utils::TestLogger>,
2011				Arc<KeysManager>,
2012			>,
2013		>,
2014	}
2015
2016	impl Node {
2017		fn p2p_gossip_sync(
2018			&self,
2019		) -> GossipSync<
2020			PGS,
2021			RGS,
2022			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2023			Arc<test_utils::TestChainSource>,
2024			Arc<test_utils::TestLogger>,
2025		> {
2026			GossipSync::P2P(Arc::clone(&self.p2p_gossip_sync))
2027		}
2028
2029		fn rapid_gossip_sync(
2030			&self,
2031		) -> GossipSync<
2032			PGS,
2033			RGS,
2034			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2035			Arc<test_utils::TestChainSource>,
2036			Arc<test_utils::TestLogger>,
2037		> {
2038			GossipSync::Rapid(Arc::clone(&self.rapid_gossip_sync))
2039		}
2040
2041		fn no_gossip_sync(
2042			&self,
2043		) -> GossipSync<
2044			PGS,
2045			RGS,
2046			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2047			Arc<test_utils::TestChainSource>,
2048			Arc<test_utils::TestLogger>,
2049		> {
2050			GossipSync::None
2051		}
2052	}
2053
2054	impl Drop for Node {
2055		fn drop(&mut self) {
2056			let data_dir = self.kv_store.get_data_dir();
2057			match fs::remove_dir_all(data_dir.clone()) {
2058				Err(e) => {
2059					println!("Failed to remove test store directory {}: {}", data_dir.display(), e)
2060				},
2061				_ => {},
2062			}
2063		}
2064	}
2065
2066	struct Persister {
2067		graph_error: Option<(std::io::ErrorKind, &'static str)>,
2068		graph_persistence_notifier: Option<SyncSender<()>>,
2069		manager_error: Option<(std::io::ErrorKind, &'static str)>,
2070		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
2071		kv_store: FilesystemStore,
2072	}
2073
2074	impl Persister {
2075		fn new(data_dir: PathBuf) -> Self {
2076			let kv_store = FilesystemStore::new(data_dir);
2077			Self {
2078				graph_error: None,
2079				graph_persistence_notifier: None,
2080				manager_error: None,
2081				scorer_error: None,
2082				kv_store,
2083			}
2084		}
2085
2086		fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2087			Self { graph_error: Some((error, message)), ..self }
2088		}
2089
2090		fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
2091			Self { graph_persistence_notifier: Some(sender), ..self }
2092		}
2093
2094		fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2095			Self { manager_error: Some((error, message)), ..self }
2096		}
2097
2098		fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2099			Self { scorer_error: Some((error, message)), ..self }
2100		}
2101
2102		pub fn get_data_dir(&self) -> PathBuf {
2103			self.kv_store.get_data_dir()
2104		}
2105	}
2106
2107	impl KVStoreSync for Persister {
2108		fn read(
2109			&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
2110		) -> lightning::io::Result<Vec<u8>> {
2111			self.kv_store.read(primary_namespace, secondary_namespace, key)
2112		}
2113
2114		fn write(
2115			&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
2116		) -> lightning::io::Result<()> {
2117			if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE
2118				&& secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE
2119				&& key == CHANNEL_MANAGER_PERSISTENCE_KEY
2120			{
2121				if let Some((error, message)) = self.manager_error {
2122					return Err(std::io::Error::new(error, message).into());
2123				}
2124			}
2125
2126			if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE
2127				&& secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE
2128				&& key == NETWORK_GRAPH_PERSISTENCE_KEY
2129			{
2130				if let Some(sender) = &self.graph_persistence_notifier {
2131					match sender.send(()) {
2132						Ok(()) => {},
2133						Err(std::sync::mpsc::SendError(())) => {
2134							println!("Persister failed to notify as receiver went away.")
2135						},
2136					}
2137				};
2138
2139				if let Some((error, message)) = self.graph_error {
2140					return Err(std::io::Error::new(error, message).into());
2141				}
2142			}
2143
2144			if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE
2145				&& secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE
2146				&& key == SCORER_PERSISTENCE_KEY
2147			{
2148				if let Some((error, message)) = self.scorer_error {
2149					return Err(std::io::Error::new(error, message).into());
2150				}
2151			}
2152
2153			self.kv_store.write(primary_namespace, secondary_namespace, key, buf)
2154		}
2155
2156		fn remove(
2157			&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
2158		) -> lightning::io::Result<()> {
2159			self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
2160		}
2161
2162		fn list(
2163			&self, primary_namespace: &str, secondary_namespace: &str,
2164		) -> lightning::io::Result<Vec<String>> {
2165			self.kv_store.list(primary_namespace, secondary_namespace)
2166		}
2167	}
2168
2169	struct TestScorer {
2170		event_expectations: Option<VecDeque<TestResult>>,
2171	}
2172
2173	#[derive(Debug)]
2174	enum TestResult {
2175		PaymentFailure { path: Path, short_channel_id: u64 },
2176		PaymentSuccess { path: Path },
2177		ProbeFailure { path: Path },
2178		ProbeSuccess { path: Path },
2179	}
2180
2181	impl TestScorer {
2182		fn new() -> Self {
2183			Self { event_expectations: None }
2184		}
2185
2186		fn expect(&mut self, expectation: TestResult) {
2187			self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
2188		}
2189	}
2190
2191	impl lightning::util::ser::Writeable for TestScorer {
2192		fn write<W: lightning::util::ser::Writer>(
2193			&self, _: &mut W,
2194		) -> Result<(), lightning::io::Error> {
2195			Ok(())
2196		}
2197	}
2198
2199	impl ScoreLookUp for TestScorer {
2200		type ScoreParams = ();
2201		fn channel_penalty_msat(
2202			&self, _candidate: &CandidateRouteHop, _usage: ChannelUsage,
2203			_score_params: &Self::ScoreParams,
2204		) -> u64 {
2205			unimplemented!();
2206		}
2207	}
2208
2209	impl ScoreUpdate for TestScorer {
2210		fn payment_path_failed(
2211			&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration,
2212		) {
2213			if let Some(expectations) = &mut self.event_expectations {
2214				match expectations.pop_front().unwrap() {
2215					TestResult::PaymentFailure { path, short_channel_id } => {
2216						assert_eq!(actual_path, &path);
2217						assert_eq!(actual_short_channel_id, short_channel_id);
2218					},
2219					TestResult::PaymentSuccess { path } => {
2220						panic!("Unexpected successful payment path: {:?}", path)
2221					},
2222					TestResult::ProbeFailure { path } => {
2223						panic!("Unexpected probe failure: {:?}", path)
2224					},
2225					TestResult::ProbeSuccess { path } => {
2226						panic!("Unexpected probe success: {:?}", path)
2227					},
2228				}
2229			}
2230		}
2231
2232		fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
2233			if let Some(expectations) = &mut self.event_expectations {
2234				match expectations.pop_front().unwrap() {
2235					TestResult::PaymentFailure { path, .. } => {
2236						panic!("Unexpected payment path failure: {:?}", path)
2237					},
2238					TestResult::PaymentSuccess { path } => {
2239						assert_eq!(actual_path, &path);
2240					},
2241					TestResult::ProbeFailure { path } => {
2242						panic!("Unexpected probe failure: {:?}", path)
2243					},
2244					TestResult::ProbeSuccess { path } => {
2245						panic!("Unexpected probe success: {:?}", path)
2246					},
2247				}
2248			}
2249		}
2250
2251		fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
2252			if let Some(expectations) = &mut self.event_expectations {
2253				match expectations.pop_front().unwrap() {
2254					TestResult::PaymentFailure { path, .. } => {
2255						panic!("Unexpected payment path failure: {:?}", path)
2256					},
2257					TestResult::PaymentSuccess { path } => {
2258						panic!("Unexpected payment path success: {:?}", path)
2259					},
2260					TestResult::ProbeFailure { path } => {
2261						assert_eq!(actual_path, &path);
2262					},
2263					TestResult::ProbeSuccess { path } => {
2264						panic!("Unexpected probe success: {:?}", path)
2265					},
2266				}
2267			}
2268		}
2269		fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
2270			if let Some(expectations) = &mut self.event_expectations {
2271				match expectations.pop_front().unwrap() {
2272					TestResult::PaymentFailure { path, .. } => {
2273						panic!("Unexpected payment path failure: {:?}", path)
2274					},
2275					TestResult::PaymentSuccess { path } => {
2276						panic!("Unexpected payment path success: {:?}", path)
2277					},
2278					TestResult::ProbeFailure { path } => {
2279						panic!("Unexpected probe failure: {:?}", path)
2280					},
2281					TestResult::ProbeSuccess { path } => {
2282						assert_eq!(actual_path, &path);
2283					},
2284				}
2285			}
2286		}
2287		fn time_passed(&mut self, _: Duration) {}
2288	}
2289
2290	#[cfg(c_bindings)]
2291	impl lightning::routing::scoring::Score for TestScorer {}
2292
2293	impl Drop for TestScorer {
2294		fn drop(&mut self) {
2295			if std::thread::panicking() {
2296				return;
2297			}
2298
2299			if let Some(event_expectations) = &self.event_expectations {
2300				if !event_expectations.is_empty() {
2301					panic!("Unsatisfied event expectations: {:?}", event_expectations);
2302				}
2303			}
2304		}
2305	}
2306
2307	struct TestWallet {}
2308
2309	impl ChangeDestinationSourceSync for TestWallet {
2310		fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
2311			Ok(ScriptBuf::new())
2312		}
2313	}
2314
2315	fn get_full_filepath(filepath: String, filename: String) -> String {
2316		let mut path = PathBuf::from(filepath);
2317		path.push(filename);
2318		path.to_str().unwrap().to_string()
2319	}
2320
2321	fn create_nodes(num_nodes: usize, persist_dir: &str) -> (String, Vec<Node>) {
2322		let persist_temp_path = env::temp_dir().join(persist_dir);
2323		let persist_dir = persist_temp_path.to_string_lossy().to_string();
2324		let network = Network::Bitcoin;
2325		let mut nodes = Vec::new();
2326		for i in 0..num_nodes {
2327			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
2328			let fee_estimator = Arc::new(test_utils::TestFeeEstimator::new(253));
2329			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
2330			let genesis_block = genesis_block(network);
2331			let network_graph = Arc::new(NetworkGraph::new(network, Arc::clone(&logger)));
2332			let scorer = Arc::new(LockingWrapper::new(TestScorer::new()));
2333			let now = Duration::from_secs(genesis_block.header.time as u64);
2334			let seed = [i as u8; 32];
2335			let keys_manager =
2336				Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
2337			let router = Arc::new(DefaultRouter::new(
2338				Arc::clone(&network_graph),
2339				Arc::clone(&logger),
2340				Arc::clone(&keys_manager),
2341				Arc::clone(&scorer),
2342				Default::default(),
2343			));
2344			let msg_router = Arc::new(DefaultMessageRouter::new(
2345				Arc::clone(&network_graph),
2346				Arc::clone(&keys_manager),
2347			));
2348			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
2349			let kv_store =
2350				Arc::new(Persister::new(format!("{}_persister_{}", &persist_dir, i).into()));
2351			let now = Duration::from_secs(genesis_block.header.time as u64);
2352			let keys_manager =
2353				Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
2354			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
2355				Some(Arc::clone(&chain_source)),
2356				Arc::clone(&tx_broadcaster),
2357				Arc::clone(&logger),
2358				Arc::clone(&fee_estimator),
2359				Arc::clone(&kv_store),
2360				Arc::clone(&keys_manager),
2361				keys_manager.get_peer_storage_key(),
2362			));
2363			let best_block = BestBlock::from_network(network);
2364			let params = ChainParameters { network, best_block };
2365			let manager = Arc::new(ChannelManager::new(
2366				Arc::clone(&fee_estimator),
2367				Arc::clone(&chain_monitor),
2368				Arc::clone(&tx_broadcaster),
2369				Arc::clone(&router),
2370				Arc::clone(&msg_router),
2371				Arc::clone(&logger),
2372				Arc::clone(&keys_manager),
2373				Arc::clone(&keys_manager),
2374				Arc::clone(&keys_manager),
2375				UserConfig::default(),
2376				params,
2377				genesis_block.header.time,
2378			));
2379			let messenger = Arc::new(OnionMessenger::new(
2380				Arc::clone(&keys_manager),
2381				Arc::clone(&keys_manager),
2382				Arc::clone(&logger),
2383				Arc::clone(&manager),
2384				Arc::clone(&msg_router),
2385				IgnoringMessageHandler {},
2386				Arc::clone(&manager),
2387				IgnoringMessageHandler {},
2388				IgnoringMessageHandler {},
2389			));
2390			let wallet = Arc::new(TestWallet {});
2391			let sweeper = Arc::new(OutputSweeperSync::new(
2392				best_block,
2393				Arc::clone(&tx_broadcaster),
2394				Arc::clone(&fee_estimator),
2395				None::<Arc<test_utils::TestChainSource>>,
2396				Arc::clone(&keys_manager),
2397				wallet,
2398				Arc::clone(&kv_store),
2399				Arc::clone(&logger),
2400			));
2401			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(
2402				Arc::clone(&network_graph),
2403				Some(Arc::clone(&chain_source)),
2404				Arc::clone(&logger),
2405			));
2406			let rapid_gossip_sync =
2407				Arc::new(RapidGossipSync::new(Arc::clone(&network_graph), Arc::clone(&logger)));
2408			let msg_handler = MessageHandler {
2409				chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(
2410					ChainHash::using_genesis_block(Network::Testnet),
2411				)),
2412				route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
2413				onion_message_handler: Arc::clone(&messenger),
2414				custom_message_handler: IgnoringMessageHandler {},
2415				send_only_message_handler: IgnoringMessageHandler {},
2416			};
2417			let peer_manager = Arc::new(PeerManager::new(
2418				msg_handler,
2419				0,
2420				&seed,
2421				Arc::clone(&logger),
2422				Arc::clone(&keys_manager),
2423			));
2424			let liquidity_manager = Arc::new(
2425				LiquidityManagerSync::new(
2426					Arc::clone(&keys_manager),
2427					Arc::clone(&keys_manager),
2428					Arc::clone(&manager),
2429					None,
2430					None,
2431					Arc::clone(&kv_store),
2432					Arc::clone(&tx_broadcaster),
2433					None,
2434					None,
2435				)
2436				.unwrap(),
2437			);
2438			let node = Node {
2439				node: manager,
2440				p2p_gossip_sync,
2441				rapid_gossip_sync,
2442				peer_manager,
2443				liquidity_manager,
2444				chain_monitor,
2445				kv_store,
2446				tx_broadcaster,
2447				network_graph,
2448				logger,
2449				best_block,
2450				scorer,
2451				sweeper,
2452				messenger,
2453			};
2454			nodes.push(node);
2455		}
2456
2457		for i in 0..num_nodes {
2458			for j in (i + 1)..num_nodes {
2459				let init_i = Init {
2460					features: nodes[j].node.init_features(),
2461					networks: None,
2462					remote_network_address: None,
2463				};
2464				nodes[i]
2465					.node
2466					.peer_connected(nodes[j].node.get_our_node_id(), &init_i, true)
2467					.unwrap();
2468				let init_j = Init {
2469					features: nodes[i].node.init_features(),
2470					networks: None,
2471					remote_network_address: None,
2472				};
2473				nodes[j]
2474					.node
2475					.peer_connected(nodes[i].node.get_our_node_id(), &init_j, false)
2476					.unwrap();
2477			}
2478		}
2479
2480		(persist_dir, nodes)
2481	}
2482
2483	macro_rules! open_channel {
2484		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
2485			begin_open_channel!($node_a, $node_b, $channel_value);
2486			let events = $node_a.node.get_and_clear_pending_events();
2487			assert_eq!(events.len(), 1);
2488			let (temporary_channel_id, tx) =
2489				handle_funding_generation_ready!(events[0], $channel_value);
2490			$node_a
2491				.node
2492				.funding_transaction_generated(
2493					temporary_channel_id,
2494					$node_b.node.get_our_node_id(),
2495					tx.clone(),
2496				)
2497				.unwrap();
2498			let msg_a = get_event_msg!(
2499				$node_a,
2500				MessageSendEvent::SendFundingCreated,
2501				$node_b.node.get_our_node_id()
2502			);
2503			$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
2504			get_event!($node_b, Event::ChannelPending);
2505			let msg_b = get_event_msg!(
2506				$node_b,
2507				MessageSendEvent::SendFundingSigned,
2508				$node_a.node.get_our_node_id()
2509			);
2510			$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
2511			get_event!($node_a, Event::ChannelPending);
2512			tx
2513		}};
2514	}
2515
2516	macro_rules! begin_open_channel {
2517		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
2518			$node_a
2519				.node
2520				.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None)
2521				.unwrap();
2522			let msg_a = get_event_msg!(
2523				$node_a,
2524				MessageSendEvent::SendOpenChannel,
2525				$node_b.node.get_our_node_id()
2526			);
2527			$node_b.node.handle_open_channel($node_a.node.get_our_node_id(), &msg_a);
2528			let msg_b = get_event_msg!(
2529				$node_b,
2530				MessageSendEvent::SendAcceptChannel,
2531				$node_a.node.get_our_node_id()
2532			);
2533			$node_a.node.handle_accept_channel($node_b.node.get_our_node_id(), &msg_b);
2534		}};
2535	}
2536
2537	macro_rules! handle_funding_generation_ready {
2538		($event: expr, $channel_value: expr) => {{
2539			match $event {
2540				Event::FundingGenerationReady {
2541					temporary_channel_id,
2542					channel_value_satoshis,
2543					ref output_script,
2544					user_channel_id,
2545					..
2546				} => {
2547					assert_eq!(channel_value_satoshis, $channel_value);
2548					assert_eq!(user_channel_id, 42);
2549
2550					let tx = Transaction {
2551						version: Version::ONE,
2552						lock_time: LockTime::ZERO,
2553						input: Vec::new(),
2554						output: vec![TxOut {
2555							value: Amount::from_sat(channel_value_satoshis),
2556							script_pubkey: output_script.clone(),
2557						}],
2558					};
2559					(temporary_channel_id, tx)
2560				},
2561				_ => panic!("Unexpected event"),
2562			}
2563		}};
2564	}
2565
2566	fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
2567		for i in 1..=depth {
2568			let prev_blockhash = node.best_block.block_hash;
2569			let height = node.best_block.height + 1;
2570			let header = create_dummy_header(prev_blockhash, height);
2571			let txdata = vec![(0, tx)];
2572			node.best_block = BestBlock::new(header.block_hash(), height);
2573			match i {
2574				1 => {
2575					node.node.transactions_confirmed(&header, &txdata, height);
2576					node.chain_monitor.transactions_confirmed(&header, &txdata, height);
2577					node.sweeper.transactions_confirmed(&header, &txdata, height);
2578				},
2579				x if x == depth => {
2580					// We need the TestBroadcaster to know about the new height so that it doesn't think
2581					// we're violating the time lock requirements of transactions broadcasted at that
2582					// point.
2583					let block = (genesis_block(Network::Bitcoin), height);
2584					node.tx_broadcaster.blocks.lock().unwrap().push(block);
2585					node.node.best_block_updated(&header, height);
2586					node.chain_monitor.best_block_updated(&header, height);
2587					node.sweeper.best_block_updated(&header, height);
2588				},
2589				_ => {},
2590			}
2591		}
2592	}
2593
2594	fn advance_chain(node: &mut Node, num_blocks: u32) {
2595		for i in 1..=num_blocks {
2596			let prev_blockhash = node.best_block.block_hash;
2597			let height = node.best_block.height + 1;
2598			let header = create_dummy_header(prev_blockhash, height);
2599			node.best_block = BestBlock::new(header.block_hash(), height);
2600			if i == num_blocks {
2601				// We need the TestBroadcaster to know about the new height so that it doesn't think
2602				// we're violating the time lock requirements of transactions broadcasted at that
2603				// point.
2604				let block = (genesis_block(Network::Bitcoin), height);
2605				node.tx_broadcaster.blocks.lock().unwrap().push(block);
2606				node.node.best_block_updated(&header, height);
2607				node.chain_monitor.best_block_updated(&header, height);
2608				node.sweeper.best_block_updated(&header, height);
2609			}
2610		}
2611	}
2612
2613	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
2614		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
2615	}
2616
2617	#[test]
2618	fn test_background_processor() {
2619		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
2620		// updates. Also test that when new updates are available, the manager signals that it needs
2621		// re-persistence and is successfully re-persisted.
2622		let (persist_dir, nodes) = create_nodes(2, "test_background_processor");
2623
2624		// Go through the channel creation process so that each node has something to persist. Since
2625		// open_channel consumes events, it must complete before starting BackgroundProcessor to
2626		// avoid a race with processing events.
2627		let tx = open_channel!(nodes[0], nodes[1], 100000);
2628
2629		// Initiate the background processors to watch each node.
2630		let data_dir = nodes[0].kv_store.get_data_dir();
2631		let persister = Arc::new(Persister::new(data_dir));
2632		let event_handler = |_: _| Ok(());
2633		let bg_processor = BackgroundProcessor::start(
2634			persister,
2635			event_handler,
2636			Arc::clone(&nodes[0].chain_monitor),
2637			Arc::clone(&nodes[0].node),
2638			Some(Arc::clone(&nodes[0].messenger)),
2639			nodes[0].p2p_gossip_sync(),
2640			Arc::clone(&nodes[0].peer_manager),
2641			Some(Arc::clone(&nodes[0].liquidity_manager)),
2642			Some(Arc::clone(&nodes[0].sweeper)),
2643			Arc::clone(&nodes[0].logger),
2644			Some(Arc::clone(&nodes[0].scorer)),
2645		);
2646
2647		macro_rules! check_persisted_data {
2648			($node: expr, $filepath: expr) => {
2649				let mut expected_bytes = Vec::new();
2650				loop {
2651					expected_bytes.clear();
2652					match $node.write(&mut expected_bytes) {
2653						Ok(()) => match std::fs::read($filepath) {
2654							Ok(bytes) => {
2655								if bytes == expected_bytes {
2656									break;
2657								} else {
2658									continue;
2659								}
2660							},
2661							Err(_) => continue,
2662						},
2663						Err(e) => panic!("Unexpected error: {}", e),
2664					}
2665				}
2666			};
2667		}
2668
2669		// Check that the initial channel manager data is persisted as expected.
2670		let filepath =
2671			get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
2672		check_persisted_data!(nodes[0].node, filepath.clone());
2673
2674		loop {
2675			if !nodes[0].node.get_event_or_persist_condvar_value() {
2676				break;
2677			}
2678		}
2679
2680		// Force-close the channel.
2681		let error_message = "Channel force-closed";
2682		nodes[0]
2683			.node
2684			.force_close_broadcasting_latest_txn(
2685				&ChannelId::v1_from_funding_outpoint(OutPoint {
2686					txid: tx.compute_txid(),
2687					index: 0,
2688				}),
2689				&nodes[1].node.get_our_node_id(),
2690				error_message.to_string(),
2691			)
2692			.unwrap();
2693
2694		// Check that the force-close updates are persisted.
2695		check_persisted_data!(nodes[0].node, filepath.clone());
2696		loop {
2697			if !nodes[0].node.get_event_or_persist_condvar_value() {
2698				break;
2699			}
2700		}
2701
2702		// Check network graph is persisted
2703		let filepath =
2704			get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
2705		check_persisted_data!(nodes[0].network_graph, filepath.clone());
2706
2707		// Check scorer is persisted
2708		let filepath =
2709			get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
2710		check_persisted_data!(nodes[0].scorer, filepath.clone());
2711
2712		if !std::thread::panicking() {
2713			bg_processor.stop().unwrap();
2714		}
2715	}
2716
2717	#[test]
2718	fn test_timer_tick_called() {
2719		// Test that:
2720		// - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
2721		// - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`,
2722		// - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and
2723		// - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`.
2724		let (_, nodes) = create_nodes(1, "test_timer_tick_called");
2725		let data_dir = nodes[0].kv_store.get_data_dir();
2726		let persister = Arc::new(Persister::new(data_dir));
2727		let event_handler = |_: _| Ok(());
2728		let bg_processor = BackgroundProcessor::start(
2729			persister,
2730			event_handler,
2731			Arc::clone(&nodes[0].chain_monitor),
2732			Arc::clone(&nodes[0].node),
2733			Some(Arc::clone(&nodes[0].messenger)),
2734			nodes[0].no_gossip_sync(),
2735			Arc::clone(&nodes[0].peer_manager),
2736			Some(Arc::clone(&nodes[0].liquidity_manager)),
2737			Some(Arc::clone(&nodes[0].sweeper)),
2738			Arc::clone(&nodes[0].logger),
2739			Some(Arc::clone(&nodes[0].scorer)),
2740		);
2741		loop {
2742			let log_entries = nodes[0].logger.lines.lock().unwrap();
2743			let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
2744			let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
2745			let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
2746			let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string();
2747			if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some()
2748				&& log_entries.get(&("lightning_background_processor", desired_log_2)).is_some()
2749				&& log_entries.get(&("lightning_background_processor", desired_log_3)).is_some()
2750				&& log_entries.get(&("lightning_background_processor", desired_log_4)).is_some()
2751			{
2752				break;
2753			}
2754		}
2755
2756		if !std::thread::panicking() {
2757			bg_processor.stop().unwrap();
2758		}
2759	}
2760
2761	#[test]
2762	fn test_channel_manager_persist_error() {
2763		// Test that if we encounter an error during manager persistence, the thread panics.
2764		let (_, nodes) = create_nodes(2, "test_persist_error");
2765		open_channel!(nodes[0], nodes[1], 100000);
2766
2767		let data_dir = nodes[0].kv_store.get_data_dir();
2768		let persister = Arc::new(
2769			Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
2770		);
2771		let event_handler = |_: _| Ok(());
2772		let bg_processor = BackgroundProcessor::start(
2773			persister,
2774			event_handler,
2775			Arc::clone(&nodes[0].chain_monitor),
2776			Arc::clone(&nodes[0].node),
2777			Some(Arc::clone(&nodes[0].messenger)),
2778			nodes[0].no_gossip_sync(),
2779			Arc::clone(&nodes[0].peer_manager),
2780			Some(Arc::clone(&nodes[0].liquidity_manager)),
2781			Some(Arc::clone(&nodes[0].sweeper)),
2782			Arc::clone(&nodes[0].logger),
2783			Some(Arc::clone(&nodes[0].scorer)),
2784		);
2785		match bg_processor.join() {
2786			Ok(_) => panic!("Expected error persisting manager"),
2787			Err(e) => {
2788				assert_eq!(e.kind(), std::io::ErrorKind::Other);
2789				assert_eq!(e.get_ref().unwrap().to_string(), "test");
2790			},
2791		}
2792	}
2793
2794	#[tokio::test]
2795	async fn test_channel_manager_persist_error_async() {
2796		// Test that if we encounter an error during manager persistence, the thread panics.
2797		let (_, nodes) = create_nodes(2, "test_persist_error_sync");
2798		open_channel!(nodes[0], nodes[1], 100000);
2799
2800		let data_dir = nodes[0].kv_store.get_data_dir();
2801		let kv_store_sync = Arc::new(
2802			Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
2803		);
2804		let kv_store = KVStoreSyncWrapper(kv_store_sync);
2805
2806		// Yes, you can unsafe { turn off the borrow checker }
2807		let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
2808			&*(nodes[0].liquidity_manager.get_lm_async()
2809				as *const LiquidityManager<_, _, _, _, _, _, _>)
2810				as &'static LiquidityManager<_, _, _, _, _, _, _>
2811		};
2812		let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
2813			&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
2814				as &'static OutputSweeper<_, _, _, _, _, _, _>
2815		};
2816
2817		let bp_future = super::process_events_async(
2818			kv_store,
2819			|_: _| async { Ok(()) },
2820			Arc::clone(&nodes[0].chain_monitor),
2821			Arc::clone(&nodes[0].node),
2822			Some(Arc::clone(&nodes[0].messenger)),
2823			nodes[0].rapid_gossip_sync(),
2824			Arc::clone(&nodes[0].peer_manager),
2825			Some(lm_async),
2826			Some(sweeper_async),
2827			Arc::clone(&nodes[0].logger),
2828			Some(Arc::clone(&nodes[0].scorer)),
2829			move |dur: Duration| {
2830				Box::pin(async move {
2831					tokio::time::sleep(dur).await;
2832					false // Never exit
2833				})
2834			},
2835			false,
2836			|| Some(Duration::ZERO),
2837		);
2838		match bp_future.await {
2839			Ok(_) => panic!("Expected error persisting manager"),
2840			Err(e) => {
2841				assert_eq!(e.kind(), lightning::io::ErrorKind::Other);
2842				assert_eq!(e.get_ref().unwrap().to_string(), "test");
2843			},
2844		}
2845	}
2846
2847	#[test]
2848	fn test_network_graph_persist_error() {
2849		// Test that if we encounter an error during network graph persistence, an error gets returned.
2850		let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
2851		let data_dir = nodes[0].kv_store.get_data_dir();
2852		let persister =
2853			Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
2854		let event_handler = |_: _| Ok(());
2855		let bg_processor = BackgroundProcessor::start(
2856			persister,
2857			event_handler,
2858			Arc::clone(&nodes[0].chain_monitor),
2859			Arc::clone(&nodes[0].node),
2860			Some(Arc::clone(&nodes[0].messenger)),
2861			nodes[0].p2p_gossip_sync(),
2862			Arc::clone(&nodes[0].peer_manager),
2863			Some(Arc::clone(&nodes[0].liquidity_manager)),
2864			Some(Arc::clone(&nodes[0].sweeper)),
2865			Arc::clone(&nodes[0].logger),
2866			Some(Arc::clone(&nodes[0].scorer)),
2867		);
2868
2869		match bg_processor.stop() {
2870			Ok(_) => panic!("Expected error persisting network graph"),
2871			Err(e) => {
2872				assert_eq!(e.kind(), std::io::ErrorKind::Other);
2873				assert_eq!(e.get_ref().unwrap().to_string(), "test");
2874			},
2875		}
2876	}
2877
2878	#[test]
2879	fn test_scorer_persist_error() {
2880		// Test that if we encounter an error during scorer persistence, an error gets returned.
2881		let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
2882		let data_dir = nodes[0].kv_store.get_data_dir();
2883		let persister =
2884			Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
2885		let event_handler = |_: _| Ok(());
2886		let bg_processor = BackgroundProcessor::start(
2887			persister,
2888			event_handler,
2889			Arc::clone(&nodes[0].chain_monitor),
2890			Arc::clone(&nodes[0].node),
2891			Some(Arc::clone(&nodes[0].messenger)),
2892			nodes[0].no_gossip_sync(),
2893			Arc::clone(&nodes[0].peer_manager),
2894			Some(Arc::clone(&nodes[0].liquidity_manager)),
2895			Some(Arc::clone(&nodes[0].sweeper)),
2896			Arc::clone(&nodes[0].logger),
2897			Some(Arc::clone(&nodes[0].scorer)),
2898		);
2899
2900		match bg_processor.stop() {
2901			Ok(_) => panic!("Expected error persisting scorer"),
2902			Err(e) => {
2903				assert_eq!(e.kind(), std::io::ErrorKind::Other);
2904				assert_eq!(e.get_ref().unwrap().to_string(), "test");
2905			},
2906		}
2907	}
2908
2909	#[test]
2910	fn test_background_event_handling() {
2911		let (_, mut nodes) = create_nodes(2, "test_background_event_handling");
2912		let node_0_id = nodes[0].node.get_our_node_id();
2913		let node_1_id = nodes[1].node.get_our_node_id();
2914
2915		let channel_value = 100000;
2916		let data_dir = nodes[0].kv_store.get_data_dir();
2917		let persister = Arc::new(Persister::new(data_dir.clone()));
2918
2919		// Set up a background event handler for FundingGenerationReady events.
2920		let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
2921		let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
2922		let event_handler = move |event: Event| {
2923			match event {
2924				Event::FundingGenerationReady { .. } => funding_generation_send
2925					.send(handle_funding_generation_ready!(event, channel_value))
2926					.unwrap(),
2927				Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
2928				Event::ChannelReady { .. } => {},
2929				_ => panic!("Unexpected event: {:?}", event),
2930			}
2931			Ok(())
2932		};
2933
2934		let bg_processor = BackgroundProcessor::start(
2935			persister,
2936			event_handler,
2937			Arc::clone(&nodes[0].chain_monitor),
2938			Arc::clone(&nodes[0].node),
2939			Some(Arc::clone(&nodes[0].messenger)),
2940			nodes[0].no_gossip_sync(),
2941			Arc::clone(&nodes[0].peer_manager),
2942			Some(Arc::clone(&nodes[0].liquidity_manager)),
2943			Some(Arc::clone(&nodes[0].sweeper)),
2944			Arc::clone(&nodes[0].logger),
2945			Some(Arc::clone(&nodes[0].scorer)),
2946		);
2947
2948		// Open a channel and check that the FundingGenerationReady event was handled.
2949		begin_open_channel!(nodes[0], nodes[1], channel_value);
2950		let (temporary_channel_id, funding_tx) = funding_generation_recv
2951			.recv_timeout(EVENT_DEADLINE)
2952			.expect("FundingGenerationReady not handled within deadline");
2953		nodes[0]
2954			.node
2955			.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
2956			.unwrap();
2957		let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
2958		nodes[1].node.handle_funding_created(node_0_id, &msg_0);
2959		get_event!(nodes[1], Event::ChannelPending);
2960		let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
2961		nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
2962		channel_pending_recv
2963			.recv_timeout(EVENT_DEADLINE)
2964			.expect("ChannelPending not handled within deadline");
2965
2966		// Confirm the funding transaction.
2967		confirm_transaction(&mut nodes[0], &funding_tx);
2968		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, node_1_id);
2969		confirm_transaction(&mut nodes[1], &funding_tx);
2970		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, node_0_id);
2971		nodes[0].node.handle_channel_ready(node_1_id, &bs_funding);
2972		let _as_channel_update =
2973			get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, node_1_id);
2974		nodes[1].node.handle_channel_ready(node_0_id, &as_funding);
2975		let _bs_channel_update =
2976			get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, node_0_id);
2977		let broadcast_funding =
2978			nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
2979		assert_eq!(broadcast_funding.compute_txid(), funding_tx.compute_txid());
2980		assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());
2981
2982		if !std::thread::panicking() {
2983			bg_processor.stop().unwrap();
2984		}
2985
2986		// Set up a background event handler for SpendableOutputs events.
2987		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
2988		let event_handler = move |event: Event| {
2989			match event {
2990				Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
2991				Event::ChannelReady { .. } => {},
2992				Event::ChannelClosed { .. } => {},
2993				_ => panic!("Unexpected event: {:?}", event),
2994			}
2995			Ok(())
2996		};
2997		let persister = Arc::new(Persister::new(data_dir));
2998		let bg_processor = BackgroundProcessor::start(
2999			persister,
3000			event_handler,
3001			Arc::clone(&nodes[0].chain_monitor),
3002			Arc::clone(&nodes[0].node),
3003			Some(Arc::clone(&nodes[0].messenger)),
3004			nodes[0].no_gossip_sync(),
3005			Arc::clone(&nodes[0].peer_manager),
3006			Some(Arc::clone(&nodes[0].liquidity_manager)),
3007			Some(Arc::clone(&nodes[0].sweeper)),
3008			Arc::clone(&nodes[0].logger),
3009			Some(Arc::clone(&nodes[0].scorer)),
3010		);
3011
3012		// Force close the channel and check that the SpendableOutputs event was handled.
3013		let error_message = "Channel force-closed";
3014		nodes[0]
3015			.node
3016			.force_close_broadcasting_latest_txn(
3017				&nodes[0].node.list_channels()[0].channel_id,
3018				&node_1_id,
3019				error_message.to_string(),
3020			)
3021			.unwrap();
3022		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
3023		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
3024
3025		let event =
3026			receiver.recv_timeout(EVENT_DEADLINE).expect("Events not handled within deadline");
3027		match event {
3028			Event::SpendableOutputs { outputs, channel_id } => {
3029				nodes[0]
3030					.sweeper
3031					.track_spendable_outputs(outputs, channel_id, false, Some(153))
3032					.unwrap();
3033			},
3034			_ => panic!("Unexpected event: {:?}", event),
3035		}
3036
3037		// Check we don't generate an initial sweeping tx until we reach the required height.
3038		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
3039		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
3040		if let Some(sweep_tx_0) = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop() {
3041			assert!(!tracked_output.is_spent_in(&sweep_tx_0));
3042			match tracked_output.status {
3043				OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => {
3044					assert_eq!(delayed_until_height, Some(153));
3045				},
3046				_ => panic!("Unexpected status"),
3047			}
3048		}
3049
3050		advance_chain(&mut nodes[0], 3);
3051
3052		let tx_broadcaster = Arc::clone(&nodes[0].tx_broadcaster);
3053		let wait_for_sweep_tx = || -> Transaction {
3054			loop {
3055				let sweep_tx = tx_broadcaster.txn_broadcasted.lock().unwrap().pop();
3056				if let Some(sweep_tx) = sweep_tx {
3057					return sweep_tx;
3058				}
3059
3060				std::thread::sleep(Duration::from_millis(10));
3061			}
3062		};
3063
3064		// Check we generate an initial sweeping tx.
3065		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
3066		let sweep_tx_0 = wait_for_sweep_tx();
3067		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
3068		match tracked_output.status {
3069			OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
3070				assert_eq!(sweep_tx_0.compute_txid(), latest_spending_tx.compute_txid());
3071			},
3072			_ => panic!("Unexpected status"),
3073		}
3074
3075		// Check we regenerate and rebroadcast the sweeping tx each block.
3076		advance_chain(&mut nodes[0], 1);
3077		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
3078		let sweep_tx_1 = wait_for_sweep_tx();
3079		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
3080		match tracked_output.status {
3081			OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
3082				assert_eq!(sweep_tx_1.compute_txid(), latest_spending_tx.compute_txid());
3083			},
3084			_ => panic!("Unexpected status"),
3085		}
3086		assert_ne!(sweep_tx_0, sweep_tx_1);
3087
3088		advance_chain(&mut nodes[0], 1);
3089		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
3090		let sweep_tx_2 = wait_for_sweep_tx();
3091		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
3092		match tracked_output.status {
3093			OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
3094				assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
3095			},
3096			_ => panic!("Unexpected status"),
3097		}
3098		assert_ne!(sweep_tx_0, sweep_tx_2);
3099		assert_ne!(sweep_tx_1, sweep_tx_2);
3100
3101		// Check we still track the spendable outputs up to ANTI_REORG_DELAY confirmations.
3102		confirm_transaction_depth(&mut nodes[0], &sweep_tx_2, 5);
3103		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
3104		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
3105		match tracked_output.status {
3106			OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
3107				assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
3108			},
3109			_ => panic!("Unexpected status"),
3110		}
3111
3112		// Check we still see the transaction as confirmed if we unconfirm any untracked
3113		// transaction. (We previously had a bug that would mark tracked transactions as
3114		// unconfirmed if any transaction at an unknown block height would be unconfirmed.)
3115		let unconf_txid = Txid::from_slice(&[0; 32]).unwrap();
3116		nodes[0].sweeper.transaction_unconfirmed(&unconf_txid);
3117
3118		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
3119		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
3120		match tracked_output.status {
3121			OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
3122				assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
3123			},
3124			_ => panic!("Unexpected status"),
3125		}
3126
3127		// Check we stop tracking the spendable outputs when one of the txs reaches
3128		// PRUNE_DELAY_BLOCKS confirmations.
3129		confirm_transaction_depth(&mut nodes[0], &sweep_tx_0, PRUNE_DELAY_BLOCKS);
3130		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 0);
3131
3132		if !std::thread::panicking() {
3133			bg_processor.stop().unwrap();
3134		}
3135	}
3136
3137	#[test]
3138	fn test_event_handling_failures_are_replayed() {
3139		let (_, nodes) = create_nodes(2, "test_event_handling_failures_are_replayed");
3140		let channel_value = 100000;
3141		let data_dir = nodes[0].kv_store.get_data_dir();
3142		let persister = Arc::new(Persister::new(data_dir.clone()));
3143
3144		let (first_event_send, first_event_recv) = std::sync::mpsc::sync_channel(1);
3145		let (second_event_send, second_event_recv) = std::sync::mpsc::sync_channel(1);
3146		let should_fail_event_handling = Arc::new(AtomicBool::new(true));
3147		let event_handler = move |event: Event| {
3148			if let Ok(true) = should_fail_event_handling.compare_exchange(
3149				true,
3150				false,
3151				Ordering::Acquire,
3152				Ordering::Relaxed,
3153			) {
3154				first_event_send.send(event).unwrap();
3155				return Err(ReplayEvent());
3156			}
3157
3158			second_event_send.send(event).unwrap();
3159			Ok(())
3160		};
3161
3162		let bg_processor = BackgroundProcessor::start(
3163			persister,
3164			event_handler,
3165			Arc::clone(&nodes[0].chain_monitor),
3166			Arc::clone(&nodes[0].node),
3167			Some(Arc::clone(&nodes[0].messenger)),
3168			nodes[0].no_gossip_sync(),
3169			Arc::clone(&nodes[0].peer_manager),
3170			Some(Arc::clone(&nodes[0].liquidity_manager)),
3171			Some(Arc::clone(&nodes[0].sweeper)),
3172			Arc::clone(&nodes[0].logger),
3173			Some(Arc::clone(&nodes[0].scorer)),
3174		);
3175
3176		begin_open_channel!(nodes[0], nodes[1], channel_value);
3177		assert_eq!(
3178			first_event_recv.recv_timeout(EVENT_DEADLINE).unwrap(),
3179			second_event_recv.recv_timeout(EVENT_DEADLINE).unwrap()
3180		);
3181
3182		if !std::thread::panicking() {
3183			bg_processor.stop().unwrap();
3184		}
3185	}
3186
3187	#[test]
3188	fn test_scorer_persistence() {
3189		let (_, nodes) = create_nodes(2, "test_scorer_persistence");
3190		let data_dir = nodes[0].kv_store.get_data_dir();
3191		let persister = Arc::new(Persister::new(data_dir));
3192		let event_handler = |_: _| Ok(());
3193		let bg_processor = BackgroundProcessor::start(
3194			persister,
3195			event_handler,
3196			Arc::clone(&nodes[0].chain_monitor),
3197			Arc::clone(&nodes[0].node),
3198			Some(Arc::clone(&nodes[0].messenger)),
3199			nodes[0].no_gossip_sync(),
3200			Arc::clone(&nodes[0].peer_manager),
3201			Some(Arc::clone(&nodes[0].liquidity_manager)),
3202			Some(Arc::clone(&nodes[0].sweeper)),
3203			Arc::clone(&nodes[0].logger),
3204			Some(Arc::clone(&nodes[0].scorer)),
3205		);
3206
3207		loop {
3208			let log_entries = nodes[0].logger.lines.lock().unwrap();
3209			let expected_log = "Calling time_passed and persisting scorer".to_string();
3210			if log_entries.get(&("lightning_background_processor", expected_log)).is_some() {
3211				break;
3212			}
3213		}
3214
3215		if !std::thread::panicking() {
3216			bg_processor.stop().unwrap();
3217		}
3218	}
3219
3220	macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
3221		($nodes: expr, $receive: expr, $sleep: expr) => {
3222			let features = ChannelFeatures::empty();
3223			$nodes[0]
3224				.network_graph
3225				.add_channel_from_partial_announcement(
3226					42,
3227					None,
3228					53,
3229					features,
3230					$nodes[0].node.get_our_node_id().into(),
3231					$nodes[1].node.get_our_node_id().into(),
3232				)
3233				.expect("Failed to update channel from partial announcement");
3234			let original_graph_description = $nodes[0].network_graph.to_string();
3235			assert!(original_graph_description.contains("42: features: 0000, node_one:"));
3236			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);
3237
3238			loop {
3239				$sleep;
3240				let log_entries = $nodes[0].logger.lines.lock().unwrap();
3241				let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
3242				if *log_entries.get(&("lightning_background_processor", loop_counter)).unwrap_or(&0)
3243					> 1
3244				{
3245					// Wait until the loop has gone around at least twice.
3246					break;
3247				}
3248			}
3249
3250			let initialization_input = vec![
3251				76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99,
3252				247, 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227,
3253				98, 218, 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61,
3254				250, 251, 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6,
3255				67, 2, 36, 125, 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47,
3256				115, 172, 63, 136, 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158,
3257				1, 242, 121, 152, 106, 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95,
3258				65, 3, 83, 185, 58, 138, 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136,
3259				149, 185, 226, 156, 137, 175, 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219,
3260				175, 168, 77, 4, 143, 38, 128, 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2,
3261				27, 0, 0, 0, 1, 0, 0, 255, 2, 68, 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0,
3262				0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232, 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255,
3263				8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0, 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6,
3264				11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
3265			];
3266			$nodes[0]
3267				.rapid_gossip_sync
3268				.update_network_graph_no_std(&initialization_input[..], Some(1642291930))
3269				.unwrap();
3270
3271			// this should have added two channels and pruned the previous one.
3272			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);
3273
3274			$receive.expect("Network graph not pruned within deadline");
3275
3276			// all channels should now be pruned
3277			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
3278		};
3279	}
3280
3281	#[test]
3282	fn test_not_pruning_network_graph_until_graph_sync_completion() {
3283		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
3284
3285		let (_, nodes) =
3286			create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");
3287		let data_dir = nodes[0].kv_store.get_data_dir();
3288		let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
3289
3290		let event_handler = |_: _| Ok(());
3291		let background_processor = BackgroundProcessor::start(
3292			persister,
3293			event_handler,
3294			Arc::clone(&nodes[0].chain_monitor),
3295			Arc::clone(&nodes[0].node),
3296			Some(Arc::clone(&nodes[0].messenger)),
3297			nodes[0].rapid_gossip_sync(),
3298			Arc::clone(&nodes[0].peer_manager),
3299			Some(Arc::clone(&nodes[0].liquidity_manager)),
3300			Some(Arc::clone(&nodes[0].sweeper)),
3301			Arc::clone(&nodes[0].logger),
3302			Some(Arc::clone(&nodes[0].scorer)),
3303		);
3304
3305		do_test_not_pruning_network_graph_until_graph_sync_completion!(
3306			nodes,
3307			receiver.recv_timeout(super::FIRST_NETWORK_PRUNE_TIMER * 5),
3308			std::thread::sleep(Duration::from_millis(1))
3309		);
3310
3311		background_processor.stop().unwrap();
3312	}
3313
3314	#[tokio::test]
3315	async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
3316		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
3317
3318		let (_, nodes) =
3319			create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
3320		let data_dir = nodes[0].kv_store.get_data_dir();
3321		let kv_store_sync =
3322			Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
3323		let kv_store = KVStoreSyncWrapper(kv_store_sync);
3324
3325		// Yes, you can unsafe { turn off the borrow checker }
3326		let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
3327			&*(nodes[0].liquidity_manager.get_lm_async()
3328				as *const LiquidityManager<_, _, _, _, _, _, _>)
3329				as &'static LiquidityManager<_, _, _, _, _, _, _>
3330		};
3331		let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
3332			&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
3333				as &'static OutputSweeper<_, _, _, _, _, _, _>
3334		};
3335
3336		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
3337		let bp_future = super::process_events_async(
3338			kv_store,
3339			|_: _| async { Ok(()) },
3340			Arc::clone(&nodes[0].chain_monitor),
3341			Arc::clone(&nodes[0].node),
3342			Some(Arc::clone(&nodes[0].messenger)),
3343			nodes[0].rapid_gossip_sync(),
3344			Arc::clone(&nodes[0].peer_manager),
3345			Some(lm_async),
3346			Some(sweeper_async),
3347			Arc::clone(&nodes[0].logger),
3348			Some(Arc::clone(&nodes[0].scorer)),
3349			move |dur: Duration| {
3350				let mut exit_receiver = exit_receiver.clone();
3351				Box::pin(async move {
3352					tokio::select! {
3353						_ = tokio::time::sleep(dur) => false,
3354						_ = exit_receiver.changed() => true,
3355					}
3356				})
3357			},
3358			false,
3359			|| Some(Duration::from_secs(1696300000)),
3360		);
3361
3362		let t1 = tokio::spawn(bp_future);
3363		let t2 = tokio::spawn(async move {
3364			do_test_not_pruning_network_graph_until_graph_sync_completion!(
3365				nodes,
3366				{
3367					let mut i = 0;
3368					loop {
3369						tokio::time::sleep(super::FIRST_NETWORK_PRUNE_TIMER).await;
3370						if let Ok(()) = receiver.try_recv() {
3371							break Ok::<(), ()>(());
3372						}
3373						assert!(i < 5);
3374						i += 1;
3375					}
3376				},
3377				tokio::time::sleep(Duration::from_millis(1)).await
3378			);
3379			exit_sender.send(()).unwrap();
3380		});
3381		let (r1, r2) = tokio::join!(t1, t2);
3382		r1.unwrap().unwrap();
3383		r2.unwrap()
3384	}
3385
3386	macro_rules! do_test_payment_path_scoring {
3387		($nodes: expr, $receive: expr) => {
3388			// Ensure that we update the scorer when relevant events are processed. In this case, we ensure
3389			// that we update the scorer upon a payment path succeeding (note that the channel must be
3390			// public or else we won't score it).
3391			// A background event handler for FundingGenerationReady events must be hooked up to a
3392			// running background processor.
3393			let scored_scid = 4242;
3394			let secp_ctx = Secp256k1::new();
3395			let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
3396			let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
3397
3398			let path = Path { hops: vec![RouteHop {
3399				pubkey: node_1_id,
3400				node_features: NodeFeatures::empty(),
3401				short_channel_id: scored_scid,
3402				channel_features: ChannelFeatures::empty(),
3403				fee_msat: 0,
3404				cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
3405				maybe_announced_channel: true,
3406			}], blinded_tail: None };
3407
3408			$nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
3409			$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
3410				payment_id: None,
3411				payment_hash: PaymentHash([42; 32]),
3412				payment_failed_permanently: false,
3413				failure: PathFailure::OnPath { network_update: None },
3414				path: path.clone(),
3415				short_channel_id: Some(scored_scid),
3416				error_code: None,
3417				error_data: None,
3418				hold_times: Vec::new(),
3419			});
3420			let event = $receive.expect("PaymentPathFailed not handled within deadline");
3421			match event {
3422				Event::PaymentPathFailed { .. } => {},
3423				_ => panic!("Unexpected event"),
3424			}
3425
3426			// Ensure we'll score payments that were explicitly failed back by the destination as
3427			// ProbeSuccess.
3428			$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
3429			$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
3430				payment_id: None,
3431				payment_hash: PaymentHash([42; 32]),
3432				payment_failed_permanently: true,
3433				failure: PathFailure::OnPath { network_update: None },
3434				path: path.clone(),
3435				short_channel_id: None,
3436				error_code: None,
3437				error_data: None,
3438				hold_times: Vec::new(),
3439			});
3440			let event = $receive.expect("PaymentPathFailed not handled within deadline");
3441			match event {
3442				Event::PaymentPathFailed { .. } => {},
3443				_ => panic!("Unexpected event"),
3444			}
3445
3446			$nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() });
3447			$nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
3448				payment_id: PaymentId([42; 32]),
3449				payment_hash: None,
3450				path: path.clone(),
3451				hold_times: Vec::new(),
3452			});
3453			let event = $receive.expect("PaymentPathSuccessful not handled within deadline");
3454			match event {
3455				Event::PaymentPathSuccessful { .. } => {},
3456				_ => panic!("Unexpected event"),
3457			}
3458
3459			$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
3460			$nodes[0].node.push_pending_event(Event::ProbeSuccessful {
3461				payment_id: PaymentId([42; 32]),
3462				payment_hash: PaymentHash([42; 32]),
3463				path: path.clone(),
3464			});
3465			let event = $receive.expect("ProbeSuccessful not handled within deadline");
3466			match event {
3467				Event::ProbeSuccessful  { .. } => {},
3468				_ => panic!("Unexpected event"),
3469			}
3470
3471			$nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() });
3472			$nodes[0].node.push_pending_event(Event::ProbeFailed {
3473				payment_id: PaymentId([42; 32]),
3474				payment_hash: PaymentHash([42; 32]),
3475				path,
3476				short_channel_id: Some(scored_scid),
3477			});
3478			let event = $receive.expect("ProbeFailure not handled within deadline");
3479			match event {
3480				Event::ProbeFailed { .. } => {},
3481				_ => panic!("Unexpected event"),
3482			}
3483		}
3484	}
3485
3486	#[test]
3487	fn test_payment_path_scoring() {
3488		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
3489		let event_handler = move |event: Event| {
3490			match event {
3491				Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
3492				Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
3493				Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
3494				Event::ProbeFailed { .. } => sender.send(event).unwrap(),
3495				_ => panic!("Unexpected event: {:?}", event),
3496			}
3497			Ok(())
3498		};
3499
3500		let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
3501		let data_dir = nodes[0].kv_store.get_data_dir();
3502		let persister = Arc::new(Persister::new(data_dir));
3503		let bg_processor = BackgroundProcessor::start(
3504			persister,
3505			event_handler,
3506			Arc::clone(&nodes[0].chain_monitor),
3507			Arc::clone(&nodes[0].node),
3508			Some(Arc::clone(&nodes[0].messenger)),
3509			nodes[0].no_gossip_sync(),
3510			Arc::clone(&nodes[0].peer_manager),
3511			Some(Arc::clone(&nodes[0].liquidity_manager)),
3512			Some(Arc::clone(&nodes[0].sweeper)),
3513			Arc::clone(&nodes[0].logger),
3514			Some(Arc::clone(&nodes[0].scorer)),
3515		);
3516
3517		do_test_payment_path_scoring!(nodes, receiver.recv_timeout(EVENT_DEADLINE));
3518
3519		if !std::thread::panicking() {
3520			bg_processor.stop().unwrap();
3521		}
3522
3523		let log_entries = nodes[0].logger.lines.lock().unwrap();
3524		let expected_log = "Persisting scorer after update".to_string();
3525		assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5);
3526	}
3527
3528	#[tokio::test]
3529	async fn test_payment_path_scoring_async() {
3530		let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
3531		let event_handler = move |event: Event| {
3532			let sender_ref = sender.clone();
3533			async move {
3534				match event {
3535					Event::PaymentPathFailed { .. } => sender_ref.send(event).await.unwrap(),
3536					Event::PaymentPathSuccessful { .. } => sender_ref.send(event).await.unwrap(),
3537					Event::ProbeSuccessful { .. } => sender_ref.send(event).await.unwrap(),
3538					Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
3539					_ => panic!("Unexpected event: {:?}", event),
3540				}
3541				Ok(())
3542			}
3543		};
3544
3545		let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
3546		let data_dir = nodes[0].kv_store.get_data_dir();
3547		let kv_store_sync = Arc::new(Persister::new(data_dir));
3548		let kv_store = KVStoreSyncWrapper(kv_store_sync);
3549
3550		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
3551
3552		// Yes, you can unsafe { turn off the borrow checker }
3553		let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
3554			&*(nodes[0].liquidity_manager.get_lm_async()
3555				as *const LiquidityManager<_, _, _, _, _, _, _>)
3556				as &'static LiquidityManager<_, _, _, _, _, _, _>
3557		};
3558		let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
3559			&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
3560				as &'static OutputSweeper<_, _, _, _, _, _, _>
3561		};
3562
3563		let bp_future = super::process_events_async(
3564			kv_store,
3565			event_handler,
3566			Arc::clone(&nodes[0].chain_monitor),
3567			Arc::clone(&nodes[0].node),
3568			Some(Arc::clone(&nodes[0].messenger)),
3569			nodes[0].no_gossip_sync(),
3570			Arc::clone(&nodes[0].peer_manager),
3571			Some(lm_async),
3572			Some(sweeper_async),
3573			Arc::clone(&nodes[0].logger),
3574			Some(Arc::clone(&nodes[0].scorer)),
3575			move |dur: Duration| {
3576				let mut exit_receiver = exit_receiver.clone();
3577				Box::pin(async move {
3578					tokio::select! {
3579						_ = tokio::time::sleep(dur) => false,
3580						_ = exit_receiver.changed() => true,
3581					}
3582				})
3583			},
3584			false,
3585			|| Some(Duration::ZERO),
3586		);
3587		let t1 = tokio::spawn(bp_future);
3588		let t2 = tokio::spawn(async move {
3589			do_test_payment_path_scoring!(nodes, receiver.recv().await);
3590			exit_sender.send(()).unwrap();
3591
3592			let log_entries = nodes[0].logger.lines.lock().unwrap();
3593			let expected_log = "Persisting scorer after update".to_string();
3594			assert_eq!(
3595				*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(),
3596				5
3597			);
3598		});
3599
3600		let (r1, r2) = tokio::join!(t1, t2);
3601		r1.unwrap().unwrap();
3602		r2.unwrap()
3603	}
3604
3605	#[tokio::test]
3606	#[cfg(not(c_bindings))]
3607	async fn test_no_consts() {
3608		// Compile-test the NO_* constants can be used.
3609		let (_, nodes) = create_nodes(1, "test_no_consts");
3610		let bg_processor = BackgroundProcessor::start(
3611			Arc::clone(&nodes[0].kv_store),
3612			move |_: Event| Ok(()),
3613			Arc::clone(&nodes[0].chain_monitor),
3614			Arc::clone(&nodes[0].node),
3615			crate::NO_ONION_MESSENGER,
3616			nodes[0].no_gossip_sync(),
3617			Arc::clone(&nodes[0].peer_manager),
3618			crate::NO_LIQUIDITY_MANAGER_SYNC,
3619			Some(Arc::clone(&nodes[0].sweeper)),
3620			Arc::clone(&nodes[0].logger),
3621			Some(Arc::clone(&nodes[0].scorer)),
3622		);
3623
3624		if !std::thread::panicking() {
3625			bg_processor.stop().unwrap();
3626		}
3627
3628		let kv_store = KVStoreSyncWrapper(Arc::clone(&nodes[0].kv_store));
3629		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
3630		let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
3631			&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
3632				as &'static OutputSweeper<_, _, _, _, _, _, _>
3633		};
3634		let bp_future = super::process_events_async(
3635			kv_store,
3636			move |_: Event| async move { Ok(()) },
3637			Arc::clone(&nodes[0].chain_monitor),
3638			Arc::clone(&nodes[0].node),
3639			crate::NO_ONION_MESSENGER,
3640			nodes[0].no_gossip_sync(),
3641			Arc::clone(&nodes[0].peer_manager),
3642			crate::NO_LIQUIDITY_MANAGER,
3643			Some(sweeper_async),
3644			Arc::clone(&nodes[0].logger),
3645			Some(Arc::clone(&nodes[0].scorer)),
3646			move |dur: Duration| {
3647				let mut exit_receiver = exit_receiver.clone();
3648				Box::pin(async move {
3649					tokio::select! {
3650						_ = tokio::time::sleep(dur) => false,
3651						_ = exit_receiver.changed() => true,
3652					}
3653				})
3654			},
3655			false,
3656			|| Some(Duration::ZERO),
3657		);
3658		let t1 = tokio::spawn(bp_future);
3659		exit_sender.send(()).unwrap();
3660		t1.await.unwrap().unwrap();
3661	}
3662}