lightning_background_processor/
lib.rs

// This file is Copyright its original authors, visible in version control
// history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.

//! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
//! running properly, and (2) either can or should be run in the background.
#![cfg_attr(feature = "std", doc = "See docs for [`BackgroundProcessor`] for more details.")]
#![deny(rustdoc::broken_intra_doc_links)]
#![deny(rustdoc::private_intra_doc_links)]
#![deny(missing_docs)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(all(not(feature = "std"), not(test)), no_std)]

#[cfg(any(test, feature = "std"))]
extern crate core;

#[cfg(not(feature = "std"))]
extern crate alloc;

#[macro_use]
extern crate lightning;
extern crate lightning_rapid_gossip_sync;

mod fwd_batch;

use fwd_batch::BatchDelay;

use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
#[cfg(feature = "std")]
use lightning::events::EventHandler;
#[cfg(feature = "std")]
use lightning::events::EventsProvider;
use lightning::events::ReplayEvent;
use lightning::events::{Event, PathFailure};
use lightning::util::ser::Writeable;

use lightning::ln::channelmanager::AChannelManager;
use lightning::ln::msgs::OnionMessageHandler;
use lightning::ln::peer_handler::APeerManager;
use lightning::onion_message::messenger::AOnionMessenger;
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
use lightning::routing::utxo::UtxoLookup;
use lightning::sign::{
	ChangeDestinationSource, ChangeDestinationSourceSync, EntropySource, OutputSpender,
};
use lightning::util::logger::Logger;
use lightning::util::persist::{
	KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
	CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
	NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
	NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
	SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::sweep::{OutputSweeper, OutputSweeperSync};
#[cfg(feature = "std")]
use lightning::util::wakers::Sleeper;
use lightning_rapid_gossip_sync::RapidGossipSync;

use lightning_liquidity::ALiquidityManager;
#[cfg(feature = "std")]
use lightning_liquidity::ALiquidityManagerSync;

use core::ops::Deref;
use core::time::Duration;

#[cfg(feature = "std")]
use core::sync::atomic::{AtomicBool, Ordering};
#[cfg(feature = "std")]
use std::sync::Arc;
#[cfg(feature = "std")]
use std::thread::{self, JoinHandle};
#[cfg(feature = "std")]
use std::time::Instant;

#[cfg(not(feature = "std"))]
use alloc::boxed::Box;
#[cfg(all(not(c_bindings), not(feature = "std")))]
use alloc::sync::Arc;

/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
///   and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically, though this shouldn't be
/// relied upon as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelManager::timer_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::timer_tick_occurred
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::events::Event
/// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	stop_thread: Arc<AtomicBool>,
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}

#[cfg(not(test))]
const FRESHNESS_TIMER: Duration = Duration::from_secs(60);
#[cfg(test)]
const FRESHNESS_TIMER: Duration = Duration::from_secs(1);

#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: Duration = Duration::from_secs(10);
/// Signature operations take a lot longer without compiler optimisations. Increasing the ping
/// timer allows for this, but slower devices will still be disconnected if the timeout is
/// reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: Duration = Duration::from_secs(30);
#[cfg(test)]
const PING_TIMER: Duration = Duration::from_secs(1);

#[cfg(not(test))]
const ONION_MESSAGE_HANDLER_TIMER: Duration = Duration::from_secs(10);
#[cfg(test)]
const ONION_MESSAGE_HANDLER_TIMER: Duration = Duration::from_secs(1);

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(60 * 60);

#[cfg(not(test))]
const SCORER_PERSIST_TIMER: Duration = Duration::from_secs(60 * 5);
#[cfg(test)]
const SCORER_PERSIST_TIMER: Duration = Duration::from_secs(1);

#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(60);
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(1);

#[cfg(not(test))]
const REBROADCAST_TIMER: Duration = Duration::from_secs(30);
#[cfg(test)]
const REBROADCAST_TIMER: Duration = Duration::from_secs(1);

#[cfg(not(test))]
const SWEEPER_TIMER: Duration = Duration::from_secs(30);
#[cfg(test)]
const SWEEPER_TIMER: Duration = Duration::from_secs(1);

/// `core::cmp::min` is not currently const, so we define a trivial (and equivalent) replacement.
const fn min_duration(a: Duration, b: Duration) -> Duration {
	if a.as_nanos() < b.as_nanos() {
		a
	} else {
		b
	}
}
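// The shortest of the recurring timers above, used below to bound how long the main loop may
// sleep before its next check. With the release-build values this evaluates to PING_TIMER
// (10 seconds).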
const FASTEST_TIMER: Duration = min_duration(
	min_duration(FRESHNESS_TIMER, PING_TIMER),
	min_duration(SCORER_PERSIST_TIMER, min_duration(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)),
);

/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, U, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	U: Deref,
	L: Deref,
> where
	U::Target: UtxoLookup,
	L::Target: Logger,
{
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}

impl<
		P: Deref<Target = P2PGossipSync<G, U, L>>,
		R: Deref<Target = RapidGossipSync<G, L>>,
		G: Deref<Target = NetworkGraph<L>>,
		U: Deref,
		L: Deref,
	> GossipSync<P, R, G, U, L>
where
	U::Target: UtxoLookup,
	L::Target: Logger,
{
	fn network_graph(&self) -> Option<&G> {
		match self {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::None => None,
		}
	}

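	/// Returns the network graph only once it is safe to prune. For rapid gossip sync this means
	/// the initial sync must have completed, as the graph must not be pruned while a sync is
	/// still pending.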
	fn prunable_network_graph(&self) -> Option<&G> {
		match self {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => {
				if gossip_sync.is_initial_sync_complete() {
					Some(gossip_sync.network_graph())
				} else {
					None
				}
			},
			GossipSync::None => None,
		}
	}
}

/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
impl<
		P: Deref<Target = P2PGossipSync<G, U, L>>,
		G: Deref<Target = NetworkGraph<L>>,
		U: Deref,
		L: Deref,
	> GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
where
	U::Target: UtxoLookup,
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::P2P`] variant.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
	}
}

/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
impl<
		'a,
		R: Deref<Target = RapidGossipSync<G, L>>,
		G: Deref<Target = NetworkGraph<L>>,
		L: Deref,
	>
	GossipSync<
		&P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
		R,
		G,
		&'a (dyn UtxoLookup + Send + Sync),
		L,
	> where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::Rapid`] variant.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
	}
}

/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
impl<'a, L: Deref>
	GossipSync<
		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
		&RapidGossipSync<&'a NetworkGraph<L>, L>,
		&'a NetworkGraph<L>,
		&'a (dyn UtxoLookup + Send + Sync),
		L,
	> where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::None`] variant.
	pub fn none() -> Self {
		GossipSync::None
	}
}
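
// A rough usage sketch (assuming a `gossip_sync: Arc<P2PGossipSync<_, _, _>>` built elsewhere);
// these constructors exist mainly so callers need not spell out the unused type parameters:
//
//   let gossip = GossipSync::p2p(Arc::clone(&gossip_sync));
//   // ...or, if no gossip source is configured:
//   // let gossip = GossipSync::none();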

fn handle_network_graph_update<L: Deref>(network_graph: &NetworkGraph<L>, event: &Event)
where
	L::Target: Logger,
{
	if let Event::PaymentPathFailed {
		failure: PathFailure::OnPath { network_update: Some(ref upd) },
		..
	} = event
	{
		network_graph.handle_network_update(upd);
	}
}

/// Updates the scorer based on the given event and returns whether an update occurred, so we can
/// decide whether to persist.
fn update_scorer<'a, S: Deref<Target = SC>, SC: 'a + WriteableScore<'a>>(
	scorer: &'a S, event: &Event, duration_since_epoch: Duration,
) -> bool {
	match event {
		Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
			let mut score = scorer.write_lock();
			score.payment_path_failed(path, *scid, duration_since_epoch);
		},
		Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
			// Reached if the destination explicitly failed it back. We treat this as a successful probe
			// because the payment made it all the way to the destination with sufficient liquidity.
			let mut score = scorer.write_lock();
			score.probe_successful(path, duration_since_epoch);
		},
		Event::PaymentPathSuccessful { path, .. } => {
			let mut score = scorer.write_lock();
			score.payment_path_successful(path, duration_since_epoch);
		},
		Event::ProbeSuccessful { path, .. } => {
			let mut score = scorer.write_lock();
			score.probe_successful(path, duration_since_epoch);
		},
		Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
			let mut score = scorer.write_lock();
			score.probe_failed(path, *scid, duration_since_epoch);
		},
		_ => return false,
	}
	true
}

#[cfg(all(not(c_bindings), feature = "std"))]
type ScorerWrapper<T> = std::sync::RwLock<T>;

#[cfg(all(not(c_bindings), not(feature = "std")))]
type ScorerWrapper<T> = core::cell::RefCell<T>;

#[cfg(not(c_bindings))]
type DynRouter = lightning::routing::router::DefaultRouter<
	&'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
	&'static (dyn Logger + Send + Sync),
	&'static (dyn EntropySource + Send + Sync),
	&'static ScorerWrapper<
		lightning::routing::scoring::ProbabilisticScorer<
			&'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
			&'static (dyn Logger + Send + Sync),
		>,
	>,
	lightning::routing::scoring::ProbabilisticScoringFeeParameters,
	lightning::routing::scoring::ProbabilisticScorer<
		&'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
		&'static (dyn Logger + Send + Sync),
	>,
>;

#[cfg(not(c_bindings))]
type DynMessageRouter = lightning::onion_message::messenger::DefaultMessageRouter<
	&'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
	&'static (dyn Logger + Send + Sync),
	&'static (dyn EntropySource + Send + Sync),
>;

#[cfg(all(not(c_bindings), not(taproot)))]
type DynSignerProvider = dyn lightning::sign::SignerProvider<EcdsaSigner = lightning::sign::InMemorySigner>
	+ Send
	+ Sync;

#[cfg(all(not(c_bindings), taproot))]
type DynSignerProvider = (dyn lightning::sign::SignerProvider<
	EcdsaSigner = lightning::sign::InMemorySigner,
	TaprootSigner = lightning::sign::InMemorySigner,
> + Send
	+ Sync);

#[cfg(not(c_bindings))]
type DynChannelManager = lightning::ln::channelmanager::ChannelManager<
	&'static (dyn chain::Watch<lightning::sign::InMemorySigner> + Send + Sync),
	&'static (dyn BroadcasterInterface + Send + Sync),
	&'static (dyn EntropySource + Send + Sync),
	&'static (dyn lightning::sign::NodeSigner + Send + Sync),
	&'static DynSignerProvider,
	&'static (dyn FeeEstimator + Send + Sync),
	&'static DynRouter,
	&'static DynMessageRouter,
	&'static (dyn Logger + Send + Sync),
>;

/// When initializing a background processor without an onion messenger, this can be used to avoid
/// specifying a concrete `OnionMessenger` type.
#[cfg(not(c_bindings))]
pub const NO_ONION_MESSENGER: Option<
	Arc<
		dyn AOnionMessenger<
				EntropySource = dyn EntropySource + Send + Sync,
				ES = &(dyn EntropySource + Send + Sync),
				NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
				NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
				Logger = dyn Logger + Send + Sync,
				L = &'static (dyn Logger + Send + Sync),
				NodeIdLookUp = DynChannelManager,
				NL = &'static DynChannelManager,
				MessageRouter = DynMessageRouter,
				MR = &'static DynMessageRouter,
				OffersMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
				OMH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
				AsyncPaymentsMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
				APH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
				DNSResolverMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
				DRH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
				CustomOnionMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
				CMH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
			> + Send
			+ Sync,
	>,
> = None;

/// When initializing a background processor without a liquidity manager, this can be used to avoid
/// specifying a concrete `LiquidityManager` type.
#[cfg(not(c_bindings))]
pub const NO_LIQUIDITY_MANAGER: Option<
	Arc<
		dyn ALiquidityManager<
				EntropySource = dyn EntropySource + Send + Sync,
				ES = &(dyn EntropySource + Send + Sync),
				NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
				NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
				AChannelManager = DynChannelManager,
				CM = &DynChannelManager,
				Filter = dyn chain::Filter + Send + Sync,
				C = &(dyn chain::Filter + Send + Sync),
				KVStore = dyn lightning::util::persist::KVStore + Send + Sync,
				K = &(dyn lightning::util::persist::KVStore + Send + Sync),
				TimeProvider = dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync,
				TP = &(dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync),
				BroadcasterInterface = dyn lightning::chain::chaininterface::BroadcasterInterface
				                           + Send
				                           + Sync,
				T = &(dyn BroadcasterInterface + Send + Sync),
			> + Send
			+ Sync,
	>,
> = None;

/// When initializing a background processor without a liquidity manager, this can be used to avoid
/// specifying a concrete `LiquidityManagerSync` type.
#[cfg(all(not(c_bindings), feature = "std"))]
pub const NO_LIQUIDITY_MANAGER_SYNC: Option<
	Arc<
		dyn ALiquidityManagerSync<
				EntropySource = dyn EntropySource + Send + Sync,
				ES = &(dyn EntropySource + Send + Sync),
				NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
				NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
				AChannelManager = DynChannelManager,
				CM = &DynChannelManager,
				Filter = dyn chain::Filter + Send + Sync,
				C = &(dyn chain::Filter + Send + Sync),
				KVStoreSync = dyn lightning::util::persist::KVStoreSync + Send + Sync,
				KS = &(dyn lightning::util::persist::KVStoreSync + Send + Sync),
				TimeProvider = dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync,
				TP = &(dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync),
				BroadcasterInterface = dyn lightning::chain::chaininterface::BroadcasterInterface
				                           + Send
				                           + Sync,
				T = &(dyn BroadcasterInterface + Send + Sync),
			> + Send
			+ Sync,
	>,
> = None;
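
// A sketch of how these placeholder constants are used (argument order follows
// `process_events_async` below); passing them pins down generic parameters that would otherwise
// be unconstrained when no onion messenger or liquidity manager is configured:
//
//   process_events_async(
//       kv_store, event_handler, chain_monitor, channel_manager,
//       NO_ONION_MESSENGER, gossip_sync, peer_manager,
//       NO_LIQUIDITY_MANAGER, None, logger, Some(scorer), sleeper,
//       false, fetch_time,
//   ).await?;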

pub(crate) mod futures_util {
	use core::future::Future;
	use core::marker::Unpin;
	use core::pin::Pin;
	use core::task::{Poll, RawWaker, RawWakerVTable, Waker};
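	/// Polls futures `a` through `e` in order, resolving with the output of the first future
	/// that completes (earlier fields win when several are ready on the same poll).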
	pub(crate) struct Selector<
		A: Future<Output = ()> + Unpin,
		B: Future<Output = ()> + Unpin,
		C: Future<Output = ()> + Unpin,
		D: Future<Output = ()> + Unpin,
		E: Future<Output = bool> + Unpin,
	> {
		pub a: A,
		pub b: B,
		pub c: C,
		pub d: D,
		pub e: E,
	}

	pub(crate) enum SelectorOutput {
		A,
		B,
		C,
		D,
		E(bool),
	}

	impl<
			A: Future<Output = ()> + Unpin,
			B: Future<Output = ()> + Unpin,
			C: Future<Output = ()> + Unpin,
			D: Future<Output = ()> + Unpin,
			E: Future<Output = bool> + Unpin,
		> Future for Selector<A, B, C, D, E>
	{
		type Output = SelectorOutput;
		fn poll(
			mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>,
		) -> Poll<SelectorOutput> {
			match Pin::new(&mut self.a).poll(ctx) {
				Poll::Ready(()) => {
					return Poll::Ready(SelectorOutput::A);
				},
				Poll::Pending => {},
			}
			match Pin::new(&mut self.b).poll(ctx) {
				Poll::Ready(()) => {
					return Poll::Ready(SelectorOutput::B);
				},
				Poll::Pending => {},
			}
			match Pin::new(&mut self.c).poll(ctx) {
				Poll::Ready(()) => {
					return Poll::Ready(SelectorOutput::C);
				},
				Poll::Pending => {},
			}
			match Pin::new(&mut self.d).poll(ctx) {
				Poll::Ready(()) => {
					return Poll::Ready(SelectorOutput::D);
				},
				Poll::Pending => {},
			}
			match Pin::new(&mut self.e).poll(ctx) {
				Poll::Ready(res) => {
					return Poll::Ready(SelectorOutput::E(res));
				},
				Poll::Pending => {},
			}
			Poll::Pending
		}
	}

	/// A selector that takes a future wrapped in an `Option`: the future is polled if it is
	/// `Some`, and the selector is always pending otherwise.
	pub(crate) struct OptionalSelector<F: Future<Output = ()> + Unpin> {
		pub optional_future: Option<F>,
	}

	impl<F: Future<Output = ()> + Unpin> Future for OptionalSelector<F> {
		type Output = ();
		fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
			match self.optional_future.as_mut() {
				Some(f) => match Pin::new(f).poll(ctx) {
					Poll::Ready(()) => {
						self.optional_future.take();
						Poll::Ready(())
					},
					Poll::Pending => Poll::Pending,
				},
				None => Poll::Pending,
			}
		}
	}

	// If we want to poll a future outside of an async context to figure out whether it has
	// completed, without awaiting it, we need a Waker, which in turn needs a vtable. We fill
	// the vtable with no-op functions, but sadly there's a good bit of boilerplate here.
	fn dummy_waker_clone(_: *const ()) -> RawWaker {
		RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)
	}
	fn dummy_waker_action(_: *const ()) {}

	const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
		dummy_waker_clone,
		dummy_waker_action,
		dummy_waker_action,
		dummy_waker_action,
	);
	pub(crate) fn dummy_waker() -> Waker {
		unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) }
	}
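
	// A minimal sketch of the polling pattern the dummy waker enables (this is what
	// `check_and_reset_sleeper` below does with the sleep futures):
	//
	//   let mut waker = dummy_waker();
	//   let mut ctx = core::task::Context::from_waker(&mut waker);
	//   match core::pin::Pin::new(&mut fut).poll(&mut ctx) {
	//       core::task::Poll::Ready(output) => { /* completed; act on `output` */ },
	//       core::task::Poll::Pending => { /* check again on a later loop iteration */ },
	//   }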

	enum JoinerResult<ERR, F: Future<Output = Result<(), ERR>> + Unpin> {
		Pending(Option<F>),
		Ready(Result<(), ERR>),
	}

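	/// Joins up to five optional futures, resolving only once every present future has completed,
	/// and yields their five `Result`s in field order (absent slots yield `Ok(())`).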
	pub(crate) struct Joiner<
		ERR,
		A: Future<Output = Result<(), ERR>> + Unpin,
		B: Future<Output = Result<(), ERR>> + Unpin,
		C: Future<Output = Result<(), ERR>> + Unpin,
		D: Future<Output = Result<(), ERR>> + Unpin,
		E: Future<Output = Result<(), ERR>> + Unpin,
	> {
		a: JoinerResult<ERR, A>,
		b: JoinerResult<ERR, B>,
		c: JoinerResult<ERR, C>,
		d: JoinerResult<ERR, D>,
		e: JoinerResult<ERR, E>,
	}

	impl<
			ERR,
			A: Future<Output = Result<(), ERR>> + Unpin,
			B: Future<Output = Result<(), ERR>> + Unpin,
			C: Future<Output = Result<(), ERR>> + Unpin,
			D: Future<Output = Result<(), ERR>> + Unpin,
			E: Future<Output = Result<(), ERR>> + Unpin,
		> Joiner<ERR, A, B, C, D, E>
	{
		pub(crate) fn new() -> Self {
			Self {
				a: JoinerResult::Pending(None),
				b: JoinerResult::Pending(None),
				c: JoinerResult::Pending(None),
				d: JoinerResult::Pending(None),
				e: JoinerResult::Pending(None),
			}
		}

		pub(crate) fn set_a(&mut self, fut: A) {
			self.a = JoinerResult::Pending(Some(fut));
		}
		pub(crate) fn set_a_res(&mut self, res: Result<(), ERR>) {
			self.a = JoinerResult::Ready(res);
		}
		pub(crate) fn set_b(&mut self, fut: B) {
			self.b = JoinerResult::Pending(Some(fut));
		}
		pub(crate) fn set_c(&mut self, fut: C) {
			self.c = JoinerResult::Pending(Some(fut));
		}
		pub(crate) fn set_d(&mut self, fut: D) {
			self.d = JoinerResult::Pending(Some(fut));
		}
		pub(crate) fn set_e(&mut self, fut: E) {
			self.e = JoinerResult::Pending(Some(fut));
		}
	}

	impl<
			ERR,
			A: Future<Output = Result<(), ERR>> + Unpin,
			B: Future<Output = Result<(), ERR>> + Unpin,
			C: Future<Output = Result<(), ERR>> + Unpin,
			D: Future<Output = Result<(), ERR>> + Unpin,
			E: Future<Output = Result<(), ERR>> + Unpin,
		> Future for Joiner<ERR, A, B, C, D, E>
	where
		Joiner<ERR, A, B, C, D, E>: Unpin,
	{
		type Output = [Result<(), ERR>; 5];
		fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
			let mut all_complete = true;
			macro_rules! handle {
				($val: ident) => {
					match &mut (self.$val) {
						JoinerResult::Pending(None) => {
							self.$val = JoinerResult::Ready(Ok(()));
						},
						JoinerResult::<ERR, _>::Pending(Some(ref mut val)) => {
							match Pin::new(val).poll(ctx) {
								Poll::Ready(res) => {
									self.$val = JoinerResult::Ready(res);
								},
								Poll::Pending => {
									all_complete = false;
								},
							}
						},
						JoinerResult::Ready(_) => {},
					}
				};
			}
			handle!(a);
			handle!(b);
			handle!(c);
			handle!(d);
			handle!(e);

			if all_complete {
				let mut res = [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())];
				if let JoinerResult::Ready(ref mut val) = &mut self.a {
					core::mem::swap(&mut res[0], val);
				}
				if let JoinerResult::Ready(ref mut val) = &mut self.b {
					core::mem::swap(&mut res[1], val);
				}
				if let JoinerResult::Ready(ref mut val) = &mut self.c {
					core::mem::swap(&mut res[2], val);
				}
				if let JoinerResult::Ready(ref mut val) = &mut self.d {
					core::mem::swap(&mut res[3], val);
				}
				if let JoinerResult::Ready(ref mut val) = &mut self.e {
					core::mem::swap(&mut res[4], val);
				}
				Poll::Ready(res)
			} else {
				Poll::Pending
			}
		}
	}
}
use core::task;
use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutput};

/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs `true`, the loop will exit and this function's future will complete.
/// The `sleeper` future is free to return early after it has triggered the exit condition.
///
#[cfg_attr(
	feature = "std",
	doc = " See [`BackgroundProcessor::start`] for information on which actions this handles.\n"
)]
/// The `mobile_interruptable_platform` flag should be set if we're currently running on a
/// mobile device, where we may need to check for interruption of the application regularly. If you
/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
/// are hundreds or thousands of simultaneous process calls running.
///
/// The `fetch_time` parameter should return the current wall clock time, if one is available. If
/// no time is available, some features may be disabled; however, the node will still operate fine.
///
/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
/// could set up `process_events_async` like this:
/// ```
/// # use lightning::io;
/// # use lightning::events::ReplayEvent;
/// # use std::sync::{Arc, RwLock};
/// # use std::sync::atomic::{AtomicBool, Ordering};
/// # use std::time::SystemTime;
/// # use lightning_background_processor::{process_events_async, GossipSync};
/// # use core::future::Future;
/// # use core::pin::Pin;
/// # use lightning_liquidity::utils::time::TimeProvider;
/// # struct Logger {}
/// # impl lightning::util::logger::Logger for Logger {
/// #     fn log(&self, _record: lightning::util::logger::Record) {}
/// # }
/// # struct StoreSync {}
/// # impl lightning::util::persist::KVStoreSync for StoreSync {
/// #     fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
/// #     fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>) -> io::Result<()> { Ok(()) }
/// #     fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
/// #     fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
/// # }
/// # struct Store {}
/// # impl lightning::util::persist::KVStore for Store {
/// #     fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
/// #     fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
/// #     fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
/// #     fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
/// # }
/// # use core::time::Duration;
/// # struct DefaultTimeProvider;
/// #
/// # impl TimeProvider for DefaultTimeProvider {
/// #    fn duration_since_epoch(&self) -> Duration {
/// #        use std::time::{SystemTime, UNIX_EPOCH};
/// #        SystemTime::now().duration_since(UNIX_EPOCH).expect("system time before Unix epoch")
/// #    }
/// # }
/// # struct EventHandler {}
/// # impl EventHandler {
/// #     async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
/// # }
/// # #[derive(Eq, PartialEq, Clone, Hash)]
/// # struct SocketDescriptor {}
/// # impl lightning::ln::peer_handler::SocketDescriptor for SocketDescriptor {
/// #     fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize { 0 }
/// #     fn disconnect_socket(&mut self) {}
/// # }
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>, Arc<Store>, Arc<DefaultTimeProvider>, Arc<B>>;
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store>, Arc<Logger>, Arc<O>>;
///
/// # struct Node<
/// #     B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
/// #     F: lightning::chain::Filter + Send + Sync + 'static,
/// #     FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
/// #     UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
/// #     D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
/// #     O: lightning::sign::OutputSpender + Send + Sync + 'static,
/// # > {
/// #     peer_manager: Arc<PeerManager<B, F, FE, UL>>,
/// #     event_handler: Arc<EventHandler>,
/// #     channel_manager: Arc<ChannelManager<B, F, FE>>,
/// #     onion_messenger: Arc<OnionMessenger<B, F, FE>>,
/// #     liquidity_manager: Arc<LiquidityManager<B, F, FE>>,
/// #     chain_monitor: Arc<ChainMonitor<B, F, FE>>,
/// #     gossip_sync: Arc<P2PGossipSync<UL>>,
/// #     persister: Arc<Store>,
/// #     logger: Arc<Logger>,
/// #     scorer: Arc<Scorer>,
/// #     sweeper: Arc<OutputSweeper<B, D, FE, F, O>>,
/// # }
/// #
/// # async fn setup_background_processing<
/// #     B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
/// #     F: lightning::chain::Filter + Send + Sync + 'static,
/// #     FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
/// #     UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
/// #     D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
/// #     O: lightning::sign::OutputSpender + Send + Sync + 'static,
/// # >(node: Node<B, F, FE, UL, D, O>) {
///	let background_persister = Arc::clone(&node.persister);
///	let background_event_handler = Arc::clone(&node.event_handler);
///	let background_chain_mon = Arc::clone(&node.chain_monitor);
///	let background_chan_man = Arc::clone(&node.channel_manager);
///	let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync));
///	let background_peer_man = Arc::clone(&node.peer_manager);
///	let background_onion_messenger = Arc::clone(&node.onion_messenger);
///	let background_liquidity_manager = Arc::clone(&node.liquidity_manager);
///	let background_logger = Arc::clone(&node.logger);
///	let background_scorer = Arc::clone(&node.scorer);
///	let background_sweeper = Arc::clone(&node.sweeper);
///	// Set up the sleeper.
#[cfg_attr(
	feature = "std",
	doc = "	let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());"
)]
#[cfg_attr(feature = "std", doc = "")]
///	let sleeper = move |d| {
#[cfg_attr(feature = "std", doc = "		let mut receiver = stop_receiver.clone();")]
///		Box::pin(async move {
///			tokio::select!{
///				_ = tokio::time::sleep(d) => false,
#[cfg_attr(feature = "std", doc = "				_ = receiver.changed() => true,")]
///			}
///		})
///	};
///
///	let mobile_interruptable_platform = false;
///
#[cfg_attr(feature = "std", doc = "	let handle = tokio::spawn(async move {")]
#[cfg_attr(
	not(feature = "std"),
	doc = "	let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();"
)]
#[cfg_attr(not(feature = "std"), doc = "	rt.block_on(async move {")]
///		process_events_async(
///			background_persister,
///			|e| background_event_handler.handle_event(e),
///			background_chain_mon,
///			background_chan_man,
///			Some(background_onion_messenger),
///			background_gossip_sync,
///			background_peer_man,
///			Some(background_liquidity_manager),
///			Some(background_sweeper),
///			background_logger,
///			Some(background_scorer),
///			sleeper,
///			mobile_interruptable_platform,
///			|| Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
///		)
///		.await
///		.expect("Failed to process events");
///	});
///
///	// Stop the background processing.
#[cfg_attr(feature = "std", doc = "	stop_sender.send(()).unwrap();")]
#[cfg_attr(feature = "std", doc = "	handle.await.unwrap()")]
///	# }
///```
pub async fn process_events_async<
	'a,
	UL: Deref,
	CF: Deref,
	T: Deref,
	F: Deref,
	G: Deref<Target = NetworkGraph<L>>,
	L: Deref,
	P: Deref,
	EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	ES: Deref,
	M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
	CM: Deref,
	OM: Deref,
	PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	PM: Deref,
	LM: Deref,
	D: Deref,
	O: Deref,
	K: Deref,
	OS: Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
	S: Deref<Target = SC>,
	SC: for<'b> WriteableScore<'b>,
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
	Sleeper: Fn(Duration) -> SleepFuture,
	FetchTime: Fn() -> Option<Duration>,
>(
	kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
	liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
	sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
	UL::Target: UtxoLookup,
	CF::Target: chain::Filter,
	T::Target: BroadcasterInterface,
	F::Target: FeeEstimator,
	L::Target: Logger,
	P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
	ES::Target: EntropySource,
	CM::Target: AChannelManager,
	OM::Target: AOnionMessenger,
	PM::Target: APeerManager,
	LM::Target: ALiquidityManager,
	O::Target: OutputSpender,
	D::Target: ChangeDestinationSource,
	K::Target: KVStore,
{
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		let scorer = &scorer;
		let logger = &logger;
		let kv_store = &kv_store;
		let fetch_time = &fetch_time;
		// We should be able to drop the Box once our MSRV is 1.68
		Box::pin(async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			if let Some(ref scorer) = scorer {
				if let Some(duration_since_epoch) = fetch_time() {
					if update_scorer(scorer, &event, duration_since_epoch) {
						log_trace!(logger, "Persisting scorer after update");
						if let Err(e) = kv_store
							.write(
								SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
								SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
								SCORER_PERSISTENCE_KEY,
								scorer.encode(),
							)
							.await
						{
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
							// We opt not to abort early on persistence failure here as persisting
							// the scorer is non-critical and we still hope that it will have
							// resolved itself when it is potentially critical in event handling
							// below.
						}
					}
				}
			}
			event_handler(event).await
		})
	};
	let mut batch_delay = BatchDelay::new();

	log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
	channel_manager.get_cm().timer_tick_occurred();
	log_trace!(logger, "Rebroadcasting monitor's pending claims on startup");
	chain_monitor.rebroadcast_pending_claims();

	let mut last_freshness_call = sleeper(FRESHNESS_TIMER);
	let mut last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
	let mut last_ping_call = sleeper(PING_TIMER);
	let mut last_prune_call = sleeper(FIRST_NETWORK_PRUNE_TIMER);
	let mut last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
	let mut last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
	let mut last_sweeper_call = sleeper(SWEEPER_TIMER);
	let mut have_pruned = false;
	let mut have_decayed_scorer = false;

	let mut last_forwards_processing_call = sleeper(batch_delay.get());

	loop {
		channel_manager.get_cm().process_pending_events_async(async_event_handler).await;
		chain_monitor.process_pending_events_async(async_event_handler).await;
		if let Some(om) = &onion_messenger {
			om.get_om().process_pending_events_async(async_event_handler).await
		}

		// Note that the PeerManager::process_events may block on ChannelManager's locks,
		// hence it comes last here. When the ChannelManager finishes whatever it's doing,
		// we want to ensure we get into `persist_manager` as quickly as we can, especially
		// without running the normal event processing above and handing events to users.
		//
		// Specifically, on an *extremely* slow machine, we may see ChannelManager start
		// processing a message effectively at any point during this loop. In order to
		// minimize the time between such processing completing and persisting the updated
		// ChannelManager, we want to minimize methods blocking on a ChannelManager
		// generally, and as a fallback place such blocking only immediately before
		// persistence.
		peer_manager.as_ref().process_events();
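		// If the forwards-processing batch timer has expired, process any batched HTLC forwards
		// now and re-arm the timer with the next batch delay.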
		match check_and_reset_sleeper(&mut last_forwards_processing_call, || {
			sleeper(batch_delay.next())
		}) {
			Some(false) => {
				channel_manager.get_cm().process_pending_htlc_forwards();
			},
			Some(true) => break,
			None => {},
		}

		// We wait up to 100ms, but track how long it takes to detect being put to sleep,
		// see `await_start`'s use below.
		let mut await_start = None;
		if mobile_interruptable_platform {
			await_start = Some(sleeper(Duration::from_secs(1)));
		}
		let om_fut = if let Some(om) = onion_messenger.as_ref() {
			let fut = om.get_om().get_update_future();
			OptionalSelector { optional_future: Some(fut) }
		} else {
			OptionalSelector { optional_future: None }
		};
		let lm_fut = if let Some(lm) = liquidity_manager.as_ref() {
			let fut = lm.get_lm().get_pending_msgs_or_needs_persist_future();
			OptionalSelector { optional_future: Some(fut) }
		} else {
			OptionalSelector { optional_future: None }
		};
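		// Decide how long to sleep: if HTLC forwards are pending we wake no later than the
		// current batch delay, and on mobile platforms the sleep is capped at 100ms so that
		// being put to sleep can be detected quickly.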
		let needs_processing = channel_manager.get_cm().needs_pending_htlc_processing();
		let sleep_delay = match (needs_processing, mobile_interruptable_platform) {
			(true, true) => batch_delay.get().min(Duration::from_millis(100)),
			(true, false) => batch_delay.get().min(FASTEST_TIMER),
			(false, true) => Duration::from_millis(100),
			(false, false) => FASTEST_TIMER,
		};
		let fut = Selector {
			a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
			b: chain_monitor.get_update_future(),
			c: om_fut,
			d: lm_fut,
			e: sleeper(sleep_delay),
		};
		match fut.await {
			SelectorOutput::A | SelectorOutput::B | SelectorOutput::C | SelectorOutput::D => {},
			SelectorOutput::E(exit) => {
				if exit {
					break;
				}
			},
		}

		let await_slow = if mobile_interruptable_platform {
			// Specify a zero timeout for the new sleeper because we won't use it; it is
			// re-initialized in the next loop iteration.
			match check_and_reset_sleeper(&mut await_start.unwrap(), || sleeper(Duration::ZERO)) {
				Some(true) => break,
				Some(false) => true,
				None => false,
			}
		} else {
			false
		};
		match check_and_reset_sleeper(&mut last_freshness_call, || sleeper(FRESHNESS_TIMER)) {
			Some(false) => {
				log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
				channel_manager.get_cm().timer_tick_occurred();
			},
			Some(true) => break,
			None => {},
		}

		let mut futures = Joiner::new();

		if channel_manager.get_cm().get_and_clear_needs_persistence() {
			log_trace!(logger, "Persisting ChannelManager...");

			let fut = async {
				kv_store
					.write(
						CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
						CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
						CHANNEL_MANAGER_PERSISTENCE_KEY,
						channel_manager.get_cm().encode(),
					)
					.await
			};
			// TODO: Once our MSRV is 1.68 we should be able to drop the Box
			let mut fut = Box::pin(fut);

			// Because persisting the ChannelManager is important to avoid accidental
			// force-closures, go ahead and poll the future once before we do slightly more
			// CPU-intensive tasks in the form of NetworkGraph pruning or scorer time-stepping
			// below. This will get it moving but won't block us for too long if the underlying
			// future is actually async.
			use core::future::Future;
			let mut waker = dummy_waker();
			let mut ctx = task::Context::from_waker(&mut waker);
			match core::pin::Pin::new(&mut fut).poll(&mut ctx) {
				task::Poll::Ready(res) => futures.set_a_res(res),
				task::Poll::Pending => futures.set_a(fut),
			}

			log_trace!(logger, "Done persisting ChannelManager.");
		}

		// Note that we want to run a graph prune once not long after startup before
		// falling back to our usual hourly prunes. This avoids short-lived clients never
		// pruning their network graph. We run once 60 seconds after startup before
		// continuing our normal cadence. For RGS, since 60 seconds is likely too long,
		// we prune after an initial sync completes.
		let prune_timer = if gossip_sync.prunable_network_graph().is_some() {
			NETWORK_PRUNE_TIMER
		} else {
			FIRST_NETWORK_PRUNE_TIMER
		};
		let prune_timer_elapsed = {
			match check_and_reset_sleeper(&mut last_prune_call, || sleeper(prune_timer)) {
				Some(false) => true,
				Some(true) => break,
				None => false,
			}
		};

		let should_prune = match gossip_sync {
			GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
			_ => prune_timer_elapsed,
		};
		if should_prune {
			// The network graph must not be pruned while rapid sync completion is pending
			if let Some(network_graph) = gossip_sync.prunable_network_graph() {
				if let Some(duration_since_epoch) = fetch_time() {
					log_trace!(logger, "Pruning and persisting network graph.");
					network_graph.remove_stale_channels_and_tracking_with_time(
						duration_since_epoch.as_secs(),
					);
				} else {
					log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually.");
					log_trace!(logger, "Persisting network graph.");
				}
				let fut = async {
					if let Err(e) = kv_store
						.write(
							NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
							NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
							NETWORK_GRAPH_PERSISTENCE_KEY,
							network_graph.encode(),
						)
						.await
					{
						log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e);
					}

					Ok(())
				};

				// TODO: Once our MSRV is 1.68 we should be able to drop the Box
				futures.set_b(Box::pin(fut));

				have_pruned = true;
			}
		}
		if !have_decayed_scorer {
			if let Some(ref scorer) = scorer {
				if let Some(duration_since_epoch) = fetch_time() {
					log_trace!(logger, "Calling time_passed on scorer at startup");
					scorer.write_lock().time_passed(duration_since_epoch);
				}
			}
			have_decayed_scorer = true;
		}
		match check_and_reset_sleeper(&mut last_scorer_persist_call, || {
			sleeper(SCORER_PERSIST_TIMER)
		}) {
			Some(false) => {
				if let Some(ref scorer) = scorer {
					if let Some(duration_since_epoch) = fetch_time() {
						log_trace!(logger, "Calling time_passed and persisting scorer");
						scorer.write_lock().time_passed(duration_since_epoch);
					} else {
						log_trace!(logger, "Persisting scorer");
					}
					let fut = async {
						if let Err(e) = kv_store
							.write(
								SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
								SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
								SCORER_PERSISTENCE_KEY,
								scorer.encode(),
							)
							.await
						{
							log_error!(
								logger,
								"Error: Failed to persist scorer, check your disk and permissions {}",
								e
							);
						}

						Ok(())
					};

					// TODO: Once our MSRV is 1.68 we should be able to drop the Box
					futures.set_c(Box::pin(fut));
				}
			},
			Some(true) => break,
			None => {},
		}
		match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) {
			Some(false) => {
				log_trace!(logger, "Regenerating sweeper spends if necessary");
				if let Some(ref sweeper) = sweeper {
					let fut = async {
						let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;

						Ok(())
					};

					// TODO: Once our MSRV is 1.68 we should be able to drop the Box
					futures.set_d(Box::pin(fut));
				}
			},
			Some(true) => break,
			None => {},
		}

		if let Some(liquidity_manager) = liquidity_manager.as_ref() {
			log_trace!(logger, "Persisting LiquidityManager...");
			let fut = async {
				liquidity_manager.get_lm().persist().await.map_err(|e| {
					log_error!(logger, "Persisting LiquidityManager failed: {}", e);
					e
				})
			};
			futures.set_e(Box::pin(fut));
		}

		// Run persistence tasks in parallel and exit if any of them returns an error.
		for res in futures.await {
			res?;
		}

		match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
			sleeper(ONION_MESSAGE_HANDLER_TIMER)
		}) {
			Some(false) => {
				if let Some(om) = &onion_messenger {
					log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
					om.get_om().timer_tick_occurred();
				}
			},
			Some(true) => break,
			None => {},
		}

		// Peer manager timer tick. If we were interrupted on a mobile platform, we disconnect all peers.
		if await_slow {
			// On various platforms, we may be starved of CPU cycles for several reasons.
			// E.g. on iOS, if we've been in the background, we will be entirely paused.
			// Similarly, if we're on a desktop platform and the device has been asleep, we
			// may not get any cycles.
			// We detect this by checking if our max-100ms-sleep, above, ran longer than a
			// full second, at which point we assume sockets may have been killed (they
			// appear to be at least on some platforms, even if it has only been a second).
			// Note that we have to take care to not get here just because user event
			// processing was slow at the top of the loop. For example, the sample client
			// may call Bitcoin Core RPCs during event handling, which very often takes
			// more than a handful of seconds to complete, and shouldn't disconnect all our
			// peers.
			log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
			peer_manager.as_ref().disconnect_all_peers();
			last_ping_call = sleeper(PING_TIMER);
		} else {
			match check_and_reset_sleeper(&mut last_ping_call, || sleeper(PING_TIMER)) {
				Some(false) => {
					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
					peer_manager.as_ref().timer_tick_occurred();
				},
				Some(true) => break,
				_ => {},
			}
		}

		// Rebroadcast pending claims.
		match check_and_reset_sleeper(&mut last_rebroadcast_call, || sleeper(REBROADCAST_TIMER)) {
			Some(false) => {
				log_trace!(logger, "Rebroadcasting monitor's pending claims");
				chain_monitor.rebroadcast_pending_claims();
			},
			Some(true) => break,
			None => {},
		}
	}
	log_trace!(logger, "Terminating background processor.");

	// After we exit, ensure we persist the ChannelManager one final time - this avoids
	// some races where users quit while channel updates were in-flight, with
	// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
	kv_store
		.write(
			CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
			CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
			CHANNEL_MANAGER_PERSISTENCE_KEY,
			channel_manager.get_cm().encode(),
		)
		.await?;
	if let Some(ref scorer) = scorer {
		kv_store
			.write(
				SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
				SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
				SCORER_PERSISTENCE_KEY,
				scorer.encode(),
			)
			.await?;
	}
	if let Some(network_graph) = gossip_sync.network_graph() {
		kv_store
			.write(
				NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
				NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
				NETWORK_GRAPH_PERSISTENCE_KEY,
				network_graph.encode(),
			)
			.await?;
	}
	Ok(())
}

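/// Polls the given sleep future once using a no-op waker. If the future has completed, resets it
/// via `new_sleeper` and returns `Some(exit)` with its output; returns `None` if it is still
/// pending.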
fn check_and_reset_sleeper<
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
>(
	fut: &mut SleepFuture, mut new_sleeper: impl FnMut() -> SleepFuture,
) -> Option<bool> {
	let mut waker = dummy_waker();
	let mut ctx = task::Context::from_waker(&mut waker);
	match core::pin::Pin::new(&mut *fut).poll(&mut ctx) {
		task::Poll::Ready(exit) => {
			*fut = new_sleeper();
			Some(exit)
		},
		task::Poll::Pending => None,
	}
}

/// An async events processor based on [`process_events_async`], but which allows a
/// [`KVStoreSync`] to be used for synchronous background persistence.
pub async fn process_events_async_with_kv_store_sync<
	UL: Deref,
	CF: Deref,
	T: Deref,
	F: Deref,
	G: Deref<Target = NetworkGraph<L>>,
	L: Deref,
	P: Deref,
	EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	ES: Deref,
	M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
	CM: Deref,
	OM: Deref,
	PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	PM: Deref,
	LM: Deref,
	D: Deref,
	O: Deref,
	K: Deref,
	OS: Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>>,
	S: Deref<Target = SC>,
	SC: for<'b> WriteableScore<'b>,
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
	Sleeper: Fn(Duration) -> SleepFuture,
	FetchTime: Fn() -> Option<Duration>,
>(
	kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
	liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
	sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
	UL::Target: UtxoLookup,
	CF::Target: chain::Filter,
	T::Target: BroadcasterInterface,
	F::Target: FeeEstimator,
	L::Target: Logger,
	P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
	ES::Target: EntropySource,
	CM::Target: AChannelManager,
	OM::Target: AOnionMessenger,
	PM::Target: APeerManager,
	LM::Target: ALiquidityManager,
	O::Target: OutputSpender,
	D::Target: ChangeDestinationSourceSync,
	K::Target: KVStoreSync,
{
	let kv_store = KVStoreSyncWrapper(kv_store);
	process_events_async(
		kv_store,
		event_handler,
		chain_monitor,
		channel_manager,
		onion_messenger,
		gossip_sync,
		peer_manager,
		liquidity_manager,
		sweeper.as_ref().map(|os| os.sweeper_async()),
		logger,
		scorer,
		sleeper,
		mobile_interruptable_platform,
		fetch_time,
	)
	.await
}
1419
1420#[cfg(feature = "std")]
1421impl BackgroundProcessor {
1422	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
1423	/// documentation].
1424	///
1425	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
1426	/// [`KVStoreSync`] returns an error. In case of an error, the error is retrieved by calling
1427	/// either [`join`] or [`stop`].
1428	///
1429	/// # Data Persistence
1430	///
1431	/// [`KVStoreSync`] is responsible for writing out the [`ChannelManager`] to disk, and/or
1432	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
1433	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
1434	/// provided implementation.
1435	///
1436	/// [`KVStoreSync`] is also responsible for writing out the [`NetworkGraph`] to disk, if
1437	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
1438	/// See the `lightning-persister` crate for LDK's provided implementation.
1439	///
1440	/// Typically, users should either implement [`KVStoreSync`] to never return an
1441	/// error or call [`join`] and handle any error that may arise. For the latter case,
1442	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
1443	///
1444	/// # Event Handling
1445	///
1446	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
1447	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
1448	/// functionality implemented by other handlers.
1449	/// * [`P2PGossipSync`], if given, will update the [`NetworkGraph`] based on payment failures.
1450	///
1451	/// # Rapid Gossip Sync
1452	///
1453	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
1454	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
1455	/// until the [`RapidGossipSync`] instance completes its first sync.
1456	///
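	/// # Example
	///
	/// A minimal sketch, assuming the various handlers and a [`KVStoreSync`]-backed `kv_store`
	/// have already been constructed (all bindings here are placeholders, not part of this API):
	///
	/// ```ignore
	/// let bg_processor = BackgroundProcessor::start(
	///     kv_store, event_handler, chain_monitor, channel_manager, Some(onion_messenger),
	///     gossip_sync, peer_manager, Some(liquidity_manager), Some(sweeper), logger,
	///     Some(scorer),
	/// );
	/// // ... run the node ...
	/// bg_processor.stop().expect("Failed to persist on shutdown");
	/// ```
	///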
1457	/// [top-level documentation]: BackgroundProcessor
1458	/// [`join`]: Self::join
1459	/// [`stop`]: Self::stop
1460	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
1461	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
1462	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
1463	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
1464	pub fn start<
1465		'a,
1466		UL: 'static + Deref,
1467		CF: 'static + Deref,
1468		T: 'static + Deref,
1469		F: 'static + Deref + Send,
1470		G: 'static + Deref<Target = NetworkGraph<L>>,
1471		L: 'static + Deref + Send,
1472		P: 'static + Deref,
1473		EH: 'static + EventHandler + Send,
1474		ES: 'static + Deref + Send,
1475		M: 'static
1476			+ Deref<
1477				Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
1478			>
1479			+ Send
1480			+ Sync,
1481		CM: 'static + Deref + Send,
1482		OM: 'static + Deref + Send,
1483		PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send,
1484		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
1485		PM: 'static + Deref + Send,
1486		LM: 'static + Deref + Send,
1487		S: 'static + Deref<Target = SC> + Send + Sync,
1488		SC: for<'b> WriteableScore<'b>,
1489		D: 'static + Deref,
1490		O: 'static + Deref,
1491		K: 'static + Deref + Send,
1492		OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
1493	>(
1494		kv_store: K, event_handler: EH, chain_monitor: M, channel_manager: CM,
1495		onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
1496		liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
1497	) -> Self
1498	where
1499		UL::Target: 'static + UtxoLookup,
1500		CF::Target: 'static + chain::Filter,
1501		T::Target: 'static + BroadcasterInterface,
1502		F::Target: 'static + FeeEstimator,
1503		L::Target: 'static + Logger,
1504		P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1505		ES::Target: 'static + EntropySource,
1506		CM::Target: AChannelManager,
1507		OM::Target: AOnionMessenger,
1508		PM::Target: APeerManager,
1509		LM::Target: ALiquidityManagerSync,
1510		D::Target: ChangeDestinationSourceSync,
1511		O::Target: 'static + OutputSpender,
1512		K::Target: 'static + KVStoreSync,
1513	{
1514		let stop_thread = Arc::new(AtomicBool::new(false));
1515		let stop_thread_clone = Arc::clone(&stop_thread);
1516		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
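			// Decorate the user's event handler: first apply any network graph and scorer
			// updates derived from the event (persisting the scorer if it changed), then hand
			// the event to the user's handler.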
1517			let event_handler = |event| {
1518				let network_graph = gossip_sync.network_graph();
1519				if let Some(network_graph) = network_graph {
1520					handle_network_graph_update(network_graph, &event)
1521				}
1522				if let Some(ref scorer) = scorer {
1523					use std::time::SystemTime;
1524					let duration_since_epoch = SystemTime::now()
1525						.duration_since(SystemTime::UNIX_EPOCH)
1526						.expect("Time should be sometime after 1970");
1527					if update_scorer(scorer, &event, duration_since_epoch) {
1528						log_trace!(logger, "Persisting scorer after update");
1529						if let Err(e) = kv_store.write(
1530							SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1531							SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1532							SCORER_PERSISTENCE_KEY,
1533							scorer.encode(),
1534						) {
1535							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions: {}", e)
1536						}
1537					}
1538				}
1539				event_handler.handle_event(event)
1540			};
1541			let mut batch_delay = BatchDelay::new();
1542
1543			log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
1544			channel_manager.get_cm().timer_tick_occurred();
1545			log_trace!(logger, "Rebroadcasting monitor's pending claims on startup");
1546			chain_monitor.rebroadcast_pending_claims();
1547
1548			let mut last_freshness_call = Instant::now();
1549			let mut last_onion_message_handler_call = Instant::now();
1550			let mut last_ping_call = Instant::now();
1551			let mut last_prune_call = Instant::now();
1552			let mut last_scorer_persist_call = Instant::now();
1553			let mut last_rebroadcast_call = Instant::now();
1554			let mut last_sweeper_call = Instant::now();
1555			let mut have_pruned = false;
1556			let mut have_decayed_scorer = false;
1557
1558			let mut cur_batch_delay = batch_delay.get();
1559			let mut last_forwards_processing_call = Instant::now();
1560
1561			loop {
1562				channel_manager.get_cm().process_pending_events(&event_handler);
1563				chain_monitor.process_pending_events(&event_handler);
1564				if let Some(om) = &onion_messenger {
1565					om.get_om().process_pending_events(&event_handler)
1566				};
1567
1568				// Note that the PeerManager::process_events may block on ChannelManager's locks,
1569				// hence it comes last here. When the ChannelManager finishes whatever it's doing,
1570				// we want to ensure we get into `persist_manager` as quickly as we can, especially
1571				// without running the normal event processing above and handing events to users.
1572				//
1573				// Specifically, on an *extremely* slow machine, we may see ChannelManager start
1574				// processing a message effectively at any point during this loop. In order to
1575				// minimize the time between such processing completing and persisting the updated
1576				// ChannelManager, we want to minimize methods blocking on a ChannelManager
1577				// generally, and as a fallback place such blocking only immediately before
1578				// persistence.
1579				peer_manager.as_ref().process_events();
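				// Batch pending HTLC forwards: once the current batch delay has elapsed, process
				// any accumulated forwards and pick the next delay from the batch schedule.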
1580				if last_forwards_processing_call.elapsed() > cur_batch_delay {
1581					channel_manager.get_cm().process_pending_htlc_forwards();
1582					cur_batch_delay = batch_delay.next();
1583					last_forwards_processing_call = Instant::now();
1584				}
1585				if stop_thread.load(Ordering::Acquire) {
1586					log_trace!(logger, "Terminating background processor.");
1587					break;
1588				}
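				// Build a sleeper over every future that might require a wakeup: channel manager
				// events or persistence, chain monitor updates, and, when present, onion
				// messenger and liquidity manager activity.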
1589				let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) {
1590					(Some(om), Some(lm)) => Sleeper::from_four_futures(
1591						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1592						&chain_monitor.get_update_future(),
1593						&om.get_om().get_update_future(),
1594						&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
1595					),
1596					(Some(om), None) => Sleeper::from_three_futures(
1597						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1598						&chain_monitor.get_update_future(),
1599						&om.get_om().get_update_future(),
1600					),
1601					(None, Some(lm)) => Sleeper::from_three_futures(
1602						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1603						&chain_monitor.get_update_future(),
1604						&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
1605					),
1606					(None, None) => Sleeper::from_two_futures(
1607						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1608						&chain_monitor.get_update_future(),
1609					),
1610				};
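				// Bound the sleep: wake for the next HTLC forwarding batch if any forwards are
				// pending, and in any case poll at least every 100ms so the timer checks below
				// stay responsive.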
1611				let batch_delay = if channel_manager.get_cm().needs_pending_htlc_processing() {
1612					batch_delay.get()
1613				} else {
1614					Duration::MAX
1615				};
1616				let fastest_timeout = batch_delay.min(Duration::from_millis(100));
1617				sleeper.wait_timeout(fastest_timeout);
1618				if stop_thread.load(Ordering::Acquire) {
1619					log_trace!(logger, "Terminating background processor.");
1620					break;
1621				}
1622				if last_freshness_call.elapsed() > FRESHNESS_TIMER {
1623					log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
1624					channel_manager.get_cm().timer_tick_occurred();
1625					last_freshness_call = Instant::now();
1626				}
1627				if channel_manager.get_cm().get_and_clear_needs_persistence() {
1628					log_trace!(logger, "Persisting ChannelManager...");
1629					(kv_store.write(
1630						CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1631						CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
1632						CHANNEL_MANAGER_PERSISTENCE_KEY,
1633						channel_manager.get_cm().encode(),
1634					))?;
1635					log_trace!(logger, "Done persisting ChannelManager.");
1636				}
1637
1638				if let Some(liquidity_manager) = liquidity_manager.as_ref() {
1639					log_trace!(logger, "Persisting LiquidityManager...");
1640					let _ = liquidity_manager.get_lm().persist().map_err(|e| {
1641						log_error!(logger, "Persisting LiquidityManager failed: {}", e);
1642					});
1643				}
1644
1645				// Note that we want to run a graph prune once not long after startup before
1646				// falling back to our usual hourly prunes. This ensures that short-lived clients
1647				// still prune their network graph: we run once 60 seconds after startup, then
1648				// continue on our normal cadence. For RGS, since 60 seconds is likely too long,
1649				// we instead prune once an initial sync completes.
1650				let prune_timer =
1651					if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
1652				let prune_timer_elapsed = last_prune_call.elapsed() > prune_timer;
1653				let should_prune = match gossip_sync {
1654					GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
1655					_ => prune_timer_elapsed,
1656				};
1657				if should_prune {
1658					// The network graph must not be pruned while rapid sync completion is pending
1659					if let Some(network_graph) = gossip_sync.prunable_network_graph() {
1660						let duration_since_epoch = std::time::SystemTime::now()
1661							.duration_since(std::time::SystemTime::UNIX_EPOCH)
1662							.expect("Time should be sometime after 1970");
1663
1664						log_trace!(logger, "Pruning and persisting network graph.");
1665						network_graph.remove_stale_channels_and_tracking_with_time(
1666							duration_since_epoch.as_secs(),
1667						);
1668						if let Err(e) = kv_store.write(
1669							NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
1670							NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1671							NETWORK_GRAPH_PERSISTENCE_KEY,
1672							network_graph.encode(),
1673						) {
1674							log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions: {}", e);
1675						}
1676						have_pruned = true;
1677					}
1678					last_prune_call = Instant::now();
1679				}
1680				if !have_decayed_scorer {
1681					if let Some(ref scorer) = scorer {
1682						let duration_since_epoch = std::time::SystemTime::now()
1683							.duration_since(std::time::SystemTime::UNIX_EPOCH)
1684							.expect("Time should be sometime after 1970");
1685						log_trace!(logger, "Calling time_passed on scorer at startup");
1686						scorer.write_lock().time_passed(duration_since_epoch);
1687					}
1688					have_decayed_scorer = true;
1689				}
1690				if last_scorer_persist_call.elapsed() > SCORER_PERSIST_TIMER {
1691					if let Some(ref scorer) = scorer {
1692						let duration_since_epoch = std::time::SystemTime::now()
1693							.duration_since(std::time::SystemTime::UNIX_EPOCH)
1694							.expect("Time should be sometime after 1970");
1695						log_trace!(logger, "Calling time_passed and persisting scorer");
1696						scorer.write_lock().time_passed(duration_since_epoch);
1697						if let Err(e) = kv_store.write(
1698							SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1699							SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1700							SCORER_PERSISTENCE_KEY,
1701							scorer.encode(),
1702						) {
1703							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions: {}", e);
1704						}
1705					}
1706					last_scorer_persist_call = Instant::now();
1707				}
1708				if last_sweeper_call.elapsed() > SWEEPER_TIMER {
1709					log_trace!(logger, "Regenerating sweeper spends if necessary");
1710					if let Some(ref sweeper) = sweeper {
1711						let _ = sweeper.regenerate_and_broadcast_spend_if_necessary();
1712					}
1713					last_sweeper_call = Instant::now();
1714				}
1715				if last_onion_message_handler_call.elapsed() > ONION_MESSAGE_HANDLER_TIMER {
1716					if let Some(om) = &onion_messenger {
1717						log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
1718						om.get_om().timer_tick_occurred();
1719					}
1720					last_onion_message_handler_call = Instant::now();
1721				}
1722				if last_ping_call.elapsed() > PING_TIMER {
1723					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
1724					peer_manager.as_ref().timer_tick_occurred();
1725					last_ping_call = Instant::now();
1726				}
1727				if last_rebroadcast_call.elapsed() > REBROADCAST_TIMER {
1728					log_trace!(logger, "Rebroadcasting monitor's pending claims");
1729					chain_monitor.rebroadcast_pending_claims();
1730					last_rebroadcast_call = Instant::now();
1731				}
1732			}
1733
1734			// After we exit, ensure we persist the ChannelManager one final time - this avoids
1735			// some races where users quit while channel updates were in-flight, with
1736			// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
1737			kv_store.write(
1738				CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1739				CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
1740				CHANNEL_MANAGER_PERSISTENCE_KEY,
1741				channel_manager.get_cm().encode(),
1742			)?;
1743			if let Some(ref scorer) = scorer {
1744				kv_store.write(
1745					SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1746					SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1747					SCORER_PERSISTENCE_KEY,
1748					scorer.encode(),
1749				)?;
1750			}
1751			if let Some(network_graph) = gossip_sync.network_graph() {
1752				kv_store.write(
1753					NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
1754					NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1755					NETWORK_GRAPH_PERSISTENCE_KEY,
1756					network_graph.encode(),
1757				)?;
1758			}
1759			Ok(())
1760		});
1761		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
1762	}
1763
1764	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
1765	/// [`ChannelManager`].
1766	///
1767	/// # Panics
1768	///
1769	/// This function panics if the background thread has panicked such as while persisting or
1770	/// handling events.
1771	///
1772	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
1773	pub fn join(mut self) -> Result<(), std::io::Error> {
1774		assert!(self.thread_handle.is_some());
1775		self.join_thread()
1776	}
1777
1778	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
1779	/// [`ChannelManager`].
1780	///
1781	/// # Panics
1782	///
1783	/// This function panics if the background thread has panicked such as while persisting or
1784	/// handling events.
1785	///
1786	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
1787	pub fn stop(mut self) -> Result<(), std::io::Error> {
1788		assert!(self.thread_handle.is_some());
1789		self.stop_and_join_thread()
1790	}
1791
1792	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
1793		self.stop_thread.store(true, Ordering::Release);
1794		self.join_thread()
1795	}
1796
1797	fn join_thread(&mut self) -> Result<(), std::io::Error> {
1798		match self.thread_handle.take() {
1799			Some(handle) => handle.join().unwrap(),
1800			None => Ok(()),
1801		}
1802	}
1803}
1804
1805#[cfg(feature = "std")]
1806impl Drop for BackgroundProcessor {
1807	fn drop(&mut self) {
1808		self.stop_and_join_thread().unwrap();
1809	}
1810}
1811
1812#[cfg(all(feature = "std", test))]
1813mod tests {
1814	use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
1815	use bitcoin::constants::{genesis_block, ChainHash};
1816	use bitcoin::hashes::Hash;
1817	use bitcoin::locktime::absolute::LockTime;
1818	use bitcoin::network::Network;
1819	use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
1820	use bitcoin::transaction::Version;
1821	use bitcoin::transaction::{Transaction, TxOut};
1822	use bitcoin::{Amount, ScriptBuf, Txid};
1823	use core::sync::atomic::{AtomicBool, Ordering};
1824	use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
1825	use lightning::chain::transaction::OutPoint;
1826	use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1827	use lightning::events::{Event, PathFailure, ReplayEvent};
1828	use lightning::ln::channelmanager;
1829	use lightning::ln::channelmanager::{
1830		ChainParameters, PaymentId, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA,
1831	};
1832	use lightning::ln::functional_test_utils::*;
1833	use lightning::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, Init, MessageSendEvent};
1834	use lightning::ln::peer_handler::{
1835		IgnoringMessageHandler, MessageHandler, PeerManager, SocketDescriptor,
1836	};
1837	use lightning::ln::types::ChannelId;
1838	use lightning::onion_message::messenger::{DefaultMessageRouter, OnionMessenger};
1839	use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
1840	use lightning::routing::router::{CandidateRouteHop, DefaultRouter, Path, RouteHop};
1841	use lightning::routing::scoring::{ChannelUsage, LockableScore, ScoreLookUp, ScoreUpdate};
1842	use lightning::sign::{ChangeDestinationSourceSync, InMemorySigner, KeysManager, NodeSigner};
1843	use lightning::types::features::{ChannelFeatures, NodeFeatures};
1844	use lightning::types::payment::PaymentHash;
1845	use lightning::util::config::UserConfig;
1846	use lightning::util::persist::{
1847		KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
1848		CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1849		CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
1850		NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1851		SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1852		SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1853	};
1854	use lightning::util::ser::Writeable;
1855	use lightning::util::sweep::{
1856		OutputSpendStatus, OutputSweeper, OutputSweeperSync, PRUNE_DELAY_BLOCKS,
1857	};
1858	use lightning::util::test_utils;
1859	use lightning::{get_event, get_event_msg};
1860	use lightning_liquidity::utils::time::DefaultTimeProvider;
1861	use lightning_liquidity::{ALiquidityManagerSync, LiquidityManager, LiquidityManagerSync};
1862	use lightning_persister::fs_store::FilesystemStore;
1863	use lightning_rapid_gossip_sync::RapidGossipSync;
1864	use std::collections::VecDeque;
1865	use std::path::PathBuf;
1866	use std::sync::mpsc::SyncSender;
1867	use std::sync::Arc;
1868	use std::time::Duration;
1869	use std::{env, fs};
1870
1871	const EVENT_DEADLINE: Duration =
1872		Duration::from_millis(5 * (FRESHNESS_TIMER.as_millis() as u64));
1873
1874	#[derive(Clone, Hash, PartialEq, Eq)]
1875	struct TestDescriptor {}
1876	impl SocketDescriptor for TestDescriptor {
1877		fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize {
1878			0
1879		}
1880
1881		fn disconnect_socket(&mut self) {}
1882	}
1883
1884	#[cfg(c_bindings)]
1885	type LockingWrapper<T> = lightning::routing::scoring::MultiThreadedLockableScore<T>;
1886	#[cfg(not(c_bindings))]
1887	type LockingWrapper<T> = std::sync::Mutex<T>;
1888
1889	type ChannelManager = channelmanager::ChannelManager<
1890		Arc<ChainMonitor>,
1891		Arc<test_utils::TestBroadcaster>,
1892		Arc<KeysManager>,
1893		Arc<KeysManager>,
1894		Arc<KeysManager>,
1895		Arc<test_utils::TestFeeEstimator>,
1896		Arc<
1897			DefaultRouter<
1898				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1899				Arc<test_utils::TestLogger>,
1900				Arc<KeysManager>,
1901				Arc<LockingWrapper<TestScorer>>,
1902				(),
1903				TestScorer,
1904			>,
1905		>,
1906		Arc<
1907			DefaultMessageRouter<
1908				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1909				Arc<test_utils::TestLogger>,
1910				Arc<KeysManager>,
1911			>,
1912		>,
1913		Arc<test_utils::TestLogger>,
1914	>;
1915
1916	type ChainMonitor = chainmonitor::ChainMonitor<
1917		InMemorySigner,
1918		Arc<test_utils::TestChainSource>,
1919		Arc<test_utils::TestBroadcaster>,
1920		Arc<test_utils::TestFeeEstimator>,
1921		Arc<test_utils::TestLogger>,
1922		Arc<Persister>,
1923		Arc<KeysManager>,
1924	>;
1925
1926	type PGS = Arc<
1927		P2PGossipSync<
1928			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1929			Arc<test_utils::TestChainSource>,
1930			Arc<test_utils::TestLogger>,
1931		>,
1932	>;
1933	type RGS = Arc<
1934		RapidGossipSync<
1935			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1936			Arc<test_utils::TestLogger>,
1937		>,
1938	>;
1939
1940	type OM = OnionMessenger<
1941		Arc<KeysManager>,
1942		Arc<KeysManager>,
1943		Arc<test_utils::TestLogger>,
1944		Arc<ChannelManager>,
1945		Arc<
1946			DefaultMessageRouter<
1947				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1948				Arc<test_utils::TestLogger>,
1949				Arc<KeysManager>,
1950			>,
1951		>,
1952		IgnoringMessageHandler,
1953		Arc<ChannelManager>,
1954		IgnoringMessageHandler,
1955		IgnoringMessageHandler,
1956	>;
1957
1958	type LM = LiquidityManagerSync<
1959		Arc<KeysManager>,
1960		Arc<KeysManager>,
1961		Arc<ChannelManager>,
1962		Arc<dyn Filter + Sync + Send>,
1963		Arc<Persister>,
1964		DefaultTimeProvider,
1965		Arc<test_utils::TestBroadcaster>,
1966	>;
1967
1968	struct Node {
1969		node: Arc<ChannelManager>,
1970		messenger: Arc<OM>,
1971		p2p_gossip_sync: PGS,
1972		rapid_gossip_sync: RGS,
1973		peer_manager: Arc<
1974			PeerManager<
1975				TestDescriptor,
1976				Arc<test_utils::TestChannelMessageHandler>,
1977				Arc<test_utils::TestRoutingMessageHandler>,
1978				Arc<OM>,
1979				Arc<test_utils::TestLogger>,
1980				IgnoringMessageHandler,
1981				Arc<KeysManager>,
1982				IgnoringMessageHandler,
1983			>,
1984		>,
1985		liquidity_manager: Arc<LM>,
1986		chain_monitor: Arc<ChainMonitor>,
1987		kv_store: Arc<Persister>,
1988		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
1989		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1990		logger: Arc<test_utils::TestLogger>,
1991		best_block: BestBlock,
1992		scorer: Arc<LockingWrapper<TestScorer>>,
1993		sweeper: Arc<
1994			OutputSweeperSync<
1995				Arc<test_utils::TestBroadcaster>,
1996				Arc<TestWallet>,
1997				Arc<test_utils::TestFeeEstimator>,
1998				Arc<test_utils::TestChainSource>,
1999				Arc<Persister>,
2000				Arc<test_utils::TestLogger>,
2001				Arc<KeysManager>,
2002			>,
2003		>,
2004	}
2005
2006	impl Node {
2007		fn p2p_gossip_sync(
2008			&self,
2009		) -> GossipSync<
2010			PGS,
2011			RGS,
2012			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2013			Arc<test_utils::TestChainSource>,
2014			Arc<test_utils::TestLogger>,
2015		> {
2016			GossipSync::P2P(Arc::clone(&self.p2p_gossip_sync))
2017		}
2018
2019		fn rapid_gossip_sync(
2020			&self,
2021		) -> GossipSync<
2022			PGS,
2023			RGS,
2024			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2025			Arc<test_utils::TestChainSource>,
2026			Arc<test_utils::TestLogger>,
2027		> {
2028			GossipSync::Rapid(Arc::clone(&self.rapid_gossip_sync))
2029		}
2030
2031		fn no_gossip_sync(
2032			&self,
2033		) -> GossipSync<
2034			PGS,
2035			RGS,
2036			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2037			Arc<test_utils::TestChainSource>,
2038			Arc<test_utils::TestLogger>,
2039		> {
2040			GossipSync::None
2041		}
2042	}
2043
2044	impl Drop for Node {
2045		fn drop(&mut self) {
2046			let data_dir = self.kv_store.get_data_dir();
2047			match fs::remove_dir_all(data_dir.clone()) {
2048				Err(e) => {
2049					println!("Failed to remove test store directory {}: {}", data_dir.display(), e)
2050				},
2051				_ => {},
2052			}
2053		}
2054	}
2055
2056	struct Persister {
2057		graph_error: Option<(std::io::ErrorKind, &'static str)>,
2058		graph_persistence_notifier: Option<SyncSender<()>>,
2059		manager_error: Option<(std::io::ErrorKind, &'static str)>,
2060		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
2061		kv_store: FilesystemStore,
2062	}
2063
2064	impl Persister {
2065		fn new(data_dir: PathBuf) -> Self {
2066			let kv_store = FilesystemStore::new(data_dir);
2067			Self {
2068				graph_error: None,
2069				graph_persistence_notifier: None,
2070				manager_error: None,
2071				scorer_error: None,
2072				kv_store,
2073			}
2074		}
2075
2076		fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2077			Self { graph_error: Some((error, message)), ..self }
2078		}
2079
2080		fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
2081			Self { graph_persistence_notifier: Some(sender), ..self }
2082		}
2083
2084		fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2085			Self { manager_error: Some((error, message)), ..self }
2086		}
2087
2088		fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2089			Self { scorer_error: Some((error, message)), ..self }
2090		}
2091
2092		pub fn get_data_dir(&self) -> PathBuf {
2093			self.kv_store.get_data_dir()
2094		}
2095	}
2096
2097	impl KVStoreSync for Persister {
2098		fn read(
2099			&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
2100		) -> lightning::io::Result<Vec<u8>> {
2101			self.kv_store.read(primary_namespace, secondary_namespace, key)
2102		}
2103
2104		fn write(
2105			&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
2106		) -> lightning::io::Result<()> {
2107			if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE
2108				&& secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE
2109				&& key == CHANNEL_MANAGER_PERSISTENCE_KEY
2110			{
2111				if let Some((error, message)) = self.manager_error {
2112					return Err(std::io::Error::new(error, message).into());
2113				}
2114			}
2115
2116			if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE
2117				&& secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE
2118				&& key == NETWORK_GRAPH_PERSISTENCE_KEY
2119			{
2120				if let Some(sender) = &self.graph_persistence_notifier {
2121					match sender.send(()) {
2122						Ok(()) => {},
2123						Err(std::sync::mpsc::SendError(())) => {
2124							println!("Persister failed to notify as receiver went away.")
2125						},
2126					}
2127				};
2128
2129				if let Some((error, message)) = self.graph_error {
2130					return Err(std::io::Error::new(error, message).into());
2131				}
2132			}
2133
2134			if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE
2135				&& secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE
2136				&& key == SCORER_PERSISTENCE_KEY
2137			{
2138				if let Some((error, message)) = self.scorer_error {
2139					return Err(std::io::Error::new(error, message).into());
2140				}
2141			}
2142
2143			self.kv_store.write(primary_namespace, secondary_namespace, key, buf)
2144		}
2145
2146		fn remove(
2147			&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
2148		) -> lightning::io::Result<()> {
2149			self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
2150		}
2151
2152		fn list(
2153			&self, primary_namespace: &str, secondary_namespace: &str,
2154		) -> lightning::io::Result<Vec<String>> {
2155			self.kv_store.list(primary_namespace, secondary_namespace)
2156		}
2157	}
2158
2159	struct TestScorer {
2160		event_expectations: Option<VecDeque<TestResult>>,
2161	}
2162
2163	#[derive(Debug)]
2164	enum TestResult {
2165		PaymentFailure { path: Path, short_channel_id: u64 },
2166		PaymentSuccess { path: Path },
2167		ProbeFailure { path: Path },
2168		ProbeSuccess { path: Path },
2169	}
2170
2171	impl TestScorer {
2172		fn new() -> Self {
2173			Self { event_expectations: None }
2174		}
2175
2176		fn expect(&mut self, expectation: TestResult) {
2177			self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
2178		}
2179	}
2180
2181	impl lightning::util::ser::Writeable for TestScorer {
2182		fn write<W: lightning::util::ser::Writer>(
2183			&self, _: &mut W,
2184		) -> Result<(), lightning::io::Error> {
2185			Ok(())
2186		}
2187	}
2188
2189	impl ScoreLookUp for TestScorer {
2190		type ScoreParams = ();
2191		fn channel_penalty_msat(
2192			&self, _candidate: &CandidateRouteHop, _usage: ChannelUsage,
2193			_score_params: &Self::ScoreParams,
2194		) -> u64 {
2195			unimplemented!();
2196		}
2197	}
2198
2199	impl ScoreUpdate for TestScorer {
2200		fn payment_path_failed(
2201			&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration,
2202		) {
2203			if let Some(expectations) = &mut self.event_expectations {
2204				match expectations.pop_front().unwrap() {
2205					TestResult::PaymentFailure { path, short_channel_id } => {
2206						assert_eq!(actual_path, &path);
2207						assert_eq!(actual_short_channel_id, short_channel_id);
2208					},
2209					TestResult::PaymentSuccess { path } => {
2210						panic!("Unexpected successful payment path: {:?}", path)
2211					},
2212					TestResult::ProbeFailure { path } => {
2213						panic!("Unexpected probe failure: {:?}", path)
2214					},
2215					TestResult::ProbeSuccess { path } => {
2216						panic!("Unexpected probe success: {:?}", path)
2217					},
2218				}
2219			}
2220		}
2221
2222		fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
2223			if let Some(expectations) = &mut self.event_expectations {
2224				match expectations.pop_front().unwrap() {
2225					TestResult::PaymentFailure { path, .. } => {
2226						panic!("Unexpected payment path failure: {:?}", path)
2227					},
2228					TestResult::PaymentSuccess { path } => {
2229						assert_eq!(actual_path, &path);
2230					},
2231					TestResult::ProbeFailure { path } => {
2232						panic!("Unexpected probe failure: {:?}", path)
2233					},
2234					TestResult::ProbeSuccess { path } => {
2235						panic!("Unexpected probe success: {:?}", path)
2236					},
2237				}
2238			}
2239		}
2240
2241		fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
2242			if let Some(expectations) = &mut self.event_expectations {
2243				match expectations.pop_front().unwrap() {
2244					TestResult::PaymentFailure { path, .. } => {
2245						panic!("Unexpected payment path failure: {:?}", path)
2246					},
2247					TestResult::PaymentSuccess { path } => {
2248						panic!("Unexpected payment path success: {:?}", path)
2249					},
2250					TestResult::ProbeFailure { path } => {
2251						assert_eq!(actual_path, &path);
2252					},
2253					TestResult::ProbeSuccess { path } => {
2254						panic!("Unexpected probe success: {:?}", path)
2255					},
2256				}
2257			}
2258		}
2259		fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
2260			if let Some(expectations) = &mut self.event_expectations {
2261				match expectations.pop_front().unwrap() {
2262					TestResult::PaymentFailure { path, .. } => {
2263						panic!("Unexpected payment path failure: {:?}", path)
2264					},
2265					TestResult::PaymentSuccess { path } => {
2266						panic!("Unexpected payment path success: {:?}", path)
2267					},
2268					TestResult::ProbeFailure { path } => {
2269						panic!("Unexpected probe failure: {:?}", path)
2270					},
2271					TestResult::ProbeSuccess { path } => {
2272						assert_eq!(actual_path, &path);
2273					},
2274				}
2275			}
2276		}
2277		fn time_passed(&mut self, _: Duration) {}
2278	}
2279
2280	#[cfg(c_bindings)]
2281	impl lightning::routing::scoring::Score for TestScorer {}
2282
2283	impl Drop for TestScorer {
2284		fn drop(&mut self) {
2285			if std::thread::panicking() {
2286				return;
2287			}
2288
2289			if let Some(event_expectations) = &self.event_expectations {
2290				if !event_expectations.is_empty() {
2291					panic!("Unsatisfied event expectations: {:?}", event_expectations);
2292				}
2293			}
2294		}
2295	}
2296
2297	struct TestWallet {}
2298
2299	impl ChangeDestinationSourceSync for TestWallet {
2300		fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
2301			Ok(ScriptBuf::new())
2302		}
2303	}
2304
2305	fn get_full_filepath(filepath: String, filename: String) -> String {
2306		let mut path = PathBuf::from(filepath);
2307		path.push(filename);
2308		path.to_str().unwrap().to_string()
2309	}
2310
2311	fn create_nodes(num_nodes: usize, persist_dir: &str) -> (String, Vec<Node>) {
2312		let persist_temp_path = env::temp_dir().join(persist_dir);
2313		let persist_dir = persist_temp_path.to_string_lossy().to_string();
2314		let network = Network::Bitcoin;
2315		let mut nodes = Vec::new();
2316		for i in 0..num_nodes {
2317			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
2318			let fee_estimator = Arc::new(test_utils::TestFeeEstimator::new(253));
2319			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
2320			let genesis_block = genesis_block(network);
2321			let network_graph = Arc::new(NetworkGraph::new(network, Arc::clone(&logger)));
2322			let scorer = Arc::new(LockingWrapper::new(TestScorer::new()));
2323			let now = Duration::from_secs(genesis_block.header.time as u64);
2324			let seed = [i as u8; 32];
2325			let keys_manager =
2326				Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
2327			let router = Arc::new(DefaultRouter::new(
2328				Arc::clone(&network_graph),
2329				Arc::clone(&logger),
2330				Arc::clone(&keys_manager),
2331				Arc::clone(&scorer),
2332				Default::default(),
2333			));
2334			let msg_router = Arc::new(DefaultMessageRouter::new(
2335				Arc::clone(&network_graph),
2336				Arc::clone(&keys_manager),
2337			));
2338			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
2339			let kv_store =
2340				Arc::new(Persister::new(format!("{}_persister_{}", &persist_dir, i).into()));
2341			let now = Duration::from_secs(genesis_block.header.time as u64);
2342			let keys_manager =
2343				Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
2344			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
2345				Some(Arc::clone(&chain_source)),
2346				Arc::clone(&tx_broadcaster),
2347				Arc::clone(&logger),
2348				Arc::clone(&fee_estimator),
2349				Arc::clone(&kv_store),
2350				Arc::clone(&keys_manager),
2351				keys_manager.get_peer_storage_key(),
2352			));
2353			let best_block = BestBlock::from_network(network);
2354			let params = ChainParameters { network, best_block };
2355			let manager = Arc::new(ChannelManager::new(
2356				Arc::clone(&fee_estimator),
2357				Arc::clone(&chain_monitor),
2358				Arc::clone(&tx_broadcaster),
2359				Arc::clone(&router),
2360				Arc::clone(&msg_router),
2361				Arc::clone(&logger),
2362				Arc::clone(&keys_manager),
2363				Arc::clone(&keys_manager),
2364				Arc::clone(&keys_manager),
2365				UserConfig::default(),
2366				params,
2367				genesis_block.header.time,
2368			));
2369			let messenger = Arc::new(OnionMessenger::new(
2370				Arc::clone(&keys_manager),
2371				Arc::clone(&keys_manager),
2372				Arc::clone(&logger),
2373				Arc::clone(&manager),
2374				Arc::clone(&msg_router),
2375				IgnoringMessageHandler {},
2376				Arc::clone(&manager),
2377				IgnoringMessageHandler {},
2378				IgnoringMessageHandler {},
2379			));
2380			let wallet = Arc::new(TestWallet {});
2381			let sweeper = Arc::new(OutputSweeperSync::new(
2382				best_block,
2383				Arc::clone(&tx_broadcaster),
2384				Arc::clone(&fee_estimator),
2385				None::<Arc<test_utils::TestChainSource>>,
2386				Arc::clone(&keys_manager),
2387				wallet,
2388				Arc::clone(&kv_store),
2389				Arc::clone(&logger),
2390			));
2391			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(
2392				Arc::clone(&network_graph),
2393				Some(Arc::clone(&chain_source)),
2394				Arc::clone(&logger),
2395			));
2396			let rapid_gossip_sync =
2397				Arc::new(RapidGossipSync::new(Arc::clone(&network_graph), Arc::clone(&logger)));
2398			let msg_handler = MessageHandler {
2399				chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(
2400					ChainHash::using_genesis_block(Network::Testnet),
2401				)),
2402				route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
2403				onion_message_handler: Arc::clone(&messenger),
2404				custom_message_handler: IgnoringMessageHandler {},
2405				send_only_message_handler: IgnoringMessageHandler {},
2406			};
2407			let peer_manager = Arc::new(PeerManager::new(
2408				msg_handler,
2409				0,
2410				&seed,
2411				Arc::clone(&logger),
2412				Arc::clone(&keys_manager),
2413			));
2414			let liquidity_manager = Arc::new(
2415				LiquidityManagerSync::new(
2416					Arc::clone(&keys_manager),
2417					Arc::clone(&keys_manager),
2418					Arc::clone(&manager),
2419					None,
2420					None,
2421					Arc::clone(&kv_store),
2422					Arc::clone(&tx_broadcaster),
2423					None,
2424					None,
2425				)
2426				.unwrap(),
2427			);
2428			let node = Node {
2429				node: manager,
2430				p2p_gossip_sync,
2431				rapid_gossip_sync,
2432				peer_manager,
2433				liquidity_manager,
2434				chain_monitor,
2435				kv_store,
2436				tx_broadcaster,
2437				network_graph,
2438				logger,
2439				best_block,
2440				scorer,
2441				sweeper,
2442				messenger,
2443			};
2444			nodes.push(node);
2445		}
2446
2447		for i in 0..num_nodes {
2448			for j in (i + 1)..num_nodes {
2449				let init_i = Init {
2450					features: nodes[j].node.init_features(),
2451					networks: None,
2452					remote_network_address: None,
2453				};
2454				nodes[i]
2455					.node
2456					.peer_connected(nodes[j].node.get_our_node_id(), &init_i, true)
2457					.unwrap();
2458				let init_j = Init {
2459					features: nodes[i].node.init_features(),
2460					networks: None,
2461					remote_network_address: None,
2462				};
2463				nodes[j]
2464					.node
2465					.peer_connected(nodes[i].node.get_our_node_id(), &init_j, false)
2466					.unwrap();
2467			}
2468		}
2469
2470		(persist_dir, nodes)
2471	}
2472
2473	macro_rules! open_channel {
2474		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
2475			begin_open_channel!($node_a, $node_b, $channel_value);
2476			let events = $node_a.node.get_and_clear_pending_events();
2477			assert_eq!(events.len(), 1);
2478			let (temporary_channel_id, tx) =
2479				handle_funding_generation_ready!(events[0], $channel_value);
2480			$node_a
2481				.node
2482				.funding_transaction_generated(
2483					temporary_channel_id,
2484					$node_b.node.get_our_node_id(),
2485					tx.clone(),
2486				)
2487				.unwrap();
2488			let msg_a = get_event_msg!(
2489				$node_a,
2490				MessageSendEvent::SendFundingCreated,
2491				$node_b.node.get_our_node_id()
2492			);
2493			$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
2494			get_event!($node_b, Event::ChannelPending);
2495			let msg_b = get_event_msg!(
2496				$node_b,
2497				MessageSendEvent::SendFundingSigned,
2498				$node_a.node.get_our_node_id()
2499			);
2500			$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
2501			get_event!($node_a, Event::ChannelPending);
2502			tx
2503		}};
2504	}
2505
2506	macro_rules! begin_open_channel {
2507		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
2508			$node_a
2509				.node
2510				.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None)
2511				.unwrap();
2512			let msg_a = get_event_msg!(
2513				$node_a,
2514				MessageSendEvent::SendOpenChannel,
2515				$node_b.node.get_our_node_id()
2516			);
2517			$node_b.node.handle_open_channel($node_a.node.get_our_node_id(), &msg_a);
2518			let msg_b = get_event_msg!(
2519				$node_b,
2520				MessageSendEvent::SendAcceptChannel,
2521				$node_a.node.get_our_node_id()
2522			);
2523			$node_a.node.handle_accept_channel($node_b.node.get_our_node_id(), &msg_b);
2524		}};
2525	}
2526
2527	macro_rules! handle_funding_generation_ready {
2528		($event: expr, $channel_value: expr) => {{
2529			match $event {
2530				Event::FundingGenerationReady {
2531					temporary_channel_id,
2532					channel_value_satoshis,
2533					ref output_script,
2534					user_channel_id,
2535					..
2536				} => {
2537					assert_eq!(channel_value_satoshis, $channel_value);
2538					assert_eq!(user_channel_id, 42);
2539
2540					let tx = Transaction {
2541						version: Version::ONE,
2542						lock_time: LockTime::ZERO,
2543						input: Vec::new(),
2544						output: vec![TxOut {
2545							value: Amount::from_sat(channel_value_satoshis),
2546							script_pubkey: output_script.clone(),
2547						}],
2548					};
2549					(temporary_channel_id, tx)
2550				},
2551				_ => panic!("Unexpected event"),
2552			}
2553		}};
2554	}
2555
2556	fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
2557		for i in 1..=depth {
2558			let prev_blockhash = node.best_block.block_hash;
2559			let height = node.best_block.height + 1;
2560			let header = create_dummy_header(prev_blockhash, height);
2561			let txdata = vec![(0, tx)];
2562			node.best_block = BestBlock::new(header.block_hash(), height);
2563			match i {
2564				1 => {
2565					node.node.transactions_confirmed(&header, &txdata, height);
2566					node.chain_monitor.transactions_confirmed(&header, &txdata, height);
2567					node.sweeper.transactions_confirmed(&header, &txdata, height);
2568				},
2569				x if x == depth => {
2570					// We need the TestBroadcaster to know about the new height so that it doesn't think
2571					// we're violating the time lock requirements of transactions broadcasted at that
2572					// point.
2573					let block = (genesis_block(Network::Bitcoin), height);
2574					node.tx_broadcaster.blocks.lock().unwrap().push(block);
2575					node.node.best_block_updated(&header, height);
2576					node.chain_monitor.best_block_updated(&header, height);
2577					node.sweeper.best_block_updated(&header, height);
2578				},
2579				_ => {},
2580			}
2581		}
2582	}
2583
2584	fn advance_chain(node: &mut Node, num_blocks: u32) {
2585		for i in 1..=num_blocks {
2586			let prev_blockhash = node.best_block.block_hash;
2587			let height = node.best_block.height + 1;
2588			let header = create_dummy_header(prev_blockhash, height);
2589			node.best_block = BestBlock::new(header.block_hash(), height);
2590			if i == num_blocks {
2591				// We need the TestBroadcaster to know about the new height so that it doesn't think
2592				// we're violating the time lock requirements of transactions broadcasted at that
2593				// point.
2594				let block = (genesis_block(Network::Bitcoin), height);
2595				node.tx_broadcaster.blocks.lock().unwrap().push(block);
2596				node.node.best_block_updated(&header, height);
2597				node.chain_monitor.best_block_updated(&header, height);
2598				node.sweeper.best_block_updated(&header, height);
2599			}
2600		}
2601	}
2602
2603	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
2604		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
2605	}
2606
2607	#[test]
2608	fn test_background_processor() {
2609		// Test that when a new channel is created, the ChannelManager is re-persisted with the
2610		// new channel's data. Also test that when new updates are available, the manager signals
2611		// that it needs re-persistence and is successfully re-persisted.
2612		let (persist_dir, nodes) = create_nodes(2, "test_background_processor");
2613
2614		// Go through the channel creation process so that each node has something to persist. Since
2615		// open_channel consumes events, it must complete before starting BackgroundProcessor to
2616		// avoid a race with processing events.
2617		let tx = open_channel!(nodes[0], nodes[1], 100000);
2618
2619		// Initiate the background processors to watch each node.
2620		let data_dir = nodes[0].kv_store.get_data_dir();
2621		let persister = Arc::new(Persister::new(data_dir));
2622		let event_handler = |_: _| Ok(());
2623		let bg_processor = BackgroundProcessor::start(
2624			persister,
2625			event_handler,
2626			Arc::clone(&nodes[0].chain_monitor),
2627			Arc::clone(&nodes[0].node),
2628			Some(Arc::clone(&nodes[0].messenger)),
2629			nodes[0].p2p_gossip_sync(),
2630			Arc::clone(&nodes[0].peer_manager),
2631			Some(Arc::clone(&nodes[0].liquidity_manager)),
2632			Some(Arc::clone(&nodes[0].sweeper)),
2633			Arc::clone(&nodes[0].logger),
2634			Some(Arc::clone(&nodes[0].scorer)),
2635		);
2636
2637		macro_rules! check_persisted_data {
2638			($node: expr, $filepath: expr) => {
2639				let mut expected_bytes = Vec::new();
2640				loop {
2641					expected_bytes.clear();
2642					match $node.write(&mut expected_bytes) {
2643						Ok(()) => match std::fs::read($filepath) {
2644							Ok(bytes) => {
2645								if bytes == expected_bytes {
2646									break;
2647								} else {
2648									continue;
2649								}
2650							},
2651							Err(_) => continue,
2652						},
2653						Err(e) => panic!("Unexpected error: {}", e),
2654					}
2655				}
2656			};
2657		}
2658
2659		// Check that the initial channel manager data is persisted as expected.
2660		let filepath =
2661			get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
2662		check_persisted_data!(nodes[0].node, filepath.clone());
2663
2664		loop {
2665			if !nodes[0].node.get_event_or_persist_condvar_value() {
2666				break;
2667			}
2668		}
2669
2670		// Force-close the channel.
2671		let error_message = "Channel force-closed";
2672		nodes[0]
2673			.node
2674			.force_close_broadcasting_latest_txn(
2675				&ChannelId::v1_from_funding_outpoint(OutPoint {
2676					txid: tx.compute_txid(),
2677					index: 0,
2678				}),
2679				&nodes[1].node.get_our_node_id(),
2680				error_message.to_string(),
2681			)
2682			.unwrap();
2683
2684		// Check that the force-close updates are persisted.
2685		check_persisted_data!(nodes[0].node, filepath.clone());
2686		loop {
2687			if !nodes[0].node.get_event_or_persist_condvar_value() {
2688				break;
2689			}
2690		}
2691
2692		// Check network graph is persisted
2693		let filepath =
2694			get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
2695		check_persisted_data!(nodes[0].network_graph, filepath.clone());
2696
2697		// Check scorer is persisted
2698		let filepath =
2699			get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
2700		check_persisted_data!(nodes[0].scorer, filepath.clone());
2701
2702		if !std::thread::panicking() {
2703			bg_processor.stop().unwrap();
2704		}
2705	}
2706
2707	#[test]
2708	fn test_timer_tick_called() {
2709		// Test that:
2710		// - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
2711		// - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`,
2712		// - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and
2713		// - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`.
2714		let (_, nodes) = create_nodes(1, "test_timer_tick_called");
2715		let data_dir = nodes[0].kv_store.get_data_dir();
2716		let persister = Arc::new(Persister::new(data_dir));
2717		let event_handler = |_: _| Ok(());
2718		let bg_processor = BackgroundProcessor::start(
2719			persister,
2720			event_handler,
2721			Arc::clone(&nodes[0].chain_monitor),
2722			Arc::clone(&nodes[0].node),
2723			Some(Arc::clone(&nodes[0].messenger)),
2724			nodes[0].no_gossip_sync(),
2725			Arc::clone(&nodes[0].peer_manager),
2726			Some(Arc::clone(&nodes[0].liquidity_manager)),
2727			Some(Arc::clone(&nodes[0].sweeper)),
2728			Arc::clone(&nodes[0].logger),
2729			Some(Arc::clone(&nodes[0].scorer)),
2730		);
2731		loop {
2732			let log_entries = nodes[0].logger.lines.lock().unwrap();
2733			let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
2734			let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
2735			let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
2736			let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string();
2737			if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some()
2738				&& log_entries.get(&("lightning_background_processor", desired_log_2)).is_some()
2739				&& log_entries.get(&("lightning_background_processor", desired_log_3)).is_some()
2740				&& log_entries.get(&("lightning_background_processor", desired_log_4)).is_some()
2741			{
2742				break;
2743			}
2744		}
2745
2746		if !std::thread::panicking() {
2747			bg_processor.stop().unwrap();
2748		}
2749	}
2750
2751	#[test]
2752	fn test_channel_manager_persist_error() {
2753		// Test that an error encountered during manager persistence is returned from `join`.
2754		let (_, nodes) = create_nodes(2, "test_persist_error");
2755		open_channel!(nodes[0], nodes[1], 100000);
2756
2757		let data_dir = nodes[0].kv_store.get_data_dir();
2758		let persister = Arc::new(
2759			Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
2760		);
2761		let event_handler = |_: _| Ok(());
2762		let bg_processor = BackgroundProcessor::start(
2763			persister,
2764			event_handler,
2765			Arc::clone(&nodes[0].chain_monitor),
2766			Arc::clone(&nodes[0].node),
2767			Some(Arc::clone(&nodes[0].messenger)),
2768			nodes[0].no_gossip_sync(),
2769			Arc::clone(&nodes[0].peer_manager),
2770			Some(Arc::clone(&nodes[0].liquidity_manager)),
2771			Some(Arc::clone(&nodes[0].sweeper)),
2772			Arc::clone(&nodes[0].logger),
2773			Some(Arc::clone(&nodes[0].scorer)),
2774		);
2775		match bg_processor.join() {
2776			Ok(_) => panic!("Expected error persisting manager"),
2777			Err(e) => {
2778				assert_eq!(e.kind(), std::io::ErrorKind::Other);
2779				assert_eq!(e.get_ref().unwrap().to_string(), "test");
2780			},
2781		}
2782	}
2783
2784	#[tokio::test]
2785	async fn test_channel_manager_persist_error_async() {
2786		// Test that an error encountered during manager persistence is returned by the processor future.
2787		let (_, nodes) = create_nodes(2, "test_persist_error_sync");
2788		open_channel!(nodes[0], nodes[1], 100000);
2789
2790		let data_dir = nodes[0].kv_store.get_data_dir();
2791		let kv_store_sync = Arc::new(
2792			Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
2793		);
2794		let kv_store = KVStoreSyncWrapper(kv_store_sync);
2795
2796		// Yes, you can unsafe { turn off the borrow checker }
2797		let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
2798			&*(nodes[0].liquidity_manager.get_lm_async()
2799				as *const LiquidityManager<_, _, _, _, _, _, _>)
2800				as &'static LiquidityManager<_, _, _, _, _, _, _>
2801		};
2802		let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
2803			&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
2804				as &'static OutputSweeper<_, _, _, _, _, _, _>
2805		};
2806
2807		let bp_future = super::process_events_async(
2808			kv_store,
2809			|_: _| async { Ok(()) },
2810			Arc::clone(&nodes[0].chain_monitor),
2811			Arc::clone(&nodes[0].node),
2812			Some(Arc::clone(&nodes[0].messenger)),
2813			nodes[0].rapid_gossip_sync(),
2814			Arc::clone(&nodes[0].peer_manager),
2815			Some(lm_async),
2816			Some(sweeper_async),
2817			Arc::clone(&nodes[0].logger),
2818			Some(Arc::clone(&nodes[0].scorer)),
2819			move |dur: Duration| {
2820				Box::pin(async move {
2821					tokio::time::sleep(dur).await;
2822					false // Never exit
2823				})
2824			},
2825			false,
2826			|| Some(Duration::ZERO),
2827		);
2828		match bp_future.await {
2829			Ok(_) => panic!("Expected error persisting manager"),
2830			Err(e) => {
2831				assert_eq!(e.kind(), lightning::io::ErrorKind::Other);
2832				assert_eq!(e.get_ref().unwrap().to_string(), "test");
2833			},
2834		}
2835	}
2836
2837	#[test]
2838	fn test_network_graph_persist_error() {
2839		// Test that if we encounter an error during network graph persistence, an error gets returned.
2840		let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
2841		let data_dir = nodes[0].kv_store.get_data_dir();
2842		let persister =
2843			Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
2844		let event_handler = |_: _| Ok(());
2845		let bg_processor = BackgroundProcessor::start(
2846			persister,
2847			event_handler,
2848			Arc::clone(&nodes[0].chain_monitor),
2849			Arc::clone(&nodes[0].node),
2850			Some(Arc::clone(&nodes[0].messenger)),
2851			nodes[0].p2p_gossip_sync(),
2852			Arc::clone(&nodes[0].peer_manager),
2853			Some(Arc::clone(&nodes[0].liquidity_manager)),
2854			Some(Arc::clone(&nodes[0].sweeper)),
2855			Arc::clone(&nodes[0].logger),
2856			Some(Arc::clone(&nodes[0].scorer)),
2857		);
2858
2859		match bg_processor.stop() {
2860			Ok(_) => panic!("Expected error persisting network graph"),
2861			Err(e) => {
2862				assert_eq!(e.kind(), std::io::ErrorKind::Other);
2863				assert_eq!(e.get_ref().unwrap().to_string(), "test");
2864			},
2865		}
2866	}
2867
2868	#[test]
2869	fn test_scorer_persist_error() {
2870		// Test that if we encounter an error during scorer persistence, an error gets returned.
2871		let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
2872		let data_dir = nodes[0].kv_store.get_data_dir();
2873		let persister =
2874			Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
2875		let event_handler = |_: _| Ok(());
2876		let bg_processor = BackgroundProcessor::start(
2877			persister,
2878			event_handler,
2879			Arc::clone(&nodes[0].chain_monitor),
2880			Arc::clone(&nodes[0].node),
2881			Some(Arc::clone(&nodes[0].messenger)),
2882			nodes[0].no_gossip_sync(),
2883			Arc::clone(&nodes[0].peer_manager),
2884			Some(Arc::clone(&nodes[0].liquidity_manager)),
2885			Some(Arc::clone(&nodes[0].sweeper)),
2886			Arc::clone(&nodes[0].logger),
2887			Some(Arc::clone(&nodes[0].scorer)),
2888		);
2889
2890		match bg_processor.stop() {
2891			Ok(_) => panic!("Expected error persisting scorer"),
2892			Err(e) => {
2893				assert_eq!(e.kind(), std::io::ErrorKind::Other);
2894				assert_eq!(e.get_ref().unwrap().to_string(), "test");
2895			},
2896		}
2897	}
2898
2899	#[test]
2900	fn test_background_event_handling() {
2901		let (_, mut nodes) = create_nodes(2, "test_background_event_handling");
2902		let node_0_id = nodes[0].node.get_our_node_id();
2903		let node_1_id = nodes[1].node.get_our_node_id();
2904
2905		let channel_value = 100000;
2906		let data_dir = nodes[0].kv_store.get_data_dir();
2907		let persister = Arc::new(Persister::new(data_dir.clone()));
2908
2909		// Set up a background event handler for FundingGenerationReady events.
2910		let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
2911		let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| {
			match event {
				Event::FundingGenerationReady { .. } => funding_generation_send
					.send(handle_funding_generation_ready!(event, channel_value))
					.unwrap(),
				Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
				Event::ChannelReady { .. } => {},
				_ => panic!("Unexpected event: {:?}", event),
			}
			Ok(())
		};

		let bg_processor = BackgroundProcessor::start(
			persister,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(Arc::clone(&nodes[0].liquidity_manager)),
			Some(Arc::clone(&nodes[0].sweeper)),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
		);

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = funding_generation_recv
			.recv_timeout(EVENT_DEADLINE)
			.expect("FundingGenerationReady not handled within deadline");
		nodes[0]
			.node
			.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
			.unwrap();
		let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
		nodes[1].node.handle_funding_created(node_0_id, &msg_0);
		get_event!(nodes[1], Event::ChannelPending);
		let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
		nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
		channel_pending_recv
			.recv_timeout(EVENT_DEADLINE)
			.expect("ChannelPending not handled within deadline");

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, node_1_id);
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, node_0_id);
		nodes[0].node.handle_channel_ready(node_1_id, &bs_funding);
		let _as_channel_update =
			get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, node_1_id);
		nodes[1].node.handle_channel_ready(node_0_id, &as_funding);
		let _bs_channel_update =
			get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, node_0_id);
		let broadcast_funding =
			nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		assert_eq!(broadcast_funding.compute_txid(), funding_tx.compute_txid());
		assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| {
			match event {
				Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
				Event::ChannelReady { .. } => {},
				Event::ChannelClosed { .. } => {},
				_ => panic!("Unexpected event: {:?}", event),
			}
			Ok(())
		};
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(
			persister,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(Arc::clone(&nodes[0].liquidity_manager)),
			Some(Arc::clone(&nodes[0].sweeper)),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
		);

		// Force close the channel and check that the SpendableOutputs event was handled.
		let error_message = "Channel force-closed";
		nodes[0]
			.node
			.force_close_broadcasting_latest_txn(
				&nodes[0].node.list_channels()[0].channel_id,
				&node_1_id,
				error_message.to_string(),
			)
			.unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

		let event =
			receiver.recv_timeout(EVENT_DEADLINE).expect("Events not handled within deadline");
		match event {
			Event::SpendableOutputs { outputs, channel_id } => {
				nodes[0]
					.sweeper
					.track_spendable_outputs(outputs, channel_id, false, Some(153))
					.unwrap();
			},
			_ => panic!("Unexpected event: {:?}", event),
		}
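		// A note on the `track_spendable_outputs` call above: `Some(153)` asks the sweeper
		// to hold off on the initial sweep broadcast until block height 153 (asserted on
		// below), while `false` is the sweeper's exclude-static-outputs flag, so all
		// descriptors stay tracked.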

		// Check we don't generate an initial sweeping tx until we reach the required height.
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		if let Some(sweep_tx_0) = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop() {
			assert!(!tracked_output.is_spent_in(&sweep_tx_0));
			match tracked_output.status {
				OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => {
					assert_eq!(delayed_until_height, Some(153));
				},
				_ => panic!("Unexpected status"),
			}
		}

		advance_chain(&mut nodes[0], 3);

		let tx_broadcaster = Arc::clone(&nodes[0].tx_broadcaster);
		let wait_for_sweep_tx = || -> Transaction {
			loop {
				let sweep_tx = tx_broadcaster.txn_broadcasted.lock().unwrap().pop();
				if let Some(sweep_tx) = sweep_tx {
					return sweep_tx;
				}

				std::thread::sleep(Duration::from_millis(10));
			}
		};
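		// The background processor broadcasts sweeps from its own thread, so there is no
		// event to block on; instead we poll the broadcaster until a transaction shows up.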

		// Check we generate an initial sweeping tx.
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let sweep_tx_0 = wait_for_sweep_tx();
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		match tracked_output.status {
			OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
				assert_eq!(sweep_tx_0.compute_txid(), latest_spending_tx.compute_txid());
			},
			_ => panic!("Unexpected status"),
		}

		// Check we regenerate and rebroadcast the sweeping tx each block.
		advance_chain(&mut nodes[0], 1);
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let sweep_tx_1 = wait_for_sweep_tx();
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		match tracked_output.status {
			OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
				assert_eq!(sweep_tx_1.compute_txid(), latest_spending_tx.compute_txid());
			},
			_ => panic!("Unexpected status"),
		}
		assert_ne!(sweep_tx_0, sweep_tx_1);

		advance_chain(&mut nodes[0], 1);
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let sweep_tx_2 = wait_for_sweep_tx();
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		match tracked_output.status {
			OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
				assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
			},
			_ => panic!("Unexpected status"),
		}
		assert_ne!(sweep_tx_0, sweep_tx_2);
		assert_ne!(sweep_tx_1, sweep_tx_2);

		// Check we still track the spendable outputs up to ANTI_REORG_DELAY confirmations.
		confirm_transaction_depth(&mut nodes[0], &sweep_tx_2, 5);
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		match tracked_output.status {
			OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
				assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
			},
			_ => panic!("Unexpected status"),
		}

		// Check we still see the transaction as confirmed if we unconfirm any untracked
		// transaction. (We previously had a bug that would mark tracked transactions as
		// unconfirmed if any transaction at an unknown block height was unconfirmed.)
		let unconf_txid = Txid::from_slice(&[0; 32]).unwrap();
		nodes[0].sweeper.transaction_unconfirmed(&unconf_txid);

		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		match tracked_output.status {
			OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
				assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
			},
			_ => panic!("Unexpected status"),
		}

		// Check we stop tracking the spendable outputs when one of the txs reaches
		// PRUNE_DELAY_BLOCKS confirmations.
		confirm_transaction_depth(&mut nodes[0], &sweep_tx_0, PRUNE_DELAY_BLOCKS);
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 0);

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}
	}

	#[test]
	fn test_event_handling_failures_are_replayed() {
		let (_, nodes) = create_nodes(2, "test_event_handling_failures_are_replayed");
		let channel_value = 100000;
		let data_dir = nodes[0].kv_store.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		let (first_event_send, first_event_recv) = std::sync::mpsc::sync_channel(1);
		let (second_event_send, second_event_recv) = std::sync::mpsc::sync_channel(1);
		let should_fail_event_handling = Arc::new(AtomicBool::new(true));
		let event_handler = move |event: Event| {
			if let Ok(true) = should_fail_event_handling.compare_exchange(
				true,
				false,
				Ordering::Acquire,
				Ordering::Relaxed,
			) {
				first_event_send.send(event).unwrap();
				return Err(ReplayEvent());
			}

			second_event_send.send(event).unwrap();
			Ok(())
		};
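		// The handler above fails exactly once: the compare_exchange flips the flag on the
		// first event and returns Err(ReplayEvent()), which asks the background processor to
		// keep the event around and hand it to the handler again. The assertion below then
		// checks that the replayed event is identical to the one that failed.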

		let bg_processor = BackgroundProcessor::start(
			persister,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(Arc::clone(&nodes[0].liquidity_manager)),
			Some(Arc::clone(&nodes[0].sweeper)),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
		);

		begin_open_channel!(nodes[0], nodes[1], channel_value);
		assert_eq!(
			first_event_recv.recv_timeout(EVENT_DEADLINE).unwrap(),
			second_event_recv.recv_timeout(EVENT_DEADLINE).unwrap()
		);

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}
	}

	#[test]
	fn test_scorer_persistence() {
		let (_, nodes) = create_nodes(2, "test_scorer_persistence");
		let data_dir = nodes[0].kv_store.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| Ok(());
		let bg_processor = BackgroundProcessor::start(
			persister,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(Arc::clone(&nodes[0].liquidity_manager)),
			Some(Arc::clone(&nodes[0].sweeper)),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
		);

		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log = "Calling time_passed and persisting scorer".to_string();
			if log_entries.get(&("lightning_background_processor", expected_log)).is_some() {
				break;
			}
		}

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}
	}

	macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
		($nodes: expr, $receive: expr, $sleep: expr) => {
			let features = ChannelFeatures::empty();
			$nodes[0]
				.network_graph
				.add_channel_from_partial_announcement(
					42,
					None,
					53,
					features,
					$nodes[0].node.get_our_node_id().into(),
					$nodes[1].node.get_our_node_id().into(),
				)
				.expect("Failed to update channel from partial announcement");
			let original_graph_description = $nodes[0].network_graph.to_string();
			assert!(original_graph_description.contains("42: features: 0000, node_one:"));
			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);

			loop {
				$sleep;
				let log_entries = $nodes[0].logger.lines.lock().unwrap();
				let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
				if *log_entries.get(&("lightning_background_processor", loop_counter)).unwrap_or(&0)
					> 1
				{
					// Wait until the loop has gone around at least twice.
					break;
				}
			}
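			// Waiting for two full passes of the main loop ensures the background
			// processor is actually up and ticking before the rapid gossip sync below is
			// applied.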

			let initialization_input = vec![
				76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99,
				247, 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227,
				98, 218, 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61,
				250, 251, 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6,
				67, 2, 36, 125, 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47,
				115, 172, 63, 136, 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158,
				1, 242, 121, 152, 106, 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95,
				65, 3, 83, 185, 58, 138, 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136,
				149, 185, 226, 156, 137, 175, 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219,
				175, 168, 77, 4, 143, 38, 128, 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2,
				27, 0, 0, 0, 1, 0, 0, 255, 2, 68, 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0,
				0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232, 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255,
				8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0, 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6,
				11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
			];
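			// This is a raw rapid-gossip-sync snapshot: the leading `76, 68, 75, 1` is the
			// ASCII "LDK" magic followed by format version 1, and the `Some(1642291930)`
			// below supplies the unix timestamp that `no_std` builds cannot fetch on their
			// own.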
			$nodes[0]
				.rapid_gossip_sync
				.update_network_graph_no_std(&initialization_input[..], Some(1642291930))
				.unwrap();

			// This should have added two channels and pruned the previous one.
			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);

			$receive.expect("Network graph not pruned within deadline");

			// All channels should now be pruned.
			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
		};
	}

	#[test]
	fn test_not_pruning_network_graph_until_graph_sync_completion() {
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);

		let (_, nodes) =
			create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");
		let data_dir = nodes[0].kv_store.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));

		let event_handler = |_: _| Ok(());
		let background_processor = BackgroundProcessor::start(
			persister,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].rapid_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(Arc::clone(&nodes[0].liquidity_manager)),
			Some(Arc::clone(&nodes[0].sweeper)),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
		);

		do_test_not_pruning_network_graph_until_graph_sync_completion!(
			nodes,
			receiver.recv_timeout(super::FIRST_NETWORK_PRUNE_TIMER * 5),
			std::thread::sleep(Duration::from_millis(1))
		);

		background_processor.stop().unwrap();
	}

	#[tokio::test]
	async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);

		let (_, nodes) =
			create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
		let data_dir = nodes[0].kv_store.get_data_dir();
		let kv_store_sync =
			Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
		let kv_store = KVStoreSyncWrapper(kv_store_sync);

		// Yes, you can unsafe { turn off the borrow checker }
		let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
			&*(nodes[0].liquidity_manager.get_lm_async()
				as *const LiquidityManager<_, _, _, _, _, _, _>)
				as &'static LiquidityManager<_, _, _, _, _, _, _>
		};
		let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
			&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
				as &'static OutputSweeper<_, _, _, _, _, _, _>
		};
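		// What the unsafe blocks buy us: `process_events_async` takes `Deref` handles, and
		// the borrows returned by the sync wrappers are tied to `nodes`, so the test
		// launders them into `&'static` references. This is a test-only shortcut that
		// relies on the background future being shut down (via `exit_sender` below) before
		// the referenced state is dropped.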

		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
		let bp_future = super::process_events_async(
			kv_store,
			|_: _| async { Ok(()) },
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].rapid_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(lm_async),
			Some(sweeper_async),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
			move |dur: Duration| {
				let mut exit_receiver = exit_receiver.clone();
				Box::pin(async move {
					tokio::select! {
						_ = tokio::time::sleep(dur) => false,
						_ = exit_receiver.changed() => true,
					}
				})
			},
			false,
			|| Some(Duration::from_secs(1696300000)),
		);
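		// The trailing arguments: the closure returning a boxed future is the sleeper (its
		// future resolves to `true` to request shutdown, hence the `exit_receiver` arm),
		// `false` opts out of the mobile/interruptable-platform behavior, and the final
		// closure supplies the current unix time for `no_std`-style time handling.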

		let t1 = tokio::spawn(bp_future);
		let t2 = tokio::spawn(async move {
			do_test_not_pruning_network_graph_until_graph_sync_completion!(
				nodes,
				{
					let mut i = 0;
					loop {
						tokio::time::sleep(super::FIRST_NETWORK_PRUNE_TIMER).await;
						if let Ok(()) = receiver.try_recv() {
							break Ok::<(), ()>(());
						}
						assert!(i < 5);
						i += 1;
					}
				},
				tokio::time::sleep(Duration::from_millis(1)).await
			);
			exit_sender.send(()).unwrap();
		});
		let (r1, r2) = tokio::join!(t1, t2);
		r1.unwrap().unwrap();
		r2.unwrap()
	}

	macro_rules! do_test_payment_path_scoring {
		($nodes: expr, $receive: expr) => {
			// Ensure that we update the scorer when relevant events are processed. In this case, we ensure
			// that we update the scorer upon a payment path succeeding (note that the channel must be
			// public or else we won't score it).
			// The calling test must hook an event handler that forwards payment and probe
			// events into $receive up to a running background processor.
			let scored_scid = 4242;
			let secp_ctx = Secp256k1::new();
			let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
			let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);

			let path = Path { hops: vec![RouteHop {
				pubkey: node_1_id,
				node_features: NodeFeatures::empty(),
				short_channel_id: scored_scid,
				channel_features: ChannelFeatures::empty(),
				fee_msat: 0,
				cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
				maybe_announced_channel: true,
			}], blinded_tail: None };

			$nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
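			// The test scorer acts as an expectation queue: each `expect(...)` call arms it
			// with the scoring callback it must observe next, and a missing or mismatched
			// callback fails the test.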
			$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
				payment_id: None,
				payment_hash: PaymentHash([42; 32]),
				payment_failed_permanently: false,
				failure: PathFailure::OnPath { network_update: None },
				path: path.clone(),
				short_channel_id: Some(scored_scid),
				error_code: None,
				error_data: None,
				hold_times: Vec::new(),
			});
			let event = $receive.expect("PaymentPathFailed not handled within deadline");
			match event {
				Event::PaymentPathFailed { .. } => {},
				_ => panic!("Unexpected event"),
			}

			// Ensure we'll score payments that were explicitly failed back by the destination as
			// ProbeSuccess.
			$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
			$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
				payment_id: None,
				payment_hash: PaymentHash([42; 32]),
				payment_failed_permanently: true,
				failure: PathFailure::OnPath { network_update: None },
				path: path.clone(),
				short_channel_id: None,
				error_code: None,
				error_data: None,
				hold_times: Vec::new(),
			});
			let event = $receive.expect("PaymentPathFailed not handled within deadline");
			match event {
				Event::PaymentPathFailed { .. } => {},
				_ => panic!("Unexpected event"),
			}

			$nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() });
			$nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
				payment_id: PaymentId([42; 32]),
				payment_hash: None,
				path: path.clone(),
				hold_times: Vec::new(),
			});
			let event = $receive.expect("PaymentPathSuccessful not handled within deadline");
			match event {
				Event::PaymentPathSuccessful { .. } => {},
				_ => panic!("Unexpected event"),
			}

			$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
			$nodes[0].node.push_pending_event(Event::ProbeSuccessful {
				payment_id: PaymentId([42; 32]),
				payment_hash: PaymentHash([42; 32]),
				path: path.clone(),
			});
			let event = $receive.expect("ProbeSuccessful not handled within deadline");
			match event {
				Event::ProbeSuccessful { .. } => {},
				_ => panic!("Unexpected event"),
			}

			$nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() });
			$nodes[0].node.push_pending_event(Event::ProbeFailed {
				payment_id: PaymentId([42; 32]),
				payment_hash: PaymentHash([42; 32]),
				path,
				short_channel_id: Some(scored_scid),
			});
			let event = $receive.expect("ProbeFailure not handled within deadline");
			match event {
				Event::ProbeFailed { .. } => {},
				_ => panic!("Unexpected event"),
			}
		}
	}

	#[test]
	fn test_payment_path_scoring() {
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| {
			match event {
				Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
				Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
				Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
				Event::ProbeFailed { .. } => sender.send(event).unwrap(),
				_ => panic!("Unexpected event: {:?}", event),
			}
			Ok(())
		};

		let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
		let data_dir = nodes[0].kv_store.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(
			persister,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(Arc::clone(&nodes[0].liquidity_manager)),
			Some(Arc::clone(&nodes[0].sweeper)),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
		);

		do_test_payment_path_scoring!(nodes, receiver.recv_timeout(EVENT_DEADLINE));

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}

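		// Five scorer updates are expected, one per event handled above: two
		// PaymentPathFailed, one PaymentPathSuccessful, one ProbeSuccessful, and one
		// ProbeFailed, each of which logs "Persisting scorer after update".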
		let log_entries = nodes[0].logger.lines.lock().unwrap();
		let expected_log = "Persisting scorer after update".to_string();
		assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5);
	}

	#[tokio::test]
	async fn test_payment_path_scoring_async() {
		let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
		let event_handler = move |event: Event| {
			let sender_ref = sender.clone();
			async move {
				match event {
					Event::PaymentPathFailed { .. } => sender_ref.send(event).await.unwrap(),
					Event::PaymentPathSuccessful { .. } => sender_ref.send(event).await.unwrap(),
					Event::ProbeSuccessful { .. } => sender_ref.send(event).await.unwrap(),
					Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
					_ => panic!("Unexpected event: {:?}", event),
				}
				Ok(())
			}
		};

		let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
		let data_dir = nodes[0].kv_store.get_data_dir();
		let kv_store_sync = Arc::new(Persister::new(data_dir));
		let kv_store = KVStoreSyncWrapper(kv_store_sync);

		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());

		// Yes, you can unsafe { turn off the borrow checker }
		let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
			&*(nodes[0].liquidity_manager.get_lm_async()
				as *const LiquidityManager<_, _, _, _, _, _, _>)
				as &'static LiquidityManager<_, _, _, _, _, _, _>
		};
		let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
			&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
				as &'static OutputSweeper<_, _, _, _, _, _, _>
		};

		let bp_future = super::process_events_async(
			kv_store,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(lm_async),
			Some(sweeper_async),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
			move |dur: Duration| {
				let mut exit_receiver = exit_receiver.clone();
				Box::pin(async move {
					tokio::select! {
						_ = tokio::time::sleep(dur) => false,
						_ = exit_receiver.changed() => true,
					}
				})
			},
			false,
			|| Some(Duration::ZERO),
		);
		let t1 = tokio::spawn(bp_future);
		let t2 = tokio::spawn(async move {
			do_test_payment_path_scoring!(nodes, receiver.recv().await);
			exit_sender.send(()).unwrap();

			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log = "Persisting scorer after update".to_string();
			assert_eq!(
				*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(),
				5
			);
		});

		let (r1, r2) = tokio::join!(t1, t2);
		r1.unwrap().unwrap();
		r2.unwrap()
	}

	#[tokio::test]
	#[cfg(not(c_bindings))]
	async fn test_no_consts() {
		// Compile-test that the NO_* constants can be used.
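		// (The NO_* constants are typed `None` values; a bare `None` would leave the
		// corresponding generic parameters unconstrained, so these exist to satisfy type
		// inference when an optional component is omitted.)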
		let (_, nodes) = create_nodes(1, "test_no_consts");
		let bg_processor = BackgroundProcessor::start(
			Arc::clone(&nodes[0].kv_store),
			move |_: Event| Ok(()),
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			crate::NO_ONION_MESSENGER,
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			crate::NO_LIQUIDITY_MANAGER_SYNC,
			Some(Arc::clone(&nodes[0].sweeper)),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
		);

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}

		let kv_store = KVStoreSyncWrapper(Arc::clone(&nodes[0].kv_store));
		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
		let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
			&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
				as &'static OutputSweeper<_, _, _, _, _, _, _>
		};
		let bp_future = super::process_events_async(
			kv_store,
			move |_: Event| async move { Ok(()) },
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			crate::NO_ONION_MESSENGER,
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			crate::NO_LIQUIDITY_MANAGER,
			Some(sweeper_async),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
			move |dur: Duration| {
				let mut exit_receiver = exit_receiver.clone();
				Box::pin(async move {
					tokio::select! {
						_ = tokio::time::sleep(dur) => false,
						_ = exit_receiver.changed() => true,
					}
				})
			},
			false,
			|| Some(Duration::ZERO),
		);
		let t1 = tokio::spawn(bp_future);
		exit_sender.send(()).unwrap();
		t1.await.unwrap().unwrap();
	}
}